python进程池和队列,python线程池和进程池
本文主要详细介绍Python对线程池线程安全队列的实现。本文中的示例代码非常详细,具有一定的参考价值。感兴趣的朋友可以参考一下。
00-1010一、线程池的构成二。线程安全队列III的实现。测试逻辑3.1。测试阻塞逻辑3.2。测试读取、写入和锁定逻辑。本例分享用Python实现线程池线程安全队列的具体代码,供大家参考。具体内容如下
目录
一个完整的线程池由以下几部分组成:线程安全队列、任务对象、线程处理对象和线程池对象。线程安全队列之一是实现线程池和任务队列的基础。在这一节中,我们通过互斥线程实现了一个简单且读安全的线程队列。Lock()和条件变量threading。线程包中的条件()。
一、线程池组成
包括put、pop、get和其他方法。为了保证线程安全,读写操作时要添加互斥体。并且pop操作可以设置等待时间来阻塞当前获取该元素的线程,当新元素被写入队列时,等待操作被条件变量取消。
类ThreadSafeQueue(对象):
def __init__(self,max_size=0):
self.queue=[]
自我。max _ size=max _ size # max _ size 0表示无穷大。
Self.lock=线程。Lock() #互斥体
self . condition=threading . condition()#条件变量
定义尺寸(自身):
获取当前队列的大小。
:返回:队列长度
#锁定
self.lock.acquire()
size=len(self.queue)
self.lock.release()
退货尺寸
定义放置(自身,项目):
将单个元素放入队列中。
:参数项目:
:返回:
#队列已满。max_size为0表示无穷大。
if self.max_size!=0且self.size()=self.max_size:
返回ThreadSafeException()
#锁定
self.lock.acquire()
self.queue.append(项目)
self.lock.release()
self.condition.acquire()
#通知等待读取的线程
self.condition.notify()
self.condition.release()
退货项目
定义批投放(自身,项目列表):
添加批量元素
:参数item_list:
:返回:
如果不是isinstance(item_list,list):
item_list=list(item_list)
RES=[item _ list中项目的self . put(item)]
返回资源
def pop(self,block=False,timeout=0):
从队列的头部获取元素。
:param block:阻塞线程?
:参数超时:等待时间
:返回:
if self.size()==0:
如果块:
self.condition.acquire()
self.condition.wait(timeout)
self.condition.release()
else:
return None
# 加锁
self.lock.acquire()
item = None
if len(self.queue):
item = self.queue.pop()
self.lock.release()
return item
def get(self, index):
"""
获取指定位置的元素
:param index:
:return:
"""
if self.size() == 0 or index >= self.size():
return None
# 加锁
self.lock.acquire()
item = self.queue[index]
self.lock.release()
return item
class ThreadSafeException(Exception):
pass
三、测试逻辑
3.1、测试阻塞逻辑
def thread_queue_test_1():thread_queue = ThreadSafeQueue(10)
def producer():
while True:
thread_queue.put(random.randint(0, 10))
time.sleep(2)
def consumer():
while True:
print(current time before pop is %d % time.time())
item = thread_queue.pop(block=True, timeout=3)
# item = thread_queue.get(2)
if item is not None:
print(get value from queue is %s % item)
else:
print(item)
print(current time after pop is %d % time.time())
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
测试结果:
我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。
3.2、测试读写加锁逻辑
def thread_queue_test_2():thread_queue = ThreadSafeQueue(10)
def producer():
while True:
thread_queue.put(random.randint(0, 10))
time.sleep(2)
def consumer(name):
while True:
item = thread_queue.pop(block=True, timeout=1)
# item = thread_queue.get(2)
if item is not None:
print(%s get value from queue is %s % (name, item))
else:
print(%s get value from queue is None % name)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=(thread1,))
t3 = threading.Thread(target=consumer, args=(thread2,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
测试结果:
生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。