python 多线程编程,python多线程并行编程

  python 多线程编程,python多线程并行编程

  一穿线模块介绍多进程模块的完全模仿了穿线模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍

  官网链接:https://docs.python.org/3/library/threading.html?突出显示=线程#

  二开启线程的两种方式

  #方式一

  从线程导入线程

  导入时间

  def sayhi(姓名):

  时间。睡眠(2)

  打印( %s say hello %name)

  if __name__==__main__ :

  t=Thread(target=sayhi,args=(egon ,))

  启动()

  打印(主线程)方式一

  #方式二

  从线程导入线程

  导入时间

  类赛希(线程):

  def __init__(self,name):

  超级()。__init__()

  self.name=name

  定义运行(自身):

  时间。睡眠(2)

  打印(“%s向“% self.name”问好)

  if __name__==__main__ :

  t=Sayhi(egon )

  启动()

  打印(主线程)方式二

  三在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

  从线程导入线程

  从多重处理导入流程

  导入操作系统

  定义工时():

  打印("你好")

  if __name__==__main__ :

  #在主进程下开启线程

  t=线程(目标=工作)

  启动()

  打印(主线程/主进程)

  打印结果:

  你好

  主线程/主进程

  #在主进程下开启子进程

  t=过程(目标=工作)

  启动()

  打印(主线程/主进程)

  打印结果:

  主线程/主进程

  你好

  1 谁的开启速度快

  从线程导入线程

  从多重处理导入流程

  导入操作系统

  定义工时():

  print(hello ,os.getpid())

  if __name__==__main__ :

  #第一部分:在主进程下开启多个线程,每个线程都跟主进程的pid一样

  t1=线程(目标=工作)

  t2=线程(目标=工作)

  t1.start()

  t2.start()

  打印(主线程/主进程pid ,os.getpid())

  #第二部分:开多个进程,每个进程都有不同的pid

  p1=过程(目标=工作)

  p2=过程(目标=工作)

  p1.start()

  p2.start()

  打印(主线程/主进程pid ,os.getpid())2瞅一瞅pid

  从线程导入线程

  从多重处理导入流程

  导入操作系统

  定义工时():

  全球n

  n=0

  if __name__==__main__ :

  # n=100

  # p=过程(目标=工作)

  # p.start()

  # p.join()

  #打印(主,n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100

  n=1

  t=线程(目标=工作)

  启动()

  t.join()

  打印(主,n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据3同一进程内的线程共享该进程的数据?

  四练习练习一:

  # _ * _编码:utf-8_*_

  #!/usr/bin/env python

  导入多重处理

  导入线程

  导入插座

  s=socket.socket(socket .AF_INET,插座.袜子_流)

  s.bind((127.0.0.1 ,8080))

  s。听(5)

  定义动作(连接):

  虽然正确:

  data=conn.recv(1024)

  打印(数据)

  conn.send(data.upper())

  if __name__==__main__ :

  虽然正确:

  conn,addr=s.accept()

  p=螺纹。线程(target=action,args=(conn,))

  开始()多线程并发的窝服务端

  # _ * _编码:utf-8_*_

  #!/usr/bin/env python

  导入插座

  s=socket.socket(socket .AF_INET,插座.袜子_流)

  s.connect((127.0.0.1 ,8080))

  虽然正确:

  msg=input(:).条状()

  如果没有消息:继续

  s.send(msg.encode(utf-8 ).

  data=s.recv(1024)

  打印(数据)客户端

  练习二:三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

  从线程导入线程

  msg_l=[]

  format_l=[]

  定义对话():

  虽然正确:

  msg=input(:).条状()

  如果没有消息:继续

  消息_l。附加(消息)

  极好的格式_消息():

  虽然正确:

  如果消息_l:

  res=msg_l.pop()

  格式_l.append(res.upper())

  定义保存():

  虽然正确:

  如果格式_l:

  用打开( db.txt , a ,编码=utf-8 )作为女:

  res=format_l.pop()

  f.write(%s\n %res )

  if __name__==__main__ :

  t1=线程(目标=通话)

  t2=线程(目标=格式_消息)

  t3=线程(目标=保存)

  t1.start()

  t2.start()

  t3.start()查看代码

  五线程相关的其他方法线实例对象的方法

  # isAlive():返回线程是否活动的。

  # getName():返回线程名。

  #集合名称():设置线程名。

  穿线模块提供的一些方法:

  # threading.currentThread():返回当前线程变量。

  # threading.enumerate():返回包含正在运行的线程的列表。运行是指开始后结束前的线程,不包括开始前和结束后的线程。

  # threading.activeCount():返回正在运行的线程数,结果与len(threading.enumerate())相同。

  从线程导入线程

  导入线程

  从多重处理导入流程

  导入操作系统

  定义工时():

  导入时间

  时间.睡眠(3)

  print(threading.current_thread()。getName())

  if __name__==__main__ :

  #启动主进程下的线程

  t=线程(目标=工作)

  启动()

  print(threading.current_thread()。getName())

  print(threading . current _ thread())#主线程

  Print(threading.enumerate()) #与主线程一起运行的还有两个线程

  print(threading.active_count())

  打印(“主线程/主进程”)

  打印结果:

  主线程

  _MainThread(MainThread,已启动140735268892672)

  [ _MainThread(MainThread,已启动140735268892672),]

  主线程/主进程

  线程1

  查看代码

  主线程等待子线程完成。

  从线程导入线程

  导入时间

  def sayhi(姓名):

  时间.睡眠(2)

  打印( %s say hello %name)

  if __name__==__main__ :

  t=Thread(target=sayhi,args=(egon ,))

  启动()

  t.join()

  打印(“主线程”)

  print(t.is_alive())

  埃贡说你好

  主流中泓线

  错误的

  六个守护线程,无论是进程还是线程,都遵循这样的规则:守护xxx会等待主xxx运行并被销毁。

  需要强调的是,操作的完成并不是操作的终止。

  #1.对于主进程,运行完毕意味着主进程代码已经运行完毕。

  #2.对于主线程来说,运行完毕意味着主线程所在进程中的所有非守护线程都已经运行完毕,主线程被认为运行完毕。详细解释:

  #1主进程在其代码结束后运行完毕(此时守护进程被回收),然后主进程会等到所有非守护进程子进程运行完毕回收子进程的资源(否则会产生僵尸进程)后再结束。

  #2在其他非守护线程完成运行之前,主线程没有完成(守护线程此时被回收)。因为主线程的结束意味着进程的结束,进程的全部资源都会被回收,进程必须保证所有的非守护线程都已经运行完才能结束。从线程导入线程

  导入时间

  def sayhi(姓名):

  时间.睡眠(2)

  打印( %s say hello %name)

  if __name__==__main__ :

  t=Thread(target=sayhi,args=(egon ,))

  T.setDaemon(True) #必须在t.start()之前设置

  启动()

  打印(“主线程”)

  print(t.is_alive())

  主流中泓线

  真实的

  从线程导入线程

  导入时间

  def foo():

  打印(123)

  时间.睡眠(1)

  打印(“end123”)

  定义栏():

  打印(456)

  时间.睡眠(3)

  打印(“end456”)

  t1=线程(目标=foo)

  t2=线程(目标=bar)

  t1.daemon=True

  t1.start()

  t2.start()

  Print (main -)是一个令人困惑的例子

  七python Gil(全局解释器锁)

  八锁同步需要注意的三点:

  #1.线程抢占GIL锁,相当于执行权。在正确执行之前,您无法获得互斥锁。其他线程也可以抓取GIL,但是如果你发现锁还是没有释放,就会被阻塞。即使你得到了正确的执行,GIL应该被立即移交。

  #2.join等待的是一切,也就是整个串行,而锁只锁修改共享数据的部分,也就是部分串行。保证数据安全的根本原则是让并发变成串行,join和mutex都可以实现。无疑,互斥的部分串行效率更高。

  #3.一定要看GIL和互斥体的经典分析,在这一节的最后GIL VS洛克

  机智的同学可能会问这个问题,那就是既然你之前说了,Python已经有GIL保证同一时间只能有一个线程执行了。为什么这里还需要锁?

  首先我们需要达成一个共识:锁的目的是保护共享数据,同一时间只有一个线程可以修改共享数据。

  然后,我们可以得出结论,应该添加不同的锁来保护不同的数据。

  最后,问题清楚了。GIL和锁是保护不同数据的两种锁。前者是解释器级的(当然保护的是解释器级的数据,比如垃圾收集的数据),后者保护的是用户自己开发的应用的数据。显然,GIL对此并不负责,只有用户可以自定义锁定过程,即锁定。

  进程分析:所有线程都抢GIL锁,或者所有线程都抢执行权限。

  线程1抓取了GIL锁,获得了执行权限,开始执行,然后添加了一个锁,但是还没有执行完,也就是说线程1还没有释放锁。可能是线程2获取了GIL锁并开始执行。在执行过程中,发现线程1没有释放锁,所以线程2被阻塞,被剥夺了执行权限。有可能线程1获得了GIL,然后正常执行,直到锁被释放。这就导致了串行操作的效果。

  既然是连载,那就执行吧

  t1.start()

  t1 .加入

  t2.start()

  t2.join()

  这也是串行执行,为什么还要加锁?你要知道join是等待t1的所有代码执行完的,相当于锁定了t1的所有代码,而Lock只是锁定了一些操作共享数据的代码。

  因为python解释器帮助你定时自动回收内存,所以你可以理解为Python解释器中有一个独立的线程。每隔一段时间,它就会从wake up开始一次全局轮询,看看哪些内存数据可以清空。此时,您自己程序中的线程和py解释器自己的线程正在并发运行。假设你的线程删除了一个变量,在py解释器的垃圾收集线程在清除这个变量的过程中的清除时刻,可能是另一个线程刚刚重新分配了还没有被清除的内存空间,结果新分配的数据可能会被删除。为了解决类似的问题,python解释器简单粗暴的加了一个锁,即当一个线程运行时,其他人都不能动,从而解决了上述问题,可以说是早期版本Python的遗留问题。详细的

  从线程导入线程

  导入操作系统,时间

  定义工时():

  全球n

  温度=n

  时间.睡眠(0.1)

  n=温度-1

  if __name__==__main__ :

  n=100

  l=[]

  对于范围内的I(100):

  p=线程(目标=工作)

  l .追加(p)

  开始()

  对于l中的p:

  连接()

  Print(n) #结果可能是99。锁通常用于实现对共享资源的同步访问。为每个共享资源创建一个锁对象。当需要访问资源时,调用acquire方法获取锁对象(如果其他线程已经获取了锁,则当前线程需要等待它被释放)。访问资源后,调用release方法来释放锁:

  导入线程

  r=螺纹。锁定()

  r .收购()

  对公共数据的操作

  发布版本()

  从螺纹导入螺纹,锁紧

  导入操作系统,时间

  定义工时():

  全球n

  lock.acquire()

  温度=n

  时间.睡眠(0.1)

  n=温度-1

  lock.release()

  if __name__==__main__ :

  lock=Lock()

  n=100

  l=[]

  对于范围内的I(100):

  p=线程(目标=工作)

  l .追加(p)

  开始()

  对于l中的p:

  连接()

  print(n) #的结果肯定是0,由原来的并发执行变为串行执行,牺牲了执行效率,保证了数据安全视图代码。

  分析:

  #1.100线程抢GIL锁,也就是抢执行权限

  #2.必须有一个线程先抓住GIL(暂时称为线程1),然后开始执行。一旦执行,它将获得lock.acquire()

  #3.最有可能的是,在线程1结束运行之前,另一个线程2抓住GIL开始运行。但是线程2发现互斥锁还没有被线程1释放,于是被阻塞,被迫交出执行权限,也就是释放GIL。

  #4.直到线程1再次抢占GIL,才会从上一次暂停位置继续执行,直到互斥锁正常释放,然后其他线程会重复2 3 4的过程。GIL锁和互斥锁综合分析(重点!)

  #无锁:并发执行,高速,数据不安全。

  从线程导入当前线程,线程,锁

  导入操作系统,时间

  定义任务():

  全球n

  打印(“%s”正在运行“%current_thread()。getName())

  温度=n

  时间.睡眠(0.5)

  n=温度-1

  if __name__==__main__ :

  n=100

  lock=Lock()

  线程=[]

  start_time=time.time()

  对于范围内的I(100):

  t=线程(目标=任务)

  threads.append

  启动()

  对于螺纹中的t:

  t.join()

  stop_time=time.time()

  print( master:% s n:% s %(stop _ time-start _ time,n))

  线程1正在运行

  线程2正在运行

  .

  线程100正在运行

  主号码:0.5216062068939209 n:99

  #无锁:解锁部分并发执行,锁定部分串行执行,速度慢,数据安全。

  从线程导入当前线程,线程,锁

  导入操作系统,时间

  定义任务():

  #解锁代码并发运行

  时间.睡眠(3)

  打印( %s开始运行 %current_thread()。getName())

  全球n

  #锁定的代码串行运行。

  lock.acquire()

  温度=n

  时间.睡眠(0.5)

  n=温度-1

  lock.release()

  if __name__==__main__ :

  n=100

  lock=Lock()

  线程=[]

  start_time=time.time()

  对于范围内的I(100):

  t=线程(目标=任务)

  threads.append

  启动()

  对于螺纹中的t:

  t.join()

  stop_time=time.time()

  print( master:% s n:% s %(stop _ time-start _ time,n))

  线程1正在运行

  线程2正在运行

  .

  线程100正在运行

  主数据:53.28239746n:0

  #有的同学可能会有一个疑问:既然加锁会让操作串行,那么我在start之后马上使用join,就不需要加锁了,这也是串行效应。

  #没错:start后立即使用jion,一定会把100个任务的执行变成串行。毫无疑问,N的最终结果肯定是0,这是安全的,但问题是

  #start后立即加入:任务中的所有代码都是串行执行的,而锁定,也就是锁定的部分,也就是修改共享数据的部分,是串行的。

  #从保证数据安全方面来说,两者都可以实现,但显然加锁更高效。

  从线程导入当前线程,线程,锁

  导入操作系统,时间

  定义任务():

  时间.睡眠(3)

  打印( %s开始运行 %current_thread()。getName())

  全球n

  温度=n

  时间.睡眠(0.5)

  n=温度-1

  if __name__==__main__ :

  n=100

  lock=Lock()

  start_time=time.time()

  对于范围内的I(100):

  t=线程(目标=任务)

  启动()

  t.join()

  stop_time=time.time()

  print( master:% s n:% s %(stop _ time-start _ time,n))

  线程1开始运行

  线程2开始运行

  .

  线程100开始运行

  大师:350.697336921692n:0 #时间是多么恐怖!

  互斥和连接的区别(关键!)

  九死锁现象和递归锁进程也有死锁和递归锁。过程中忘记说了,但这里什么都说了。

  死锁是指两个或两个以上的进程或线程在执行过程中因为争夺资源而相互等待的现象。没有外力,他们将无法前进。此时,系统被称为处于死锁状态或系统有死锁。这些总是在等待对方的进程叫做死锁进程,下面就是死锁。

  从螺纹导入螺纹,锁紧

  导入时间

  mutexA=Lock()

  mutexB=Lock()

  类MyThread(线程):

  定义运行(自身):

  self.func1()

  self.func2()

  def func1(自身):

  mutexA.acquire()

  Print(\033[41m%s获得一个锁\033[0m %self.name )

  mutexB.acquire()

  Print(\033[42m%s获得B锁\033[0m %self.name )

  mutexB.release()

  mutexA.release()

  定义功能2(自身):

  mutexB.acquire()

  Print(\033[43m%s获得B锁\033[0m %self.name )

  时间.睡眠(2)

  mutexA.acquire()

  Print(\033[44m%s获得一个锁\033[0m %self.name )

  mutexA.release()

  mutexB.release()

  if __name__==__main__ :

  对于范围(10)内的I:

  t=MyThread()

  启动()

  线程1获得一个锁

  线程1获得B锁

  线程1获得B锁

  线程2获得一个锁

  然后就卡住了,僵持不下。

  查看代码

  解,递归锁。在python中,为了支持在同一线程中对同一资源的多个请求,Python提供了可重入锁RLock。

  这个RLock在内部维护一个锁和一个计数器变量,计数器记录获取的次数,以便可以多次获取资源。直到一个线程的所有请求都被释放,其他线程才能获得资源。在上面的示例中,如果使用RLock而不是Lock,则不会发生死锁:

  mute xa=mute XB=threading . r lock()#当线程获得锁时,计数器递增1,计数器继续递增1。在此期间,所有其他线程只能等待该线程释放所有锁,也就是说,直到计数器递减到0,十的信号量与进程的信号量相同。

  信号量管理一个内置的计数器,

  每当调用acquire()时,内置counter-1;

  调用release()时内置计数器1;

  计数器不能小于0;当计数器为0时,acquire()将阻塞该线程,直到其他线程调用release()。

  示例:(只有5个线程可以同时获得信号量,即最大连接数可以限制为5):

  从线程导入线程,信号量

  导入线程

  导入时间

  # def func():

  # if sm.acquire():

  # print (threading.currentThread()。getName()获取信号量)

  #时间.睡眠(2)

  # sm.release()

  定义函数():

  sm.acquire()

  print( % s get sm % threading . current _ thread()。getName())

  时间.睡眠(3)

  sm.release()

  if __name__==__main__ :

  sm=信号量(5)

  对于范围(23)中的I:

  t=线程(target=func)

  t.start()查看代码

  它和进程池是完全不同的概念。进程池Pool(4)最多只能生成四个进程,而且从头到尾都只是这四个进程,不会生成任何新的,而信号量生成一堆线程/进程。

  互斥锁和信号量推荐博客:http://url.cn/5DMsS9r

  11个事件与流程中的事件相同。

  线程的一个关键特性是每个线程独立运行,其状态不可预测。如果程序中的其他线程需要通过判断一个线程的状态来决定自己下一步的操作,那么线程同步的问题就会变得非常困难。为了解决这些问题,我们需要使用线程库中的事件对象。对象包含一个可以由线程设置的信号量,它允许线程等待特定事件的发生。最初,事件对象中的信号标志被设置为false。如果一个线程正在等待一个事件对象,并且该事件对象的标志为假,那么该线程将被阻塞,直到该标志为真。如果一个线程将一个事件对象的信号标志设置为真,它将唤醒所有等待这个事件对象的线程。如果一个线程等待一个已经设置为true的事件对象,它将忽略这个事件并继续执行。

  Event.isSet():返回事件的状态值;

  Event.wait():如果event.isSet()==False,线程将被阻塞;

  Event.set():将Event的状态值设置为True,阻塞池中的所有线程都被激活,准备被操作系统调度;

  Event.clear():还原事件的状态值为False。

  例如,有多个工作线程试图链接MySQL。我们希望在连接之前确保MySQL服务正常,以便那些工作线程可以连接到MySQL服务器。如果连接不成功,他们将尝试重新连接。然后我们可以使用线程。事件机制来协调每个工作线程的连接操作。

  从线程导入线程,事件

  导入线程

  导入时间,随机

  def conn_mysql():

  计数=1

  while not event.is_set():

  如果计数为3:

  引发TimeoutError(“链接超时”)

  print((%s)第% s次尝试链接“% (threading.current _ thread()。getname(),count))

  事件等待时间(0.5)

  计数=1

  打印( %s链接成功 % threading.current _ thread()。getname())

  定义check_mysql():

  Print(\033[45m[%s]检查MySQL \ 033[0m % threading . current _ thread()。getname())

  time.sleep(random.randint(2,4))

  event.set()

  if __name__==__main__ :

  事件=事件()

  conn1=线程(target=conn_mysql)

  conn2=Thread(target=conn_mysql)

  check=Thread(target=check _ MySQL)

  conn1.start()

  conn2.start()

  check.start()查看代码

  十二个条件(知道)让线程等待,只有满足一定的条件,才会释放N个线程。

  导入线程

  定义运行(n):

  con.acquire()

  con.wait()

  打印(运行线程:%s %n )

  con.release()

  if __name__==__main__ :

  con=线程。条件()

  对于范围(10)内的I:

  t=螺纹。线程(target=run,args=(i,))

  启动()

  虽然正确:

  inp=输入( )

  如果inp==q :

  破裂

  con.acquire()

  con.notify(int(inp))

  con.release()

  定义条件函数():

  ret=False

  inp=输入( )

  如果inp==1 :

  ret=True

  返回ret

  定义运行(n):

  con.acquire()

  条件等待(条件函数)

  打印(运行线程:%s %n )

  con.release()

  if __name__==__main__ :

  con=线程。条件()

  对于范围(10)内的I:

  t=螺纹。线程(target=run,args=(i,))

  t.start()查看代码

  十三定时器timer,指定N秒后执行一个操作。

  从线程导入计时器

  def hello():

  打印("你好,世界")

  t=定时器(1,你好)

  t。开始()# 1秒钟后,将打印“你好,世界”

  十四线程队列队列队列:使用导入队列,用法与进程长队一样

  当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。

  班级队列Queue (maxsize=0) #先进先出

  导入队列

  q=队列。队列()

  q.put(“第一个")

  q.put(秒)

  q.put(第三)

  print(q.get())

  print(q.get())

  print(q.get())

  结果(先进先出):

  第一

  第二

  第三

  查看代码

  班级队列LIFO队列(maxsize=0)#后进先出

  导入队列

  q=队列LifoQueue()

  q.put(“第一个")

  q.put(秒)

  q.put(第三)

  print(q.get())

  print(q.get())

  print(q.get())

  结果(后进先出):

  第三

  第二

  第一

  查看代码

  班级队列PriorityQueue (maxsize=0) #存储数据时可设置优先级的队列

  导入队列

  q=队列。优先级队列()

  #上传进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高

  q.put((20, a ))

  q.put((10, b ))

  q.put((30, c ))

  print(q.get())

  print(q.get())

  print(q.get())

  结果(数字越小优先级越高,优先级高的优先出队):

  (10,“b”)

  (20,《答》)

  (三十,"丙")

  查看代码

  其他

  优先级队列的构造函数最大尺寸是一个整数,它设置队列中可以放置的项目数的上限。一旦达到这个大小,插入将被阻止,直到队列项被用完。如果最大允许的上传大小小于或等于零,则队列大小是无限的。

  首先检索最低值的条目(最低值的条目是由已排序(列表(条目))[0])返回的条目)。条目的典型模式是:(优先级编号,数据)形式的元组。

  异常队列。空的

  对空的队列对象调用非阻塞get()(或get_nowait())时引发异常。

  异常队列。全部

  对已满的队列对象调用非阻塞put()(或put_nowait())时引发异常。

  Queue.qsize()

  排队。empty()#如果为空,则返回真实的

  Queue.full() #如果已满,则返回真实的

  Queue.put(项目,块=真,超时=无)

  将项目放入队列。如果可选的args块为真,超时为无(缺省值),则在有空闲槽可用之前,根据需要阻塞。如果超时是正数,它最多阻塞超时秒数,如果在该时间内没有可用的空闲槽,则引发完全异常。否则(区块为假),如果有空闲槽立即可用,则将一个项目放入队列,否则引发完整异常(在这种情况下,超时被忽略)。

  Queue.put_nowait(item)

  相当于put(item,False).

  排队。获取(块=真,超时=无)

  从队列中移除并返回一个项目。如果可选的args块为真,超时为无(缺省值),则在项目可用之前,根据需要阻塞。如果超时是正数,它最多阻塞超时秒数,如果在该时间内没有可用的项,则引发空异常。否则(区块为假),如果一个项目立即可用,则返回该项目,否则引发空异常(在这种情况下忽略超时)。

  Queue.get_nowait()

  相当于得到(假的).

  提供了两种方法来支持跟踪队列中的任务是否已被守护程序使用者线程完全处理。

  Queue.task_done()

  表示先前排队任务已完成。由队列使用者线程使用。对于用于获取任务的每个get(),对task_done()的后续调用会告知队列任务的处理已经完成。

  如果加入()当前被阻塞,它将在处理完所有项目后恢复(这意味着对于每个已经放入()队列的项目,都会收到task_done()调用)。

  如果调用的次数多于队列中放置的项目数,将引发值错误.

  Queue.join()块直到长队被消费完毕查看代码

  十五计算机编程语言标准模块https://docs.python.org/dev/library/concurrent.futures.html期货公司

  #1 介绍

  并行未来模块提供了高度封装的异步调用接口

  ThreadPoolExecutor:线程池,提供异步调用

  进程池执行者:进程池,提供异步调用

  两者都实现了相同的接口,该接口由抽象的执行者类定义。

  #2 基本方法

  #submit(fn,*args,**kwargs)

  异步提交任务

  #map(func,*iterables,timeout=None,chunksize=1)

  取代为循环使服从的操作

  #关机(等待=真)

  相当于进程池的pool.close() pool.join()操作

  等待=真,等待池内所有任务执行完毕回收完资源后才继续

  等待=假,立即返回,并不会等待池内的任务执行完毕

  但不管等待参数为何值,整个程序都会等到所有任务执行完毕

  使服从和地图必须在关机之前

  #结果(超时=无)

  取得结果

  #添加完成回调(fn)

  回调函数

  #介绍

  进程池执行器类是一个执行者子类,它使用一个进程池来异步执行调用ProcessPoolExecutor使用多处理模块,这允许它避开全局解释器锁,但也意味着只能执行和返回可选择的对象。

  类并发。期货。processpoolexecutor(max _ workers=None,mp_context=None)

  一个执行者子类,使用最多最大工人数进程池异步执行调用。如果最大工人数为没有人或未给定,它将默认为机器上的处理器数量。如果最大工人数小于或等于0,则将引发值错误.

  #用法

  从并行未来导入线程池执行程序

  导入操作系统,时间,随机

  定义任务(n):

  打印( %s正在运行%os.getpid())

  time.sleep(random.randint(1,3))

  返回n**2

  if __name__==__main__ :

  executor=ProcessPoolExecutor(max _ workers=3)

  期货=[]

  对于范围(11)中的我:

  未来=执行者。提交(任务,我)

  futures.append未来)

  executor.shutdown(True)

  打印()

  对于未来的未来:

  打印(未来。result())ProcessPoolExecutor

  #介绍

  ThreadPoolExecutor是一个执行者子类,它使用线程池来异步执行调用。

  类并发。期货。线程池执行器(max _ workers=None,thread_name_prefix= )

  一个执行者子类,它使用最多最大工人数线程池来异步执行调用。

  在3.5版本中更改:如果最大工人数为没有人或未给定,则默认为机器上的处理器数乘以5,假设ThreadPoolExecutor经常用于重叠输入输出而不是中央处理器工作,并且工作线程数应该高于进程池执行器的工作线程数。

  版本3.6中的新增功能:增加了线程名称前缀参数,允许用户控制线程。由池创建的工作线程的线程名称,以便于调试。

  #用法

  与进程池执行器相同线程p

  oolExecutor

  from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

  import os,time,random

  def task(n):

   print(%s is runing %os.getpid())

   time.sleep(random.randint(1,3))

   return n**2

  if __name__ == __main__:

   executor=ThreadPoolExecutor(max_workers=3)

   # for i in range(11):

   # future=executor.submit(task,i)

   executor.map(task,range(1,12)) #map取代了for+submitmap的用法

  from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

  from multiprocessing import Pool

  import requests

  import json

  import os

  def get_page(url):

   print( 进程%s get %s %(os.getpid(),url))

   respone=requests.get(url)

   if respone.status_code == 200:

   return {url:url,text:respone.text}

  def parse_page(res):

   res=res.result()

   print( 进程%s parse %s %(os.getpid(),res[url]))

   parse_res=url: %s size:[%s]\n %(res[url],len(res[text]))

   with open(db.txt,a) as f:

   f.write(parse_res)

  if __name__ == __main__:

   urls=[

   https://www.baidu.com,

   https://www.python.org,

   https://www.openstack.org,

   https://help.github.com/,

   http://www.sina.com.cn/

   ]

   # p=Pool(3)

   # for url in urls:

   # p.apply_async(get_page,args=(url,),callback=pasrse_page)

   # p.close()

   # p.join()

   p=ProcessPoolExecutor(3)

   for url in urls:

   p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果回调函数

   ©

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: