python多线程并发,python3 多进程并发执行

  python多线程并发,python3 多进程并发执行

  多重处理模块介绍了python中的多线程不能利用多核的优势。如果要充分利用多核CPU的资源(os.cpu_count () view),大多数情况下需要使用python中的多进程。Python提供了多重处理。

  多处理模块用于打开子进程,并在子进程中执行我们定制的任务(如函数),类似于多线程模块threading的编程接口。

  多处理模块有许多功能:它支持子进程,通信和共享数据,执行不同形式的同步,并提供进程、队列、管道、锁和其他组件。

  需要再次强调的是,与线程不同,进程没有共享状态,被进程修改的数据只在进程内改变。

  第二个流程类的引入创建了流程的类:

  Process ([group [,target [,name [,args [,kwargs]]),由该类实例化的对象,表示子进程中的任务(尚未启动)。

  强调:

  1.您需要使用关键字来指定参数。

  2.args指定的location参数被传递给目标函数,该函数采用元组的形式,并且必须由逗号参数引入:

  不使用1组参数,其值始终为None。

  2

  目标3表示调用对象,即子流程要执行的任务。

  四

  5 args表示调用对象的位置参数元组,args=(1,2, egon ,)

  六

  7 kwargs表示调用对象的字典,kwargs={name: egon , age: 18}

  八

  9是子进程的名称。方法介绍:

  1.p.start():启动流程,并在这个子流程中调用p.run()。

  2 p.run():进程启动时运行的方法,它调用target指定的函数。这个方法必须在我们的自定义类的类中实现。

  三

  4 p.terminate():强制终止进程p,不做任何清理操作。如果p创建了一个子进程,这个子进程就变成了僵尸进程。使用这种方法时,这种情况需要特别小心。如果P也保存了一个锁,它不会被释放,这将导致死锁。

  5.is _ alive():如果p还在运行,返回True

  六

  7.Join ([timeout]):主线程等待P终止(重点:是主线程处于等待状态,而P处于运行状态)。超时是可选的超时。需要强调的是,p.join只能加入start启动的进程,不能加入run启动的进程。

  1 p.daemon:默认值为False。如果设置为True,则表示P是后台运行的守护进程。当P的父进程终止时,P也会终止。设置为True后,P不能创建自己的新进程,必须在p.start()之前设置

  2

  3.name:流程的名称

  四

  页(page的缩写)pid:流程的PID

  六

  页(page的缩写)退出代码:进程在运行时为None如果是n,则意味着它由信号n结束(只需知道)

  八

  9 p.authkey:进程的认证密钥。默认情况下,它是由os.urandom()随机生成的32个字符的字符串。此密钥的目的是为涉及网络连接的底层进程间通信提供安全性。这种连接只有拥有相同的认证密钥(知道就好)才能成功。注意:在windows中,process()必须放在# if _ _ name _= _ _ main _ :下。

  由于Windows没有fork,多处理模块启动一个新的Python进程,并导入调用模块。

  如果Process()在导入时被调用,那么这将引发新进程的无限延续(或者直到您的机器耗尽资源)。

  这就是将Process()调用隐藏在内部的原因

  if __name__==__main__

  因为这个if语句中的语句在导入时不会被调用。

  由于Windows中没有fork,多处理模块启动一个新的Python进程,并导入调用模块。

  如果在导入过程中调用Process(),那么这将启动一个具有无限继承性的新进程(或者直到机器耗尽资源)。

  这是隐藏对Process()的内部调用的原始代码。使用if _ _ name _ _=="_ _ main _ _ ",在导入过程中将不会调用此if语句中的语句。详细说明

  创建和启动子进程的两种方法。

  #启动流程的方法1:

  导入时间

  随机导入

  从多重处理导入流程

  def piao(姓名):

  打印( %s票据 %名称)

  time.sleep(random.randrange(1,5))

  打印((%s票结束 %name )

  P1=进程(target=piano,args=(egon ,))#必须加,签。

  p2=进程(target=piao,args=(alex ,))

  p3=进程(target=piao,args=(wupeqi ,))

  p4=进程(target=piao,args=(元昊,))

  p1.start()

  p2.start()

  p3.start()

  p4.start()

  打印(主线程)方法一

  #开进程的方法二:

  导入时间

  随机导入

  从多重处理导入流程

  类票(流程):

  def __init__(self,name):

  超级()。__init__()

  self.name=name

  定义运行(自身):

  打印( %s飘%self.name)

  time.sleep(random.randrange(1,5))

  打印( %s piao end %self.name )

  p1=朴(《埃贡》)

  p2=朴("亚历克斯")

  p3=飘("无佩琪")

  p4=飘(元昊)

  p1.start() #start会自动调用奔跑

  p2.start()

  p3.start()

  p4.start()

  打印(主线程)方法二

  进程直接的内存空间是隔离的

  从多重处理导入流程

  n=100 #在窗子系统中应该把全局变量定义在if __name__==__main__ 之上就可以了

  定义工时():

  全球n

  n=0

  打印(子进程内: ,n)

  if __name__==__main__ :

  p=过程(目标=工作)

  开始()

  打印(主进程内: ,n)查看代码

  练习1:把窝通信变成并发的形式

  从套接字导入*

  从多重处理导入流程

  服务器=套接字(AF_INET,SOCK_STREAM)

  server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

  server.bind((127.0.0.1 ,8080))

  server.listen(5)

  定义对话(连接,客户端地址):

  虽然正确:

  尝试:

  msg=conn.recv(1024)

  如果没有消息:中断

  conn.send(msg.upper())

  例外情况除外:

  破裂

  if _ _ name _ _== _ _ main _ _ :# windows下开始进程一定要写到这下面

  虽然正确:

  conn,client_addr=server.accept()

  p=进程(target=talk,args=(conn,client_addr))

  启动()服务器端

  从套接字导入*

  客户端=套接字(AF_INET,SOCK_STREAM)

  客户端连接(( 127.0.0.1 ,8080))

  虽然正确:

  msg=input(:).条状()

  如果没有消息:继续

  client.send(msg.encode(utf-8 ))

  msg=client.recv(1024)

  print(msg.decode(utf-8 ))多个客户端

  每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。

  解决方法:进程池这么实现有没有问题?

  过程对象的加入方法

  从多重处理导入流程

  导入时间

  随机导入

  类票(流程):

  def __init__(self,name):

  self.name=name

  超级()。__init__()

  定义运行(自身):

  打印( %s正在浮动%self.name )

  time.sleep(random.randrange(1,3))

  打印(" %s是票尾" %self.name ")

  p=朴(《埃贡》)

  开始()

  p .加入(0.0001)等待p停止,等0.0001秒就不再等了

  打印(开始)加入:主进程等,等待子进程结束

  从多重处理导入流程

  导入时间

  随机导入

  def飘(姓名):

  打印( %s正在浮动% name’)

  time.sleep(random.randint(1,3))

  打印( %s是票尾%name)

  p1=进程(target=piao,args=(egon ,))

  p2=进程(target=piao,args=(alex ,))

  p3=进程(target=piao,args=(元昊,))

  p4=进程(target=piao,args=(wupeiqi ,))

  p1.start()

  p2.start()

  p3.start()

  p4.start()

  #有的同学会有疑问:既然加入是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?

  #当然不是了,必须明确:p.join()是让谁等?

  #很明显连接()是让主线程等待p的结束,卡住的是主线程而绝非进程p,

  #详细解析如下:

  #进程只要开始就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了

  #而我们p1.join()是在等第一亲代结束,没错第一亲代只要不结束主线程就会一直卡在原地,这也是问题的关键

  #加入是让主线程等,而p1-p4仍然是并发执行的p1 .加入的时候,其余p2、p3、p4仍然在运行,等#p1。加入结束,可能p2、p3、p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待

  # 所以四个加入花费的总时间仍然是耗费时间最长的那个进程运行的时间

  p1.join()

  p2.join()

  p3.join()

  p4.join()

  打印(主线程)

  #上述启动进程与加入进程可以简写为

  # p_l=[p1,p2,p3,p4]

  #

  #对于损益中的p:

  # p.start()

  #

  #对于损益中的p:

  # p.join()有了加入,程序不就是串行了吗?

  过程对象的其他方法或属性(了解)

  #处理对象1的其他方法:terminate,is_alive

  从多重处理导入流程

  导入时间

  随机导入

  类票(流程):

  def __init__(self,name):

  self.name=name

  超级()。__init__()

  定义运行(自身):

  打印( %s正在浮动 %self.name )

  time.sleep(random.randrange(1,5))

  打印(“%s是票尾“%self.name”)

  p1=票( egon1 )

  p1.start()

  P1.terminate()#关闭进程,不会立即关闭,所以is_alive立即查看的结果可能还活着。

  Print(p1.is_alive()) #结果为真

  打印(“开始”)

  Print(p1.is_alive()) #结果为Falseterminate和is_alive

  从多重处理导入流程

  导入时间

  随机导入

  类票(流程):

  def __init__(self,name):

  # self.name=name

  #超级()。__init__()#进程的_ _ init _ _方法将执行self.name=Piao-1 1,

  # #所以在这里添加它将覆盖我们自己。name=name

  #为我们启动的流程命名的做法

  超级()。__init__()

  self.name=name

  定义运行(自身):

  打印( %s正在浮动 %self.name )

  time.sleep(random.randrange(1,3))

  打印(“%s是票尾“%self.name”)

  p=朴( egon )

  开始()

  打印(“开始”)

  打印(p.pid) #查看pid名称和pid

  僵尸进程和孤儿进程(理解)

  博客:http://www.cnblogs.com/Anker/p/3271773.html

  一:僵尸进程(有害)

  僵尸:一个进程使用fork创建一个子进程。如果子进程退出,而父进程没有调用wait或waitpid来获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这个过程被称为僵尸过程。详细解释如下

  我们知道,在unix/linux中,正常情况下,子进程是由父进程创建的,子进程在创建新的进程。子进程的结束和父进程的运行是一个异步的过程,即父进程永远无法预测子进程何时结束。如果子进程一结束就恢复了它的所有资源,那么子进程的状态信息在父进程中将是不可用的。

  因此,UN 提供了一种机制,保证父进程可以随时获得子进程结束时的状态信息:

  1.当每个进程退出时,内核释放该进程的所有资源,包括打开的文件、占用的内存等。但是仍然为它保留了一些信息(包括进程号、进程ID、退出状态、进程的终止状态、运行时间、进程占用的CPU时间等。)

  2.直到父进程通过wait/waitpid得到它,它才会被释放。但这会导致问题。如果进程不调用wait/waitpid,那么保留的信息不会被释放,它的进程号将一直被占用。但是,系统可以使用的进程数量是有限的。如果生成了大量死进程,系统将无法生成新进程,因为没有可用的进程号。这就是僵尸进程的危害,应该避免。

  任何子进程(除了init)在exit()之后都不会立即消失,而是留下一个数据结构叫僵尸进程,等待父进程处理。这是每个子流程最后都要经历的阶段。如果子进程在exit()后面,父进程没时间处理,那么用ps命令可以看到子进程的状态是“Z”。如果父进程能及时处理,用ps命令看到子进程的僵尸状态可能就来不及了,但这并不代表子进程不经历僵尸状态。如果父进程在子进程结束前退出,子进程将由init接管。Init将把处于僵死状态的子进程作为父进程来处理。

  二:孤儿进程(无害)

  孤立进程:如果一个父进程退出,而它的一个或多个子进程仍在运行,这些子进程将成为孤立进程。孤儿将被init进程(进程号1)收养,init进程将收集它们的状态。

  孤儿进程是没有父进程的进程,所以孤儿进程就落在了init进程身上,init进程就像一个民政局,负责处理孤儿进程的善后事宜。每当出现一个孤儿进程,内核就将孤儿进程的父进程设置为init,init进程会循环等待()其已经退出的子进程。这样,当一个孤儿进程悲惨地结束它的生命周期时,init进程将代表党和政府处理它的所有善后事宜。因此,孤儿进程不会造成任何伤害。

  我们来测试一下(子进程创建后,主进程的脚本退出,当父进程先于子进程结束时,子进程会被init采用,成为孤儿进程,而不是僵尸进程),文件的内容。

  导入操作系统

  导入系统

  导入时间

  pid=os.getpid()

  ppid=os.getppid()

  打印他是父亲, pid ,pid, ppid ,ppid

  pid=os.fork()

  如果pid为0:

  打印“父亲去世了”

  sys.exit(0)

  #确保主线程完全退出。

  时间.睡眠(1)

  print im child ,os.getpid(),os.getppid()

  执行文件并输出结果:

  我是pid 32515 ppid 32015的父亲

  父亲去世了.

  我是孩子32516 1

  看,子进程已经被pid为1的init进程接收了,所以这种情况下僵尸进程不存在,只有孤儿进程存在,孤儿进程在其声明期结束时自然会被init销毁。

  三。僵尸流程的危险场景:

  例如,有一个进程会定期产生一个子进程。这个子进程需要做的事情很少,做完该做的事情就退出了。所以这个子进程的生命周期很短。而父进程只生成新的子进程,至于子进程退出后会发生什么,它视而不见。这样,系统运行一段时间后,系统中就会出现很多死进程。如果你用ps命令检查它们,你会严格地说,僵尸进程并不是问题的根源,罪魁祸首是产生大量僵尸进程的父进程。所以,当我们寻求如何消灭系统中的大量僵尸进程时,答案就是射杀产生大量僵尸进程的元凶(即通过KILL发送SIGTERM或SIGKILL信号)。罪魁祸首进程被枪毙后,其产生的僵尸进程就成了孤儿进程。这些孤儿进程会被init进程接管,init进程会等待()这些孤儿进程,释放它们所占用的系统进程表中的资源。这样,这些僵尸孤儿进程就可以留下来了。

  四:测试

  #1,生成僵尸进程的test.py程序有以下内容

  #编码:utf-8

  从多重处理导入流程

  导入时间,操作系统

  定义运行():

  Print (child ,os.getpid())

  if __name__==__main__ :

  p=过程(目标=运行)

  开始()

  Print (master ,os.getpid())

  时间.睡眠(1000)

  #2,在unix或linux系统上执行

  [root @ VM 172-31-0-19 ~]# python 3 test . py

  [1] 18652

  [root @ VM 172-31-0-19 ~]# Master 18652

  字18653

  [root @ VM 172-31-0-19 ~]# PS aux grep Z

  用户PID %CPU %MEM VSZ RSS TTY统计开始时间命令

  root 18653 0.0 0.000 pts/0 z 20:02 0:00[Python 3]#僵尸进程出现

  root 18656 0.0 0.0 112648 952 pts/0S 20:02 0:00 grep-color=auto Z

  [root@vm172-31-0-19 ~]# top #执行top命令查找1僵尸

  前20分03秒42最长31分钟,3个用户,平均负载:0.01,0.06,0.12

  任务:总共93,2运行,90睡觉,0停止,1僵尸

  % Cpu:0.0 us,0.3 sy,0.0 ni,99.7 id,0.0 wa,0.0 hi,0.0 si,0.0 st

  KiB内存:总计1016884,可用内存97184,已用内存70848,缓冲区/高速缓存848852

  KiB互换:总计0,免费0,使用0。782540可用内存

  PID用户PR NI VIRT RES SHR S %CPU %MEM时间命令

  984根20 0 29788 1256 988 S 0.3 0.1 0:01.50小精灵

  #3、

  在父进程正常完成后,将调用Wait/waitpid来回收僵尸进程。

  但如果父进程是一个永远不会结束的无限循环,那么僵尸进程就会一直存在,僵尸进程太多是有害的。

  解决方案1:终止父进程。

  解决方案2:http://blog.csdn.net/u010571844/article/details/50419798View电码

  四个守护进程主进程创建一个守护进程。

  首先,守护进程将在主进程代码执行后终止。

  第二,守护进程中不能再启动子进程,否则会抛出异常:断言错误:守护进程IC进程不允许有子进程。

  注意:进程是相互独立的。当主进程代码运行时,守护进程终止。

  从多重处理导入流程

  导入时间

  随机导入

  类票(流程):

  def __init__(self,name):

  self.name=name

  超级()。__init__()

  定义运行(自身):

  打印( %s正在浮动 %self.name )

  time.sleep(random.randrange(1,3))

  打印(“%s是票尾“%self.name”)

  p=朴( egon )

  P.daemon=True #必须在p.start()之前设置。P设置为守护进程,禁止P创建子进程。当父进程的代码执行完成后,P停止运行。

  开始()

  打印(“主”)视图代码

  #当主进程代码运行时,守护进程将结束。

  从多重处理导入流程

  从线程导入线程

  导入时间

  def foo():

  打印(123)

  时间.睡眠(1)

  打印(“end123”)

  定义栏():

  打印(456)

  时间.睡眠(3)

  打印(“end456”)

  p1=进程(目标=foo)

  p2=过程(目标=bar)

  p1.daemon=True

  p1.start()

  p2.start()

  Print (main - ) #当打印这一行时,主进程代码结束,守护进程p1应该终止。可能会有关于p1任务执行的打印信息123,因为当主进程打印main-时,p1也在执行,但是马上就终止了,让人摸不着头脑。

  5.进程间不共享进程同步(锁)数据,但它们共享同一个文件系统,所以访问同一个文件或同一个打印终端是没有问题的。

  分享带来竞争,竞争的结果是无序。怎么控制就是锁定。

  第一部分:多个进程共享同一个打印终端。

  #并发运行,效率高,但竞争同一个打印终端,导致打印无序。

  从多重处理导入流程

  导入操作系统,时间

  定义工时():

  打印( %s正在运行 %os.getpid())

  时间.睡眠(2)

  打印( %s已完成 %os.getpid())

  if __name__==__main__ :

  对于范围(3)中的I:

  p=过程(目标=工作)

  P.start()并发运行,效率高,但是和同一个打印终端竞争,导致打印无序。

  #从并发改为串行,牺牲了运行效率却避免了竞争。

  从多重处理导入流程,锁定

  导入操作系统,时间

  定义工作(锁定):

  lock.acquire()

  打印( %s正在运行 %os.getpid())

  时间.睡眠(2)

  打印( %s已完成 %os.getpid())

  lock.release()

  if __name__==__main__ :

  lock=Lock()

  对于范围(3)中的I:

  p=进程(target=work,args=(lock,))

  P.start()锁:由并发改为串行,牺牲了运行效率,但避免了竞争。

  第2部分:多个进程共享同一个文件

  当文件数据库,模拟抢票。

  #文件db的内容是:{count:1}

  #一定要用双引号,不然json认不出来。

  从多重处理导入流程,锁定

  导入时间,json,随机

  定义搜索():

  dic=json.load(open(db.txt ))

  打印( \ 033[4300万剩余选票%s\033[0m %dic[count])

  def get():

  dic=json.load(open(db.txt ))

  Time.sleep(0.1) #模拟读取数据的网络延迟

  如果dic[count] 0:

  dic[计数]-=1

  Time.sleep(0.2) #模拟写入数据的网络延迟。

  json.dump(dic,open(db.txt , w ))

  打印( \033[43m购票成功\033[0m )

  定义任务(锁定):

  搜索()

  获取()

  if __name__==__main__ :

  lock=Lock()

  对于范围内的I(100):#模拟100个并发客户端抢票。

  p=进程(target=task,args=(lock,))

  P.start()并发运行,效率高,但是争着写同一个文件,导致数据写入无序。

  #文件db的内容是:{count:1}

  #一定要用双引号,不然json认不出来。

  从多重处理导入流程,锁定

  导入时间,json,随机

  定义搜索():

  dic=json.load(open(db.txt ))

  打印( \ 033[4300万剩余选票%s\033[0m %dic[count])

  def get():

  dic=json.load(open(db.txt ))

  Time.sleep(0.1) #模拟读取数据的网络延迟

  如果dic[count] 0:

  dic[计数]-=1

  Time.sleep(0.2) #模拟写入数据的网络延迟。

  json.dump(dic,open(db.txt , w ))

  打印( \033[43m购票成功\033[0m )

  定义任务(锁定):

  搜索()

  lock.acquire()

  获取()

  lock.release()

  if __name__==__main__ :

  lock=Lock()

  对于范围内的I(100):#模拟100个并发客户端抢票。

  p=进程(target=task,args=(lock,))

  P.start()锁定:购票行为由并发变为串行,牺牲了运行效率但保证了数据安全。

  总结:

  #加锁可以保证多个进程修改同一条数据时,同一时间只能修改一个任务,即串行修改。没错,速度是慢,但是以牺牲速度为代价,保证了数据安全。

  虽然文件共享数据可以用来实现进程间通信,但问题是:

  1.效率低下(共享数据基于文件,文件是硬盘上的数据)

  2.你需要自己锁。

  #所以,我们最好找到一个可以兼顾两者的方案:1。高效率(多个进程共享一个内存中的数据)2。帮助我们解决锁的问题。这是多处理模块提供的基于消息的IPC通信机制:队列和管道。

  1队列和管道将数据存储在内存中。

  队列是基于(管道锁)的,可以把我们从复杂的锁问题中解放出来。

  我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁定问题。而且,当进程数量增加时,往往可以获得更好的可开发性。

  六个队列(推荐)进程相互隔离。为了实现进程间通信,多处理模块支持两种形式:队列和流水线,这两种形式都使用消息传递。

  创建一个队列类(底层通过管道和锁定实现):

  1 ([MaxSize]):创建一个共享进程队列。队列是多个进程的安全队列,您可以使用队列来实现多个进程之间的数据传输。参数介绍:

  1 maxsize是队列中允许的最大项目数;如果省略,则没有大小限制。方法介绍:

  主要方法:

  1.q.put方法用于将数据插入队列。put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值)并且timeout为正值,则此方法会阻塞由timeout指定的时间,直到队列中有剩余空间。如果超时,一个队列。将引发完整的异常。如果blocked为False,但队列已满,则队列。将立即抛出完整的异常。

  2.GET方法可以从队列中读取和删除元素。类似地,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值)并且timeout为正数,那么如果在等待时间内没有获取任何元素,则队列。将引发空异常。如果blocked为False,有两种情况。如果Queue的值可用,它将被立即返回;否则,如果队列为空,则返回队列。将立即抛出空异常。

  三

  4.get _ nowait():与q.get(False)相同

  5.put _ nowait():与q.put(False)相同

  六

  7 q.empty():调用此方法时,如果Q为空,则返回True,结果不可靠。例如,在返回True的过程中,如果另一个项目被添加到队列中。

  8.Q. Full():当这个方法被调用时,Q是满的并返回True。这个结果不靠谱。例如,在返回True的过程中,如果队列中的项目被取走。

  9.Q size():返回队列中当前项目的正确数量,结果不可靠。原因和q.empty()和q.full()一样。其他方法(知道):

  1.Cancel _ Join _ Thread():进程退出时后台线程不会自动连接。可以防止join_thread()方法阻塞。

  2 q.close():关闭队列以防止更多的数据被添加到队列中。调用此方法时,后台线程将继续写入已排队但尚未写入的数据,但在此方法完成后将立即关闭。如果Q被垃圾收集,这个方法将被调用。关闭队列不会在队列用户中产生任何类型的数据结束信号或异常。例如,如果一个使用者在get()操作上被阻塞,那么关闭生产者中的队列不会导致get()方法返回错误。

  3 q.join_thread():连接队列的后台线程。该方法用于在调用q.close()方法后等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。可以通过调用q.cancel_join_thread方法来禁止此行为的应用:

  多处理模块支持两种主要的进程间通信形式:流水线和队列。

  都是基于消息传递实现的,但是队列接口

  从多重处理导入流程,队列

  导入时间

  q=队列(3)

  #put,get,put_nowait,get_nowait,full,empty

  q.put(3)

  q.put(3)

  q.put(3)

  Print(q.full()) #已满

  print(q.get())

  print(q.get())

  print(q.get())

  Print(q.empty()) #空视图代码

  消费者模型

  在并发编程中使用生产者和消费者模式可以解决大多数并发问题。这种模式可以通过平衡生产线程和消费线程的工作能力来提高程序的整体数据处理速度。

  为什么使用生产者和消费者模型?

  在网络世界里,生产者是生产数据的线程,消费者是消费数据的线程。在多线程开发中,如果生产者的处理速度快,消费者的处理速度慢,那么生产者必须等待消费者处理完,才能继续生产数据。同理,如果消费者的处理能力大于生产者,那么消费者必须等待生产者。为了解决这个问题,引入了生产者和消费者模型。

  什么是生产者-消费者模型?

  生产者模型是通过一个容器来解决生产者和消费者之间的强耦合问题。生产者和消费者之间不直接通信,而是通过阻塞队列进行通信。因此,生产者在生产数据后不必等待消费者来处理数据,他们直接将数据扔进阻塞队列。消费者不要求生产者提供数据,而是直接从阻塞队列中获取数据。阻塞队列相当于一个缓冲区,平衡生产者和消费者的处理能力。

  基于队列的生产者-消费者模型的实现

  从多重处理导入流程,队列

  导入时间,随机,操作系统

  定义消费者(q):

  虽然正确:

  res=q.get()

  time.sleep(random.randint(1,3))

  Print(\033[45m%s吃掉%s\033[0m %(os.getpid(),res))

  定义制作人(q):

  对于范围(10)内的I:

  time.sleep(random.randint(1,3))

  Res=包子%s %i

  q.put(res)

  Print(\033[44m%s产生了%s\033[0m %(os.getpid(),res))

  if __name__==__main__ :

  q=队列()

  #制作人:也就是厨师

  p1=进程(目标=生产者,参数=(q,))

  #消费者:也就是吃货们。

  c1=进程(目标=消费者,参数=(q,))

  #开始

  p1.start()

  c1.start()

  打印(“主”)视图代码

  #生产者-消费者模型概述

  #该计划中有两种类型的角色

  第一类负责生产数据(生产者)

  一个负责处理数据(消费者)

  #引入生产者-消费者模型是为了解决以下问题:

  平衡生产者和消费者的速度差异。

  #如何实现:

  生产者队列——消费者

  #解耦和实现生产者-消费者模型的类程序

  这个时候的问题是主进程永远不会结束。原因是生产者P在生产结束后,消费者C却停留在无限循环中,取空q后卡在q.get()的步骤中。

  解决方法是让生产者在生产结束后再向队列发送一个结束信号,这样消费者在收到结束信号后就可以跳出死循环。

  从多重处理导入流程,队列

  导入时间,随机,操作系统

  定义消费者(q):

  虽然正确:

  res=q.get()

  如果res为None:当接收到结束信号时,break #结束。

  time.sleep(random.randint(1,3))

  Print(\033[45m%s吃掉%s\033[0m %(os.getpid(),res))

  定义制作人(q):

  对于范围(10)内的I:

  time.sleep(random.randint(1,3))

  Res=包子%s %i

  q.put(res)

  Print(\033[44m%s产生了%s\033[0m %(os.getpid(),res))

  Q.put(无)#发送结束信号

  if __name__==__main__ :

  q=队列()

  #制作人:也就是厨师

  p1=进程(目标=生产者,参数=(q,))

  #消费者:也就是吃货们。

  c1=进程(目标=消费者,参数=(q,))

  #开始

  p1.start()

  c1.start()

  印刷(“主”)制作者在制作完成后发送结束信号None。

  注意:结束信号None不必由生产者发送。它也可以在主进程中发送,但是主进程不应该发送这个信号,直到生产者完成。

  从多重处理导入流程,队列

  导入时间,随机,操作系统

  定义消费者(q):

  虽然正确:

  res=q.get()

  如果res为None:当接收到结束信号时,break #结束。

  time.sleep(random.randint(1,3))

  Print(\033[45m%s吃掉%s\033[0m %(os.getpid(),res))

  定义制作人(q):

  对于范围(2)中的I:

  time.sleep(random.randint(1,3))

  Res=包子%s %i

  q.put(res)

  Print(\033[44m%s产生了%s\033[0m %(os.getpid(),res))

  if __name__==__main__ :

  q=队列()

  #制作人:也就是厨师

  p1=进程(目标=生产者,参数=(q,))

  #消费者:也就是吃货们。

  c1=进程(目标=消费者,参数=(q,))

  #开始

  p1.start()

  c1.start()

  p1.join()

  Q.put(无)#发送结束信号

  在生产者完成生产后,印刷的主要过程(“主”)发送结束信号None。

  但是,上述解决方案,当有多个生产者和多个消费者时,需要以非常低的方式来解决。

  从多重处理导入流程,队列

  导入时间,随机,操作系统

  定义消费者(q):

  虽然正确:

  res=q.get()

  如果res为None:当接收到结束信号时,break #结束。

  time.sleep(random.randint(1,3))

  Print(\033[45m%s吃掉%s\033[0m %(os.getpid(),res))

  定义生产商(名称,q):

  对于范围(2)中的I:

  time.sleep(random.randint(1,3))

  res=%s%s %(name,I)

  q.put(res)

  Print(\033[44m%s产生了%s\033[0m %(os.getpid(),res))

  if __name__==__main__ :

  q=队列()

  #制作人:也就是厨师

  P1=过程(目标=生产者,args=(包子,q))

  P2=进程(目标=生产者,参数=(bone ,q))

  P3=进程(目标=生产者,参数=(泔水,q))

  #消费者:也就是吃货们。

  c1=进程(目标=消费者,参数=(q,))

  c2=进程(目标=消费者,参数=(q,))

  #开始

  p1.start()

  p2.start()

  p3.start()

  c1.start()

  P1.join() #结束信号应该在所有生产者都完成生产之后才发送。

  p2.join()

  p3.join()

  Q.put(None) #有多少消费者应该发送几次结束信号

  Q.put(无)#发送结束信号

  Print (main )有几个消费者需要多次发送结束信号:相当低。

  其实我们的想法无非是发送一个结束信号,还有另外一种队列提供了这种机制。

  #JoinableQueue([maxsize]):这类似于一个队列对象,但是队列允许项目的用户通知生成器项目已经被成功处理。通知过程是通过使用共享信号和条件变量来实现的。

  #参数介绍:

  Maxsize是队列中允许的最大项目数;如果省略,则没有大小限制。

  #方法介绍:

  除了与队列对象相同的方法之外,JoinableQueue的实例p还具有:

  Q.task_done():用户使用这个方法来表示q.get()的返回项已经被处理。如果调用此方法的次数多于从队列中删除的项目数,将引发ValueError异常。

  Q.join():生产者调用这个方法进行阻塞,直到队列中的所有项目都被处理完。阻塞将继续,直到队列中的每个项目都调用q.task_done()方法。

  从多重处理导入过程,JoinableQueue

  导入时间,随机,操作系统

  定义消费者(q):

  虽然正确:

  res=q.get()

  time.sleep(random.randint(1,3))

  Print(\033[45m%s吃掉%s\033[0m %(os.getpid(),res))

  Q.task_done() #向q.join()发送信号,证明一块数据被取走了。

  定义生产商(名称,q):

  对于范围(10)内的I:

  time.sleep(random.randint(1,3))

  res=%s%s %(name,I)

  q.put(res)

  Print(\033[44m%s产生了%s\033[0m %(os.getpid(),res))

  问加入()

  if __name__==__main__ :

  q=JoinableQueue()

  #制作人:也就是厨师

  P1=过程(目标=生产者,args=(包子,q))

  P2=进程(目标=生产者,参数=(bone ,q))

  P3=进程(目标=生产者,参数=(泔水,q))

  #消费者:也就是吃货们。

  c1=进程(目标=消费者,参数=(q,))

  c2=进程(目标=消费者,参数=(q,))

  c1.daemon=True

  c2.daemon=True

  #开始

  p_l=[p1,p2,p3,c1,c2]

  对于p_l中的p:

  开始()

  p1.join()

  p2.join()

  p3.join()

  打印(“母版”)

  #主要流程等。-P1、P2、P3等。C2 C1

  # P1、P2和P3都结束了,这证明C1和C2肯定都收到了P1、P2和P3发送到队列中的数据。

  #所以C1和C2没有值,应该随着主进程的结束而结束,所以设置为守护进程视图代码。

  七。管道进程间通信(IPC)模式2:管道(不推荐,只知道)

  #创建管道的类别:

  管道([duplex]):在进程间创建一个管道,返回元组(conn1,conn2),其中conn1,conn2表示管道两端连接的对象,强调管道必须在进程对象生成之前生成。

  #参数介绍:

  Dumplex:默认管道是全双工的。如果duplex被shot为False,则conn1只能用于接收,conn2只能用于发送。

  #主要方法:

  Conn1.recv():接收conn2.send(obj

  )发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。

   conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

   #其他方法:

  conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法

  conn1.fileno():返回连接使用的整数文件描述符

  conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。

  conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。

  conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收

  conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。介绍

  from multiprocessing import Process,Pipe

  import time,os

  def consumer(p,name):

   left,right=p

   left.close()

   while True:

   try:

   baozi=right.recv()

   print(%s 收到包子:%s %(name,baozi))

   except EOFError:

   right.close()

   break

  def producer(seq,p):

   left,right=p

   right.close()

   for i in seq:

   left.send(i)

   # time.sleep(1)

   else:

   left.close()

  if __name__ == __main__:

   left,right=Pipe()

   c1=Process(target=consumer,args=((left,right),c1))

   c1.start()

   seq=(i for i in range(10))

   producer(seq,(left,right))

   right.close()

   left.close()

   c1.join()

   print(主进程)基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的)

  注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

  from multiprocessing import Process,Pipe

  import time,os

  def adder(p,name):

   server,client=p

   client.close()

   while True:

   try:

   x,y=server.recv()

   except EOFError:

   server.close()

   break

   res=x+y

   server.send(res)

   print(server done)

  if __name__ == __main__:

   server,client=Pipe()

   c1=Process(target=adder,args=((server,client),c1))

   c1.start()

   server.close()

   client.send((10,20))

   print(client.recv())

   client.close()

   c1.join()

   print(主进程)

  #注意:send()和recv()方法使用pickle模块对对象进行序列化。管道可以用于双向通信,利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序

  八 共享数据展望未来,基于消息传递的并发编程是大势所趋

  即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合

  通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,

  还可以扩展到分布式系统中

  进程间通信应该尽量避免使用本节所讲的共享数据的方式

  进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的

  虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

  A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

  A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

  from multiprocessing import Manager,Process,Lock

  import os

  def work(d,lock):

   # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱

   d[count]-=1

  if __name__ == __main__:

   lock=Lock()

   with Manager() as m:

   dic=m.dict({count:100})

   p_l=[]

   for i in range(100):

   p=Process(target=work,args=(dic,lock))

   p_l.append(p)

   p.start()

   for p in p_l:

   p.join()

   print(dic)

   #{count: 94}进程之间操作共享的数据

  九 信号量(了解)

  互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁

   信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

  from multiprocessing import Process,Semaphore

  import time,random

  def go_wc(sem,user):

   sem.acquire()

   print(%s 占到一个茅坑 %user)

   time.sleep(random.randint(0,3)) #模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了

   sem.release()

  if __name__ == __main__:

   sem=Semaphore(5)

   p_l=[]

   for i in range(13):

   p=Process(target=go_wc,args=(sem,user%s %i,))

   p.start()

   p_l.append(p)

   for i in p_l:

   i.join()

   print(============》)信号量Semahpore(同线程一样)

  十 事件(了解)

  python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

   事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  clear:将“Flag”设置为False

  set:将“Flag”设置为True

  #_*_coding:utf-8_*_

  #!/usr/bin/env python

  from multiprocessing import Process,Event

  import time,random

  def car(e,n):

   while True:

   if not e.is_set(): #Flase

   print(\033[31m红灯亮\033[0m,car%s等着 %n)

   e.wait()

   print(\033[32m车%s 看见绿灯亮了\033[0m %n)

   time.sleep(random.randint(3,6))

   if not e.is_set():

   continue

   print(走你,car, n)

   break

  def police_car(e,n):

   while True:

   if not e.is_set():

   print(\033[31m红灯亮\033[0m,car%s等着 % n)

   e.wait(1)

   print(灯的是%s,警车走了,car %s %(e.is_set(),n))

   break

  def traffic_lights(e,inverval):

   while True:

   time.sleep(inverval)

   if e.is_set():

   e.clear() #e.is_set() ---- False

   else:

   e.set()

  if __name__ == __main__:

   e=Event()

   # for i in range(10):

   # p=Process(target=car,args=(e,i,))

   # p.start()

   for i in range(5):

   p = Process(target=police_car, args=(e, i,))

   p.start()

   t=Process(target=traffic_lights,args=(e,10))

   t.start()

   print(============》)Event(同线程一样)

  十一 进程池在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

  很明显需要并发执行的任务通常要远大于核数一个操作系统不可能无限开启进程,通常有几个核就开几个进程进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

  我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数...

  ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

     创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

  1 Pool([numprocess [,initializer [, initargs]]]):创建进程池   参数介绍:

  1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值

  2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None

  3 initargs:是要传给initializer的参数组  方法介绍:

      主要方法:

  1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()

  2 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。

  3

  4 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

  5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用   其他方法(了解部分)

  方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法

  obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。

  obj.ready():如果调用完成,返回True

  obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常

  obj.wait([timeout]):等待结果变为可用。

  obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数View Code 

       应用:

  from multiprocessing import Pool

  import os,time

  def work(n):

   print(%s run %os.getpid())

   time.sleep(3)

   return n**2

  if __name__ == __main__:

   p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务

   res_l=[]

   for i in range(10):

   res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限

   res_l.append(res)

   print(res_l)同步调用apply

  from multiprocessing import Pool

  import os,time

  def work(n):

   print(%s run %os.getpid())

   time.sleep(3)

   return n**2

  if __name__ == __main__:

   p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务

   res_l=[]

   for i in range(10):

   res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res

   res_l.append(res)

   #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: