python 线程池使用,Python进程池
线程池的作用:线程池的实现
线程池的角色
当系统启动时,线程池会创建大量的空闲线程。只要一个程序向线程池提交一个函数,线程池就会启动一个空闲线程来执行它。当函数完成后,线程并不会死亡,而是会再次返回线程池,变成空闲,等待下一个函数的执行。
有时候,我们无法知道。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中有大量并发线程时,系统性能会急剧下降,甚至Python解释器也会崩溃。线程池中线程的最大数量可以控制系统中并发线程的数量不超过这个数量。
比如我已经在u盘上实现了病毒扫描。如果定义了5个线程,扫描了20个文件夹,那么将同时扫描5个文件夹,剩下15个。扫描的5个文件夹中,有一个文件夹比较小,会快速扫描。如果5秒钟后,该文件夹被扫描,并且此时一个线程是空的,则第六个文件夹将被扫描。这时候还有5个线程,然后就剩了。
线程池实现1、threadpool实现线程池
#编码:utf-8导入时间导入线程池导入随机导入线程def say hello (str): #执行方法sleep _ seconds=random.randint (1,3) print 线程名:%s,参数:%s,睡眠时间:% s% (threading.current _ thread()。name,str,sleep _ seconds)time . sleep(sleep _ seconds)name _ list=[ aa , bb , cc ,Dd] #总共需要执行4个线程start _ time=time . time()# start time pool=thread pool。thread pool(2)# create 2 threads # create request list requests=thread pool . make requests(say hello,Name _ list)for requests in requests:pool . put request(req)#将每个请求添加到线程池中. wait() #等待线程完成执行后再执行主线程print 总运行时间:% d秒 % (time.time ()-start _ time)结果如下:
我们可以看到,线程池一共创建了2个线程来执行4个任务,线程1的睡眠时间是3秒,线程2的睡眠时间是2秒。也就是说会先执行Thread-2,然后线程池会自动调度下一个任务分配给Thread-2,从而实现线程池控制线程数量,自动调度线程的功能!
2、ThreadPoolExecutor实现线程池
Threadpool是一个比较老的模块,虽然有人还在用,但是已经不是主流了。至于python多线程,已经开始进入未来(未来模块)。使用concurrent.futures模块,这是python3附带的一个模块。不过Python版以上也可以安装使用。具体用法如下(python2.7安装:pip安装期货)
# encoding:utf-8 from concurrent . futures导入threadpoolexecuturimport线程导入随机导入时间# Thread task def task(I):sleep _ seconds=random . randint(1,3) # random sleep time print 线程名称:%s,参数:%s,睡眠时间:% s% (threading.current _ thread()。name,I,Sleep _ seconds)time . Sleep(Sleep _ seconds)#定义睡眠时间#创建一个有2个线程的线程池=threadpooleexecutor(max _ workers=2)#定义两个线程#向线程池提交5个任务用于范围内的I(5):future 1=pool . Submit(Task,i) # Task加入线程池. shutdown()运行结果:3、自定义线程池
有时,我们可以手动构建线程池,而不依赖于上述模块。
任务获取和执行:
(1)任务加入队列,等待线程获取并执行。
(2)按需生成线程,每个线程循环取任务。
螺纹破坏:
(1)当采集任务是终止符时,线程停止。
(2)当线程池关闭()时,向任务队列添加与生成的线程相同数量的终止符。
(3)当线程池terminate()时,将线程的下一个任务设置为终止符。
操作流程图如下:
# -*-编码:utf-8-*-导入队列导入线程导入contextlibStopEvent=object()类线程池(object):def _ _ init _ _(self,max_num): self.q=Queue .队列()#存放任务的队列self.max_num=max_num#最大线程并发数self.terminal=False#如果为真实的终止所有线程,不再获取新任务self.generate_list=[] #已经创建的线程self.free_list=[]#闲置的线程,当无空闲的线程,且有新任务调用了奔跑方法,则需要创建新的线程def run(self,func,args,callback=None): 线程池执行一个任务:参数函数:任务函数:参数参数:任务函数所需参数:参数回调:任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为没有,即:不执行回调函数):返回:如果线程池已经终止,则返回真实的否则无如果len(self.free_list)==0且len(自我。generate _ list)自我。最大数量:#无空闲线程和不超过最大线程数self.generate_thread() #创建线程w=(函数,参数,回调),#保存参数为元组self.q.put(w)#添加到任务队列def generate_thread(self): 创建一个线程 t=线程.线程(目标=自身。call)t . start()def call(self): 循环去获取任务函数并执行任务函数 当前线程=线程。当前线程号获取当前线程对象自我。生成_列表。追加(当前线程)#添加到已创建线程里event=self.q.get() #获取任务正在…事件!=停止事件:#如果不为停止信号函数,参数,回调=事件#分别取值,try:result=func(*参数)#运行函数,把结果赋值给结果状态=真#运行结果是否正常例外为e:状态=假#不正常结果=e #结果为错误信息如果回拨不是无:#是否有回调函数尝试:回拨(状态,结果)#执行回调函数Exception as e:pass if self。终端:#默认为假的,如果调用末端的方法事件=停止事件#停止信号否则:#自我。free _ list。追加(当前线程)#执行完毕任务,添加到闲置列表# event=self.q.get() #获取任务# self.free_list.remove(当前线程)#获取到任务之后,从闲置里删除和self在一起。worker _ state(self。free _ list,current _ thread):event=self。q . get()else:self。生成_列表。删除(当前线程)#如果收到终止信号,就从已创建的列表删除定义关闭(自身):#终止线程num=len(self.generate_list) #获取总已创建的线程而num:self。q . put(停止事件)#添加停止信号,有几个线程就添加几个数量=1 #终止线程(清空队列)def termin ate(自我):自我。terminal=True #更改为没错,而self.generate_list: #如果有已创建线程存活self.q.put(StopEvent) #有几个就发几个信号self.q.empty() #清空队列@上下文库。上下文管理器def worker _ state(self,free_list,current _ thread):free _ list。append(当前线程)try:yield finally:free _ list。remove(current _ thread)导入时间导入随机定义工作(I):sleep _ sec=random。randint(1,3)print(穿线。当前线程().name str(I) str(sleep _ sec))时间。sleep(sleep _ sec)pool=范围(7)的线程池(3):pool。run(func=work,args=(item,)# pool.terminate()pool.close()运行结果:
以上是我个人的总结。如有疑问,欢迎私聊。谢谢大家!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。