本文主要详细介绍了python队列模块的相关信息,具有一定的参考价值。感兴趣的朋友可以参考一下。
队列用于多线程应用,多线程访问共享变量。对于多线程,队列在访问共享变量时是线程安全的。从queue的具体实现可以看出,queue使用了一个线程互斥体(pthread。Lock())和三个条件标量(pthread.condition())来确保线程安全。
关于队列的互斥和条件变量,请参考另一篇文章:python线程中的同步锁。
队列的用法如下:
进口雀雀
a=[1,2,3]
device_que=Queque.queue()
device_que.put(a)
device=device_que.get()
先看看它的初始化函数__init__(self,maxsize=0):
def __init__(self,maxsize=0):
self.maxsize=maxsize
自我。_init(maxsize)
每当队列发生变化时,必须持有# mutex。所有方法
#获取互斥体必须在返回之前释放它。互斥(体)…
#在三个条件之间共享,因此获取和
#释放条件也会获取和释放互斥体。
self.mutex=_threading。锁定()
# Notify not_empty每当一个项目被添加到队列时;a
#然后通知等待获取的线程。
self.not_empty=_threading。条件(自身互斥)
# Notify not_full每当从队列中移除项目时;
#然后通知等待放置的线程。
self.not_full=_threading。条件(自身互斥)
#每当未完成的任务数
#下降到零;等待加入()的线程被通知恢复
self.all_tasks_done=_threading。条件(自身互斥)
self.unfinished_tasks=0
定义队列时有一个默认参数maxsize。如果没有指定队列的长度,即manxsize=0,则队列的长度是无限的。如果定义了大于0的值,则队列长度为maxsize。
自我。_init(maxsize): deque deque,python自带的,用于存储元素。
Self.mutex mutex:获取队列状态的任何操作(empty()、qsize()等)。)或修改队列的内容(get、put等。)必须持有这个互斥体。有两种操作:require获取锁,release释放锁。同时互斥体同时被三个共享变量共享,也就是操作conditiond时的require和release操作,也就是操作互斥体。
Self.not_full条件变量:当一个元素被添加到队列中时,通知等待该元素被添加的其他线程,唤醒并等待所需互斥体,或者当一个线程从队列中取出一个元素时,通知其他线程唤醒并等待所需互斥体。
Self.not empty条件变量:一个线程向队列添加数据后,会调用self.not_empty.notify()通知其他线程,唤醒并等待require互斥,然后读取队列。
Self.all_tasks_done条件变量:消费者线程从队列中获取任务后,任务处理完成。当处理完队列中的所有任务后,调用queue.join()的线程将返回,表明队列中的任务已被处理。
Queue.put (self,item,block=true,timeout=none)函数:
申请互斥锁,获得互斥锁后,如果队列未满,则向队列中添加数据,并通知其他一些被阻塞的线程,并唤醒以等待所需的互斥锁。如果队列已满,它将等待。最终处理完成后释放互斥体。还有阻塞、非阻塞、超时等逻辑。你可以自己看看:
def put(自身、项目、块=真,超时=无):
''将项目放入队列。
如果可选参数“block”为true且“timeout”为None(默认值),
如有必要,阻塞,直到有空闲的插槽可用。如果“超时”为
一个非负数,它最多阻塞“超时”秒并引发
如果在该时间内没有可用的空闲槽,则为完全异常。
否则(' block '为false),如果有空闲槽,则将一个项目放入队列
立即可用,否则引发完整异常(“超时”)
在那种情况下被忽略)。
'''
self.not_full.acquire()
尝试:
如果self.maxsize为0:
如果不阻止:
如果自我. qsize()==self.maxsize:
全额加注
否则如果超时为无:
而自我. qsize()==self.maxsize:
self.not_full.wait()
否则如果超时0:
提升值错误(“超时"必须为非负数)
否则:
endtime=_time()超时
而自我. qsize()==self.maxsize:
剩余时间=结束时间-_时间()
如果剩余=0.0:
全额加注
self.not_full.wait(剩余)
自我. put(项目)
self.unfinished_tasks=1
self.not_empty.notify()
最后:
self.not_full.release()
queue.get(self,block=True,timeout=None)函数:
从队列中获取任务,并且从队列中移除此任务。首先尝试获取互斥锁,获取成功则队列中得到任务,如果此时队列为空,则等待等待生产者线程添加数据得到。到任务后,会调用self.not_full.notify()通知生产者线程,队列可以添加元素了。最后释放互斥锁。
def get(self,block=True,timeout=None):
' ' '从队列中移除并返回项目。
如果可选参数"阻止"为真实的且"超时"为无(默认值),
如有必要,在某项可用之前阻止。如果"超时"为
一个非负数,它最多阻塞"超时"秒并引发
如果在该时间内没有可用的项目,则为空异常。
否则('块'为假),如果一个项目立即
可用,否则引发空异常("超时"被忽略)
那样的话)。
'''
self.not_empty.acquire()
尝试:
如果不阻止:
如果不是自我. qsize():
举起空的
否则如果超时为无:
而不是自我. qsize():
self.not_empty.wait()
否则如果超时0:
提升值错误(“超时"必须为非负数)
否则:
endtime=_time()超时
而不是自我. qsize():
剩余时间=结束时间-_时间()
如果剩余=0.0:
举起空的
self.not_empty.wait(剩余)
物品=自身. get()
self.not_full.notify()
退货项目
最后:
self.not_empty.release()
queue.put_nowait():无阻塞的向队列中添加任务,当队列为满时,不等待,而是直接抛出全部异常,重点是理解block=False:
def put_nowait(self,item):
' ' '将项目放入队列中,而不阻止。
仅当空闲槽立即可用时,才将项目排入队列。
否则引发完整的异常。
'''
返回self.put(item,False)
queue.get_nowait():无阻塞的向队列中得到任务,当队列为空时,不等待,而是直接抛出空的异常,重点是理解block=False:
def get_nowait(self):
' ' ' '从队列中移除并返回项目,而不阻止。
只有当一个项目立即可用时,才获取该项目。否则
引发空异常。
'''
return self.get(False)
queue.qsize空满分别获取队列的长度,是否为空,是否已满等:
定义qsize(自身):
''返回队列的大概大小(不可靠!).'''
self.mutex.acquire()
n=自我. qsize()
self.mutex.release()
返回
定义空(自身):
''如果队列为空,则返回没错,否则返回假(不可靠!).'''
self.mutex.acquire()
n=非自我. qsize()
self.mutex.release()
返回
定义完整(自身):
''如果队列已满,则返回没错,否则返回假(不可靠!).'''
self.mutex.acquire()
n=0 self.maxsize==self ._qsize()
self.mutex.release()
返回
queue.join()阻塞等待队列中任务全部处理完毕,需要配合队列。任务完成使用:
定义任务_完成(自身):
""表示先前排队任务已完成。
由队列使用者线程使用。对于用于获取任务的每个get(),
对task_done()的后续调用告诉队列处理
上的任务完成了。
如果加入()当前被阻塞,当所有项目
已被处理(意味着收到了task_done()调用
对于已经放入()队列中的每个项目)。
如果调用次数多于项目数,则引发值错误
放在队列中。
'''
self.all_tasks_done.acquire()
尝试:
未完成=自我。未完成的任务-1
如果未完成=0:
如果未完成0:
引发值错误(" task _ done()调用次数过多")
self.all_tasks_done.notify_all()
self.unfinished_tasks=未完成
最后:
self.all_tasks_done.release()
定义联接(自身):
""会一直阻止,直到队列中的所有项目都已被获取和处理。
每当向中添加一项时,未完成任务的计数就会增加
排队。每当使用者线程调用task_done()时,计数就会下降
以指示该项已被检索到,并且对它的所有工作都已完成。
当未完成任务的数量降至零时,加入()会解除阻塞。
'''
self.all_tasks_done.acquire()
尝试:
while self.unfinished_tasks:
self.all_tasks_done.wait()
最后:
self.all_tasks_done.release()
长队模块除了长队线性安全队列(先进先出),还有优先级队列LifoQueue(后进先出),也就是新添加的先被得到到优先级队列具有优先级的队列,即队列中的元素是一个元祖类型,(优先级级别,数据)。
类别优先级队列(队列):
' ' ' ' '按优先级顺序(最低优先)检索未结条目的队列变量。
条目通常是以下形式的元组:(优先级数,数据)。
'''
def _init(self,maxsize):
self.queue=[]
def _qsize(self,len=len):
返回len(自身队列)
def _put(self,item,heappush=heapq.heappush):
希普什(自我队列,项目)
def _get(self,heappop=heapq.heappop):
返回heappop(self.queue)
类LifoQueue(队列):
' ' ' ' '首先检索最近添加的条目的队列变量。'''
def _init(self,maxsize):
self.queue=[]
def _qsize(self,len=len):
返回len(自身队列)
def _put(self,item):
self.queue.append(项目)
def _get(self):
返回self.queue.pop()
至此长队模块介绍完毕,重点是理解互斥锁,条件变量如果协同工作,保证队列的线程安全。
下面是长队的完全代码:
类别队列:
' ' '创建具有给定最大大小的队列对象。
如果maxsize=0,则队列大小是无限的。
'''
def __init__(self,maxsize=0):
self.maxsize=maxsize
自我. init(maxsize)
每当队列发生变化时,必须持有#互斥体.所有方法
#获取互斥体必须在返回之前释放它。互斥(体)…
#在三个条件之间共享,因此获取和
#释放条件也会获取和释放互斥体。
self . mutex=threading .锁定()
#通知not_empty每当一个项目被添加到队列时;a
#然后通知等待获取的线程。
self.not_empty=_threading .条件(自身互斥)
#通知未满_已满每当从队列中移除项目时;
#然后通知等待放置的线程。
self.not_full=_threading .条件(自身互斥)
#每当未完成的任务数
#下降到零;等待加入()的线程被通知恢复
self.all_tasks_done=_threading .条件(自身互斥)
self.unfinished_tasks=0
定义任务_完成(自身):
""表示先前排队任务已完成。
由队列使用者线程使用。对于用于获取任务的每个get(),
对task_done()的后续调用告诉队列处理
上的任务完成了。
如果加入()当前被阻塞,当所有项目
已被处理(意味着收到了task_done()调用
对于已经放入()队列中的每个项目)。
如果调用次数多于项目数,则引发值错误
放在队列中。
'''
self.all_tasks_done.acquire()
尝试:
未完成=自我。未完成的任务-1
如果未完成=0:
如果未完成0:
引发值错误(" task _ done()调用次数过多")
self.all_tasks_done.notify_all()
self.unfinished_tasks=未完成
最后:
self.all_tasks_done.release()
定义联接(自身):
""会一直阻止,直到队列中的所有项目都已被获取和处理。
每当向中添加一项时,未完成任务的计数就会增加
排队。每当使用者线程调用task_done()时,计数就会下降
以指示该项已被检索到,并且对它的所有工作都已完成。
当未完成任务的数量降至零时,加入()会解除阻塞。
'''
self.all_tasks_done.acquire()
尝试:
while self.unfinished_tasks:
self.all_tasks_done.wait()
最后:
self.all_tasks_done.release()
定义qsize(自身):
''返回队列的大概大小(不可靠!).'''
self.mutex.acquire()
n=自我. qsize()
self.mutex.release()
返回
定义空(自身):
''如果队列为空,则返回没错,否则返回假(不可靠!).'''
self.mutex.acquire()
n=非自我. qsize()
self.mutex.release()
返回
定义完整(自身):
''如果队列已满,则返回没错,否则返回假(不可靠!).'''
self.mutex.acquire()
n=0 self.maxsize==self ._qsize()
self.mutex.release()
返回
定义put(自身、项目、块=真,超时=无):
''将项目放入队列。
如果可选参数"阻止"为真实的且"超时"为无(默认值),
如有必要,阻塞,直到有空闲的插槽可用。如果"超时"为
一个非负数,它最多阻塞"超时"秒并引发
如果在该时间内没有可用的空闲槽,则为完全异常。
否则('块'为假),如果有空闲槽,则将一个项目放入队列
立即可用,否则引发完整异常("超时")
在那种情况下被忽略)。
'''
self.not_full.acquire()
尝试:
如果self.maxsize为0:
如果不阻止:
如果自我. qsize()==self.maxsize:
全额加注
否则如果超时为无:
而自我. qsize()==self.maxsize:
self.not_full.wait()
否则如果超时0:
提升值错误(“超时"必须为非负数)
否则:
endtime=_time()超时
而自我. qsize()==self.maxsize:
剩余时间=结束时间-_时间()
如果剩余=0.0:
全额加注
self.not_full.wait(剩余)
自我. put(项目)
self.unfinished_tasks=1
self.not_empty.notify()
最后:
self.not_full.release()
def put_nowait(self,item):
' ' '将项目放入队列中,而不阻止。
仅当空闲槽立即可用时,才将项目排入队列。
否则引发完整的异常。
'''
返回self.put(item,False)
def get(self,block=True,timeout=None):
' ' '从队列中移除并返回项目。
如果可选参数"阻止"为真实的且"超时"为无(默认值),
如有必要,在某项可用之前阻止。如果"超时"为
一个非负数,它最多阻塞"超时"秒并引发
如果在该时间内没有可用的项目,则为空异常。
否则('块'为假),如果一个项目立即
可用,否则引发空异常("超时"被忽略)
那样的话)。
'''
self.not_empty.acquire()
尝试:
如果不阻止:
如果不是自我. qsize():
举起空的
否则如果超时为无:
而不是自我. qsize():
self.not_empty.wait()
否则如果超时0:
提升值错误(“超时"必须为非负数)
否则:
endtime=_time()超时
而不是自我. qsize():
剩余时间=结束时间-_时间()
如果剩余=0.0:
举起空的
self.not_empty.wait(剩余)
物品=自身. get()
self.not_full.notify()
退货项目
最后:
self.not_empty.release()
def get_nowait(self):
' ' ' '从队列中移除并返回项目,而不阻止。
只有当一个项目立即可用时,才获取该项目。否则
引发空异常。
'''
return self.get(False)
#重写这些方法以实现其他队列组织
#(例如堆栈或优先级队列)。
#只有在持有适当锁的情况下才会调用这些函数
#初始化队列表示
def _init(self,maxsize):
self.queue=deque()
def _qsize(self,len=len):
返回len(自身队列)
#将新项目放入队列
def _put(self,item):
self.queue.append(项目)
#从队列中获取项目
def _get(self):
返回self.queue.popleft()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。