python线程间通信的方式,Python多进程通信
本文主要介绍Python进程间通信模式,在这种模式下,进程是相互隔离的。为了实现进程间通信,队列模式是主要的方式。以下更多内容,可以参考需要的小伙伴。
00-1010一、通讯方式二。队列三介绍。方法介绍。生产者与消费者模型四。生产者和消费者模型的实现方式是什么一、实现方式二。使用JoinableQueue
目录
进程是相互隔离的。为了实现IPC,多处理模块主要采用队列模式。
队列:类似于管道,元素先进先出。
需要注意的一点是:在内存中运行,进程退出,队列被清空。此外,队列也是一种阻塞形式。
一、通信方式
创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):创建一个共享进程队列,这是一个多进程安全队列。
您可以使用队列来实现多个进程之间的数据传输。Maxsize是队列中允许的最大项目数;如果省略,则没有大小限制。
二、Queue介绍
DEF PUT (self,obj,Block=True,timeout=none) 3360将数据插入队列。block的默认值为true,这意味着当队列已满时,它将被阻塞。如果Block为False,则队列将已满,并出现异常queue . full。time out表示它将被阻塞,直到指定的时间可用于插入。如果超时,将被报告为异常队列。完全def get(self, block=True, timeout=None):从队列中取出数据。block的默认值为True,表示当队列为空时,它将被阻塞。如果block为False,则为异常队列。当队列为空时,将报告为空。超时表示它将一直等到获取数据。如果时间到了,一个异常队列。空def empty(self): 将判断队列是否为空,如果为空,将返回true。def full(self): 将判断队列是否已满。如果full返回true,def qsize(self): 返回应用举例:.队列的大小
从多重处理导入流程,经理
q=经理()。队列(2)
q.put(1)
q.put(2,block=False,timeout=2)
定义函数():
print(q.get())
p=过程(目标=功能)
print(size ,q.qsize())
print(full ,q.full())
开始()
连接()
print(empty ,q.empty())
print(get ,q.get())
print(get ,q.get(block=False,timeout=2))
输出结果:
三、方法介绍
在并发编程中使用生产者和消费者模式可以解决大多数并发问题。这种模式可以通过平衡生产线程和消费线程的工作能力来提高程序的整体数据处理速度。
为什么要使用生产者和消费者模式?
在网络世界里,生产者是生产数据的线程,消费者是消费数据的线程。在多线程开发中,如果生产者的处理速度快,消费者的处理速度慢,那么生产者必须等待消费者处理完,才能继续生产数据。同理,如果消费者的处理能力大于生产者,那么消费者必须等待生产者。为了解决这个问题,引入了生产者和消费者模型。
p>
四、什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯:
生产者,只需要往队列里面丢东西(生产者不需要关心消费者)
消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
实现方式一:Queue
from multiprocessing import Process,Manager,active_childrenimport random
import queue
import time
class Producer(Process):
def __init__(self,queue):
super().__init__()
self.queue = queue
def run(self):
for i in range(6):
r = random.randint(0, 99)
time.sleep(1)
self.queue.put(r)
print("add data{}".format(r))
class Consumer(Process):
def __init__(self,queue):
super().__init__()
self.queue = queue
def run(self):
while True:
if not self.queue.empty():
data = self.queue.get()
print("minus data{}".format(data))
if __name__ == __main__:
q = Manager().Queue() # 创建队列
p = Producer(q)
c = Consumer(q)
p.start()
c.start()
print(active_children()) # 查看现有的进程
p.join()
c.join()
print("结束")
>>>输出
[<ForkProcess(SyncManager-1, started)>, <Producer(Producer-2, started)>, <Consumer(Consumer-3, started)>]
add data83
minus data83
add data72
minus data72
add data8
minus data8
add data63
minus data63
add data75
minus data75
add data52
minus data52
实现方式二:利用JoinableQueue
JoinableQueue([maxsize]):一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。JoinableQueue
的实例除了与Queue对象相同的方法之外还具有:
task_done():使用者使用此方法发出信号,表示get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用task_done()方法为止
from multiprocessing import Process,JoinableQueueimport os
import time
import random
def print_log(msg, log_type="prod"):
if log_type == prod:
print("\033[32;1m%s\033[0m" %msg)
elif log_type == con:
print("\033[31;1m%s\033[0m" %msg)
def producer(q):
"""
生产者
:param q:
:return:
"""
for i in range(10):
data = random.randint(1,200)
time.sleep(2)
q.put(data) # 放入队列
msg = "add data {}".format(data)
print_log(msg)
q.join() # 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
# 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
def consumer(q):
"""
消费者
:param q:
:return:
"""
while True:
if not q.empty():
time.sleep(5)
data = q.get()
msg = "minus data{}".format(data)
print_log(msg,"con")
q.task_done() # q.get()的返回项目已经被处理
if __name__ == __main__:
q = JoinableQueue()
prod = Process(target=producer, args=(q,))
con = Process(target=consumer, args=(q,))
con.daemon = True # 设置为守护进程,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
# 开启进程
prod.start()
con.start()
prod.join() # 等待生产和消费完成,主线程结束
print("结束")
输出结果:
到此这篇关于Python进程间通信方式的文章就介绍到这了,更多相关Python进程间通信内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。