python异步多线程,Python 异步任务
本文主要介绍python多进程和通信实现异步任务需求。我很少接触多进程场景,不熟悉python多进程的使用。在一些多流程的业务场景中,我学习了python多流程的使用,觉得有必要做一个总结。有兴趣的朋友来看看吧。
00-1010一、python多进程与通信的基本用法1。多进程的基本实现a .进程重写运行方法b .使用进程和目标方法C。直接使用过程类2。多进程通信a .队列b和管道II。python多进程实战1。使用进程池2快速提取数据。多进程和通信完成数据清理和保存。3.多进程和通信实现异步任务。说实话,python多进程知识对于那些很少使用python多进程或者实际上没有过多使用python进程解决问题的人来说还是比较难的。我也很少接触多进程场景,不熟悉python多进程的使用。在一些多流程的业务场景中,我学会了如何使用python多流程,觉得有必要做一个总结。
目录
多进程及其通信是python中的一个重要部分。作为一个python程序员,这部分应该是基本掌握的。
一、python多进程及通信基本用法
python多进程的使用一般是调用多进程包中的进程和池,其中有很多对进程这个基本函数的使用
P.start()启动一个初始化的进程。
P.join()允许主进程在运行结束后执行。
1、多进程的基本实现
多重处理类继承多重处理类,然后重写其run方法,实现特定的业务逻辑功能。主程序启动10个进程。
从多重处理导入流程
计数=0
类多进程(进程):
def __init__(self,name):
超级()。__init__()
self.name=name
定义运行(自身)-无:
全局计数
计数=1
print(进程名%s正在运行- count:%d%(self.name,count))
if __name__==__main__:
p_list=[]
对于范围(10):内的I
name=process_%d%i
p=MultiOneProcess(name=name)
开始()
p_list.append
对于p_list:中的p
连接()
打印(“此主流程”)
a、Process重写run方法
定义一个流程类继承流程类,在super()初始化中传入目标函数。
从多重处理导入流程
计数=0
类MultiTwoProcess(Process):
def __init__(self,name):
超级()。__init__(target=self.do_fun)
self.name=
name
def do_fun(self):
global count
count += 1
print(process name %s is running----count:%d % (name, count))
if __name__ == __main__:
p_list = []
for i in range(10):
name = process_%d%i
p = MultiTwoProcess(name)
p.start()
p_list.append(p)
for p in p_list:
p.join()
print(this main process)
代码中定义了一个类MultiTwoProcess类,类中定义了do_fun函数,把它作为参数传入到target中。
c、直接使用Process类
传入target函数,同时传入args参数,注意args参数是一个元组,切不能省略最后一个逗号
from multiprocessing import Processcount = 0
def do_fun(name):
global count
count += 1
print(process name %s is running----count:%d % (name, count))
if __name__ == __main__:
p_list = []
for i in range(10):
name = process_%d%i
p = Process(target=do_fun,args=(name,))
p.start()
p_list.append(p)
for p in p_list:
p.join()
print(this main process)
以上三者运行的结果,是一样的,如下:
2、多进程的通信
进程之间的通信一般都采用Queue和pipe,区别是:pipe只能在两个进程之间调用,而Queue是可以多个进程间调用的;效率上pipe效率更高,Queue是基于pipe实现的,效率比pipe要低一点。
a、Queue
常用API,
存放数据
queue.put(obj, block=True, timeout=None)
当block=False的时候,如果Queue已经满了,那么就会跑出Queue.Full异常;
当block=True且timeout有正值的时候,Queue已经满了,Queue会阻塞timeout时间,超出时间就会抛出同样的异常
获取数据
queue.get(block=True, timeout=None)
当block=False的时候,如果Queue为空,那么就会跑出Queue.Empty异常;
当block=True且timeout有正值的时候,Queue已经为空,Queue会阻塞timeout时间,超出时间就会抛出同样的异常
以上2个API是阻塞;还有两个非堵塞的API
queue.put(obj, block=False) 和queue.put_nowait(obj)等效
queue.get(block=False) 和queue.get_nowait()等效
简单的实现,一个进程发送数据,另外2个进程接收数据,就可以使用queue通信
from multiprocessing import Process, Queuedef send(q):
while True:
q.put(发送一个数据)
def receive1(q):
while True:
s = q.get()
print(receive1:, s)
def receive2(q):
while True:
s = q.get()
print(receive2:, s)
if __name__ == __main__:
q = Queue()
p1 = Process(target=send,args=(q,))
p2 = Process(target=receive1,args=(q,))
p3 = Process(target=receive2,args=(q,))
p1.start()
p2.start()
p3.start()
p1进程不断的往q中存放数据;p2和p3不停的从q中取数据(有竞争的再取),所以打印结果是无序的
b、Pipe
Pipe(duplex=True)返回2个连通端(p1,p2);当duplex=True时,双向通信,p1发送,p2接收;p2发送,p1接收。
当duplex=True时,单向通信,p1只能发送,p2只能接收。
常用API, pipe.send() pipe.recv()
from multiprocessing import Process, Pipedef fun2(p):
while True:
s = p.recv()
print(接收一个数据:,s)
def fun1(p):
while True:
print(发送一个数据:pipe)
p.send(pipe)
if __name__ == __main__:
pi1,pi2 = Pipe(duplex=True)
p1 = Process(target=fun1,args=(pi1,))
p2 = Process(target=fun2,args=(pi2,))
p1.start()
p2.start()
结果如下:
二、python多进程实战
不同的业务场景使用多进程的方式和复杂度也不相同,就我遇见过的一些场景进行演示和说明。
1、使用进程池快速抽取数据
场景描述:有1000个Excel文件的数据需要进行抽取和清洗,要把不符合我们需求的数据过滤掉,保留质量很高的数据;每个Excel都有几十万或者上百万的数据,那么怎么快速的完成这个任务呢?
首先整体上而言,可以把单个Excel的处理并行起来;那么可以使用多进程,其次这个需要返回结果,要保留合格的数据,比较简单的就是采用进程池了,它能够很方便的把进程处理的结果进行返回,并且返回的还是一个生成器;如果还需要更快,那么可以把单个Excel中的每条数据的处理并行起来。代码层面上,采用pool进程池来完成这个任务(本文没有对进程池的使用和API做说明),具体的实现方式采取pool.imap()
if __name__ == __main__:#所有Excel的路径
all_paths = glob(../data/original_data/*)
sysInfo_list = [我通过了好友请求,现在你俩可以开始聊天了, 我通过了你的朋友验证请求,现在我们可以开始聊天了, 已通过你的朋友验证请求,现在可以开始聊天了, 不支持此消息,请在手机上查看,
微信红包]
interval = 25
if len(all_paths)//interval * interval < len(all_paths):
k = len(all_paths)//interval + 1
else:
k = len(all_paths) // interval
#分段处理,每段25个Excel
for i in range(k):
paths = all_paths[i*interval:(i+1)*interval]
if i*interval >= 100 and i*interval < 200:
params = []
for path in tqdm(paths):
params.append((path, sysInfo_list))
#多进程处理——进程池、以及进度显示
with Pool(20) as p:
res = list(tqdm(p.imap(extract_data, params), total=len(params), desc=extract_data))
all_df = []
for dfs in res:
if len(dfs) > 0:
all_df.extend(dfs)
df = pd.concat(all_df, axis=0)
save_path = ../data/weikong_clean_data_+str(i*interval)+_+str(i*interval+len(paths)-1)+.xlsx
writer = pd.ExcelWriter(save_path)
df.to_excel(writer, index=False)
writer.save()
writer.close()
2、多进程及通信完成数据清洗和保存
场景描述:从Excel中读取数据,数据格式是整通整通的对话,每通对话有一定的轮数;保存数据到2个txt中,一个是顺序保留,一个是倒序保留;整体对话顺序不变,每通对话内部顺序倒序。
正序:
倒序:
要想实现这样的任务,粗暴的做法是,用两个list,一个保留正序的,一个保留倒序的,然后分别对这两个list进行文件写入操作。但是如果数据量很多在内存有限的时候,只能满足不了两个list的情况下怎么实现呢?
我的实现方式就是开启两个进程,一个进程保留一个正序list,写入文件的同时对每个元素(每通)对话进行倒序,然后把倒序后的数据通过Queue或者Pipe传入到另外一个进程,让另外的进程进行写文件操作。
def save_mmi_train_data(queue):with open(../data/finetune_mmi_data/train.txt,w,encoding=utf-8) as f:
while True:
save_list = queue.get()
if len(save_list) == 0:
break
for line in save_list:
f.write(line)
def save_mmi_val_data(queue):
with open(../data/finetune_mmi_data/val.txt,w,encoding=utf-8) as f:
while True:
save_list = queue.get()
if len(save_list) == 0:
break
for line in save_list:
f.write(line)
def get_funtine_data(paths):
all_groups = []
for path in tqdm(paths,desc=load data from excle):
df = pd.read_excel(path)
df.dropna(inplace=True)
df.drop_duplicates(inplace=True, keep=first)
groups = list(df.groupby(by=[坐席id, 客户微信id]))
all_groups.extend(groups)
print(len(all_groups),len(all_groups))
train, val = train_test_split(all_groups,test_size=10000/len(all_groups),random_state=1)
print(len(train), len(train))
print(len(val), len(val))
train_std_path = ../data/finetune_std_data/train.txt
val_std_path = ../data/finetune_std_data/val.txt
train_mmi_queue = Queue()
save_funtine_data(train, train_std_path,train_mmi_queue,save_mmi_train_data)
val_mmi_queue = Queue()
save_funtine_data(val, val_std_path, val_mmi_queue, save_mmi_val_data)
def save_funtine_data(groups,save_std_path,queue,fun):
p = Process(target=fun,args=(queue,))
p.start()
with open(save_std_path,w, encoding=utf-8) as f:
for group in tqdm(groups, desc=find and save funtine dialogue datas):
new_df = group[1]
df_roles = new_df[是否客服].values.tolist()
df_contents = new_df[消息内容].values.tolist()
roles = []
contents = []
for role,content in zip(df_roles,df_contents):
content = content.replace(\n, )
content = emoji.replace_emoji(content, )
if len(content) > 0 and content != "":
roles.append(role)
contents.append(content)
save_list = []
save_str = ""
for index, role in enumerate(roles):
content = contents[index].replace(\n,)
content = emoji.replace_emoji(content, )
if content[-1] not in punctuations:
content += ;
if index == 0:
if role == "是":
save_str += "坐席:"+content
else:
save_str += "客户:"+content
else:
if role != roles[index-1]:
f.write(save_str[0:-1]+\n)
save_list.append(save_str[0:-1]+\n)
if role == "是":
save_str = "坐席:" + content
else:
save_str = "客户:" + content
else:
save_str += content
if len(save_str) > 1:
save_list.append(save_str[0:-1] + \n)
f.write(save_str[0:-1]+\n)
f.write(\n)
# 切片反转
save_list = save_list[::-1]
save_list.append(\n)
if len(save_list) > 0:
queue.put(save_list)
#注意传入一个空值,让倒序进程结束
queue.put([])
p.join()
要注意的是,倒序进程中使用while True 无限循环,需要传入一个空值,能够让它在正序进程结束的同时知道数据写完了,跳出循环。以上代码比较简单就不一一说明了。
3、多进程及通信实现异步任务需求
场景描述:假定一个模型推理系统,网络模块负责接受请求传输的数据,把数据传输给数据处理模块;数据处理模块负责处理数据(比如说语音流或者视频流等,这些数据处理对CPU的消耗很大),处理完后把数据传输给模型推理模块;模型推理模块负责对数据进行推理并把结果返回给网络模块。要求就是网络模块、数据处理模块和模型推理模块是独立的,可以并行的完成自己的任务,3个模块是异步的,其实可以把这个系统简化的使用多进程来实现。
每个模块可以用一个进程来表示,内部的逻辑可以开启子进程来实现,然后模块直接的数据传输就可以使用多进程的通信来实现,同时也创建一个全局的Queue变量,让每个模块的进程按需使用。
画了一个简单的结构和流程图,如下:
注意的是模块之间的数据传输,使用queue传输的时候,数据量越小,效率越高,所以可以在网络模块这端提前把数据进行处理。
函数入口文件
import aimport b
import c
from whole_queue import WholeQueue
import os
if __name__ == __main__:
print("main process:",os.getpid())
whole_queue = WholeQueue()
b_pool_size = 2
c_pool_size = 6
Module_list = [
a.A(whole_queue,b_pool_size),
b.B(whole_queue,b_pool_size,c_pool_size),
c.C(whole_queue,c_pool_size)
]
for p in Module_list:
p.start()
公共队列类
class WholeQueue():def __init__(self):
self.queues = dict()
def register(self,queuename,queue):
self.queues[queuename] = queue
各个子模块类
a
from multiprocessing import Process,Queueimport time
import random
import os
class A(Process):
def __init__(self,whole_queue,b_pool_size):
super().__init__(target=self.do_run)
self.whole_queue = whole_queue
self.b_pool_size = b_pool_size
self.queue_list = []
queue = Queue()
self.whole_queue.register(A, queue)
self.queue_list.append(queue)
self.count = 0
def do_run(self):
print("A.do_run process:", os.getpid())
a_send_pro = Process(target = self.send)
a_send_pro.start()
a_receive_pro = Process(target = self.receive)
a_receive_pro.start()
def send(self):
print("A.send process:", os.getpid())
while True:
time.sleep(0.001)
self.whole_queue.queues[B_%d%(self.count%self.b_pool_size)].put_nowait(self.count)
self.count += 1
def receive(self):
print("A.receive process:", os.getpid())
while True:
rece = self.whole_queue.queues[A].get()
print(rece)
b
from multiprocessing import Process,Queueimport time
import random
import os
class B(Process):
def __init__(self,whole_queue,b_pool_size,c_pool_size):
super().__init__(target=self.do_run)
self.whole_queue = whole_queue
self.b_pool_size = b_pool_size
self.c_pool_size = c_pool_size
self.queue_list = []
for i in range(self.b_pool_size):
queue = Queue()
self.whole_queue.register(B_%d% i , queue)
self.queue_list.append(queue)
self.count = 0
def do_run(self):
print("B.do_run process:", os.getpid())
for i in range(self.b_pool_size):
p = Process(target=self.component,args=(self.queue_list[i],))
p.start()
def component(self, queue):
print("B.component process:", os.getpid())
while True:
time.sleep(0.01)
info = queue.get()
componext_info = component_ + str(info)
self.whole_queue.queues[C_%d%(info%self.c_pool_size)].put(componext_info)
c
from multiprocessing import Process,Queuefrom model import Model
import time
import random
import os
class C(Process):
def __init__(self,whole_queue,c_pool_size):
super().__init__(target=self.do_run)
self.whole_queue = whole_queue
self.c_pool_size = c_pool_size
self.queue_list = []
for i in range(self.c_pool_size):
queue = Queue()
self.whole_queue.register(C_%d% i , queue)
self.queue_list.append(queue)
# self.cache_queue = None
# self.result_queue = None
# self.infer_queue = None
def do_run(self):
cache_queue = Queue()
result_queue = Queue()
infer_queue = Queue()
print("C.do_run process:", os.getpid())
for i in range(self.c_pool_size):
p = Process(target=self.receive,args=(self.queue_list[i], cache_queue,))
p.start()
cache_p = Process(target=self.cache,args=(cache_queue, infer_queue,))
cache_p.start()
predict_p = Process(target=self.predict,args=(infer_queue, result_queue))
predict_p.start()
while True:
res = result_queue.get()
for ele in res:
self.whole_queue.queues[A].put(ele)
def receive(self, queue,cache_queue):
print("C.receive process:", os.getpid())
while True:
info = queue.get()
receive_info = receive_ + info
cache_queue.put(receive_info)
def cache(self,cache_queue, infer_queue):
timeLast = time.time()
print("C.cache process:", os.getpid())
caches = []
while True:
data = cache_queue.get()
caches.append(data)
if len(caches) > 128 or time.time() - timeLast > 1:
timeLast = time.time()
infer_queue.put(caches)
caches = []
def predict(self,infer_queue, result_queue):
print("C.predict process:", os.getpid())
# 模型必须在这里初始化
model = Model()
while True:
data = infer_queue.get()
result = model(data)
result = [ modelpredict_ + ele for ele in result]
time.sleep(random.uniform(0.1,0.5))
result_queue.put(result)
代码比较好理解,需要注意的是子进程在使用变量的时候,例如初始后的模型,应该要每一个子进程独立的进行初始化,不然会报错,就是C类中模型初始化不能在init中初始后,然后传入到每个子进程中去——而应该在每个子进程中初始化。
到此这篇关于python多进程及通信实现异步任务需求的文章就介绍到这了,更多相关python异步任务内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。