python进程通信方式有几种,python线程间通信的方式

  python进程通信方式有几种,python线程间通信的方式

  流程创建后是没有办法得到返回值的,但是有时候需要两个流程相互配合才能完成工作,这就需要借助通信。本文主要介绍Python中进程间的通信方式,你可以知道自己需要什么。

  00-1010什么是进程通信队列的创建-多处理进程间通信的方法-进程间通信-队列演示案例批量添加数据段发送函数其他进程间通信方式-补充

  

目录

  下面举个例子介绍一下沟通的机制:沟通这个词大家都很熟悉,比如一个人想给女朋友打电话。当一个呼叫建立后,呼叫过程中就建立了一个隐形队列(记住这个词)。此时,这个人会不停地通过对话把信息告诉他的女朋友,这个人的女朋友也在听。(嗯……我个人觉得大部分情况下可能反过来)。

  在这里,他们两个可以比作两个过程。‘这个人’的进程需要向‘女朋友’的进程发送信息,所以需要一个队列的帮助。女朋友需要不断从队列中接收信息,还可以做其他事情,所以两个进程之间的通信主要依靠队列。

  该队列可以支持发送和接收消息。“这个人”负责发送消息,而“女朋友”负责接收消息。

  既然队列是重点,那么我们就来看看如何创建队列。

  

什么是进程的通信

  仍然使用多重处理模块,调用该模块的队列函数来创建队列。

  函数介绍参数返回值Queue队列创建mac_count Queue对象队列函数介绍:调用Queue创建队列;它有一个参数mac_count,表示队列中可以创建的最大信息量。如果不传输,默认为无限长。实例化队列对象后,需要操作队列对象来放入和取出数据。

  

队列的创建 - multiprocessing

  函数名引入参数返回值将消息放入队列Message No get获取队列message No strput函数函数描述:传入数据。它有一个字符串类型的参数消息。

  get函数介绍:用来接收队列中的数据。(其实这是常见的json场景。很多数据传输都是字符串,队列的插入和获取都是使用字符串,所以json非常适合这种场景。)

  接下来,让我们练习队列的用法。

  

进程之间通信的方法

  代码示例如下:

  #编码:utf-8

  导入json

  导入多重处理

  Class Work(object): #定义了一个工作类。

  def __init__(self,queue): #构造函数传入一个“队列对象”——queue。

  self.queue=queue

  DEF send (self,message) 3360 #定义了一个send函数,消息传入。

  #[这里隐藏了一个bug,就是只确定传入的字符串是否为类型;如果一个函数、类、集合等。传入,仍会报告错误]

  如果isinstance (message,str) : #确定传入的消息是否是字符串,如果不是,则执行json序列化

  message=json.dumps(message)

  Self.queue.put(message) #使用

   queue 的队列实例化对象将 message 发送出去

   def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环

   while 1:

   result = self.queue.get() # 获取 队列对象 --> queue 传入的message

   # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获

   try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res

   res = json.loads(result)

   except:

   res = result

   print(接收到的信息为:{}.format(res))

  if __name__ == __main__:

   queue = multiprocessing.Queue()

   work = Work(queue)

   send = multiprocessing.Process(target=work.send, args=({message: 这是一条测试的消息},))

   receive = multiprocessing.Process(target=work.receive)

   send.start()

   receive.start()

  

  使用队列建立进程间通信遇到的异常

  但是这里会出现一个 报错,如下图:

  报错截图示例如下:

  

  这里的报错提示是 文件没有被发现的意思 。其实这里是我们使用 队列做 put() 和 get()的时候 有一把无形的锁加了上去,就是上图中圈中的 .SemLock 。我们不需要去关心造成这个错误的具体原因,要解决这个问题其实也很简单。

  FileNotFoundError: [Errno 2] No such file or directory 异常的解决

  我们只需要给 send 或者 receive 其中一个子进程添加 join 阻塞进程即可,理论上如此。但是我们的 receive子进程是一个 while循环,它会一直执行,所以只需要给 send 子进程加上一个 join 即可。

  解决示意图如下:

  

  PS:虽然解决了报错问题,但是程序没有正常退出。

  实际上由于我们的 receive 进程是个 while循环,并不知道要处理到什么时候,没有办法立刻终止。所以我们需要在 receive 进程 使用 terminate() 函数终结接收端。

  运行结果如下:

  

  

  

批量给 send 函数加入数据

  新建一个函数,写入 for循环 模拟批量添加要发送的消息

  然后再给这个模拟批量发送数据的函数添加一个线程。

  示例代码如下:

  

# coding:utf-8

  import json

  import time

  import multiprocessing

  class Work(object): # 定义一个 Work 类

   def __init__(self, queue): # 构造函数传入一个 队列对象 --> queue

   self.queue = queue

   def send(self, message): # 定义一个 send(发送) 函数,传入 message

   # [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错]

   if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化

   message = json.dumps(message)

   self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去

   def send_all(self): # 定义一个 send_all(发送)函数,然后通过for循环模拟批量发送的 message

   for i in range(20):

   self.queue.put(第 {} 次循环,发送的消息为:{}.format(i, i))

   time.sleep(1)

   def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环

   while 1:

   result = self.queue.get() # 获取 队列对象 --> queue 传入的message

   # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获

   try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res

   res = json.loads(result)

   except:

   res = result

   print(接收到的信息为:{}.format(res))

  if __name__ == __main__:

   queue = multiprocessing.Queue()

   work = Work(queue)

   send = multiprocessing.Process(target=work.send, args=({message: 这是一条测试的消息},))

   receive = multiprocessing.Process(target=work.receive)

   send_all = multiprocessing.Process(target=work.send_all,)

   send_all.start() # 这里因为 send 只执行了1次,然后就结束了。而 send_all 却要循环20次,它的执行时间是最长的,信息也是发送的最多的

   send.start()

   receive.start()

   # send.join() # 使用 send 的阻塞会造成 send_all 循环还未结束 ,receive.terminate() 函数接收端就会终结。

   send_all.join() # 所以我们只需要阻塞最长使用率的进程就可以了

   receive.terminate()

  

  运行结果如下:

  

  从上图中我们可以看到 send 与 send_all 两个进程都可以通过 queue这个实例化的 Queue 对象发送消息,同样的 receive接收函数也会将两个进程传入的 message 打印输出出来。

  

  

小节

  该章节我们通过队列的方式实现了进程间通信的方法,并且了解了队列的使用方法。一个队列中,有一端(这里我们演示的是 send端)通过 put方法实现添加相关的信息,另一端使用 get 方法获取相关的信息;两个进程相互配合达到一个进程通信的效果。

  其实进程之间的通信不仅仅只有队列这一种方式,感兴趣的话还可以通过 管道、信号量、共享内存的方式来实现。可以自行拓展一下。

  

  

进程间通信的其他方式 - 补充

  python提供了多种进程通信的方式,包括信号,管道,消息队列,信号量,共享内存,socket等

  主要Queue和Pipe这两种方式,Queue用于多个进程间实现通信,Pipe是两个进程的通信。

  1.管道:分为匿名管道和命名管道

  匿名管道:在内核中申请一块固定大小的缓冲区,程序拥有写入和读取的权利,一般使用fock函数实现父子进程的通信

  命名管道:在内存中申请一块固定大小的缓冲区,程序拥有写入和读取的权利,没有血缘关系的进程也可以进程间通信

  特点:面向字节流;生命周期随内核;自带同步互斥机制;半双工,单向通信,两个管道实现双向通信

  2.消息队列:在内核中创建一个队列,队列中每个元素是一个数据报,不同的进程可以通过句柄去访问这个队列。消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法。每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型。消息队列也有管道一样的不足,就是每个消息的最大长度是有上限的,每个消息队列的总的字节数是有上限的,系统上消息队列的总数也有一个上限

  特点:消息队列可以被认为是一个全局的一个链表,链表节点中存放着数据报的类型和内容,有消息队列的标识符进行标记;消息队列允许一个或多个进程写入或读取消息;消息队列的生命周期随内核;消息队列可实现双向通信

  3.信号量:在内核中创建一个信号量集合(本质上是数组),数组的元素(信号量)都是1,使用P操作进行-1,使用V操作+1

  P(sv):如果sv的值大于零,就给它减1;如果它的值为零,就挂起该程序的执行

  V(sv):如果有其他进程因等待sv而被挂起,就让它恢复运行,如果没有进程因等待sv而挂起,就给它加1

  PV操作用于同一个进程,实现互斥;PV操作用于不同进程,实现同步

  功能:对临界资源进行保护

  4.共享内存:将同一块物理内存一块映射到不同的进程的虚拟地址空间中,实现不同进程间对同一资源的共享。共享内存可以说是最有用的进程间通信方式,也是最快的IPC形式

  特点:不同从用户态到内核态的频繁切换和拷贝数据,直接从内存中读取就可以;共享内存是临界资源,所以需要操作时必须要保证原子性。使用信号量或者互斥锁都可以.

  以上就是Python语法学习之进程间的通信方式的详细内容,更多关于Python进程通信方式的资料请关注盛行IT软件开发工作室其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: