python进程和线程之间通信,Python多进程通信
本文主要详细介绍Python进程间通信。本文中的示例代码非常详细,具有一定的参考价值。感兴趣的朋友可以参考一下,希望能帮到你。
00-1010通信模式队列介绍:生产者和消费者模型为什么要使用生产者和消费者模式?什么是生产者和消费者模式实现模式1:队列实现模式2:用JoinableQueue总结。
目录
进程是相互隔离的。为了实现IPC,多处理模块主要采用队列模式。
队列:队列类似于管道,元素先进先出。
需要注意的是,队列都是在内存中操作的,进程退出,队列清空。此外,队列也被阻塞。
通信方式
创建一个队列类(底层通过管道和锁定实现):
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,
您可以使用队列来实现多个进程之间的数据传输。Maxsize是队列中允许的最大项目数;如果省略,则没有大小限制。
方法介绍:
DEF PUT (self,obj,block=true,timeout=none) 3360将数据插入队列
默认情况下,Block的值为True,这意味着当队列已满时,它将阻塞。如果block为False,队列已满将报告异常queue.full。
超时意味着它将被阻塞到指定的时间,直到有空间可以插入。如果超时,它将报告一个异常队列。全部
DEF (self,block=true,timeout=none) :从队列中获取数据
默认情况下,Block的值为True,这意味着当队列为空时,它将阻塞。如果block为False,空队列将报告异常队列。空的
超时意味着它将一直等待,直到获取数据。如果超时,它将报告一个异常队列。空的
Empty (self) :确定队列是否为空,如果为空则返回True。
Def full(self):确定队列是否已满,如果已满,则返回True。
q size (self) 3360返回队列的大小。
应用举例:
从多重处理导入流程,经理
q=经理()。队列(2)
q.put(1)
q.put(2,block=False,timeout=2)
定义函数():
print(q.get())
p=过程(目标=功能)
print(size ,q.qsize())
print(full ,q.full())
开始()
连接()
print(empty ,q.empty())
print(get ,q.get())
print(get ,q.get(block=False,timeout=2))
输出结果
Queue介绍:
在并发编程中使用生产者和消费者模式可以解决大多数并发问题。这种模式可以通过平衡生产线程和消费线程的工作能力来提高程序的整体数据处理速度。
生产者和消费者模型
在网络世界里,生产者是生产数据的线程,消费者是消费数据的线程。在多线程开发中,如果生产者的处理速度快,消费者的处理速度慢,那么生产者必须等待消费者处理完,才能继续生产数据。同理,如果消费者的处理能力大于生产者,那么消费者必须等待生产者。为了解决这个问题,引入了生产者和消费者模型。
为什么要使用生产者和消费者模式
生产者模型是通过一个容器来解决生产者和消费者之间的强耦合问题。生产者和消费者不直接相互通信,而是通过阻塞队列进行通信:
生长
产者,只需要往队列里面丢东西(生产者不需要关心消费者)
消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
实现方式一:Queue
from multiprocessing import Process,Manager,active_childrenimport random
import queue
import time
class Producer(Process):
def __init__(self,queue):
super().__init__()
self.queue = queue
def run(self):
for i in range(6):
r = random.randint(0, 99)
time.sleep(1)
self.queue.put(r)
print("add data{}".format(r))
class Consumer(Process):
def __init__(self,queue):
super().__init__()
self.queue = queue
def run(self):
while True:
if not self.queue.empty():
data = self.queue.get()
print("minus data{}".format(data))
if __name__ == __main__:
q = Manager().Queue() # 创建队列
p = Producer(q)
c = Consumer(q)
p.start()
c.start()
print(active_children()) # 查看现有的进程
p.join()
c.join()
print("结束")
实现方式二:利用JoinableQueue
JoinableQueue([maxsize]):一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例除了与Queue对象相同的方法之外还具有:
task_done():使用者使用此方法发出信号,表示get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用task_done()方法为止
from multiprocessing import Process,JoinableQueueimport os
import time
import random
def print_log(msg, log_type="prod"):
if log_type == prod:
print("\033[32;1m%s\033[0m" %msg)
elif log_type == con:
print("\033[31;1m%s\033[0m" %msg)
def producer(q):
"""
生产者
:param q:
:return:
"""
for i in range(10):
data = random.randint(1,200)
time.sleep(2)
q.put(data) # 放入队列
msg = "add data {}".format(data)
print_log(msg)
q.join() # 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
# 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
def consumer(q):
"""
消费者
:param q:
:return:
"""
while True:
if not q.empty():
time.sleep(5)
data = q.get()
msg = "minus data{}".format(data)
print_log(msg,"con")
q.task_done() # q.get()的返回项目已经被处理
if __name__ == __main__:
q = JoinableQueue()
prod = Process(target=producer, args=(q,))
con = Process(target=consumer, args=(q,))
con.daemon = True # 设置为守护进程,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
# 开启进程
prod.start()
con.start()
prod.join() # 等待生产和消费完成,主线程结束
print("结束")
输出结果
总结
本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注盛行IT软件开发工作室的更多内容!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。