python多进程multiprocessing,python多核并行处理
平时我只使用python的数据处理包进行数据处理,很少使用python的一些其他功能。之前一直在尝试理解python自带的多线程/多进程包,看了很多遍都没办法用。
今天在家好好学习了一番,物有所值,终于可以用了:这个包我大概知道怎么用了,按照我的想法怎么用。我总是看python的多重处理包。这也是我个人的学习积累。记得收藏,喜欢,关注。
【注意】完整的代码、数据和技术交流,在本文最后获得。
什么是多处理器系统?官方文档是这么说的:多处理是一个支持使用类似线程模块的API生成进程的包。多处理包提供了本地和远程并发操作,通过使用子进程而不是线程有效地绕过了全局解释器锁。因此,多处理模块允许程序员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。
多处理模块还引入了线程模块中没有的API。一个主要的例子是Pool对象,它提供了一种快速的方法来赋予函数并行处理一系列输入值的能力,并且可以将输入数据分发到不同的进程进行处理(数据并行)。下面的示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了池。
其实在我眼里,我并不在乎多处理是多线程还是多进程,只要能充分利用cpu的多核性能,提高自己的计算效率就行。
使用多重处理的目的:我们想用这个软件包实现什么功能?
我想计算得更快;我想保存计算结果;我想把计算过程的进度反馈给我;如果遇到异常,可以不终止程序继续运行吗?如果程序超时,可以终止吗?以上是我的目的,也是我的需求;如果都达到了,那我觉得就完了。
创建场景我们现在有一个非常耗时的函数,就是计算一个值的平方:2=2 * 2。我们设置一个函数,每次计算都要休息1秒。那么可以用python这样写:
def myf(x):
时间.睡眠(1)
返回x * x如果我们不用熊猫,numpy包。我们可能需要这样写:
#不使用numpy时进行简单的并行计算
导入时间
从tqdm导入tqdm
def myf(x):
时间.睡眠(1)
返回x * x
if __name__==__main__ :
listx=range(10)
listy=[]
对于tqdm(listx)中的I:
temp_value=myf(x=i)
listy.append(临时值)
打印(最终y:)
打印(listy)一个简单的并行计算我们使用多处理池。池对象可以自动将数据共享给不同的进程。什么都不用管,把数据传进去就行了。在下面的代码中,我需要计算0到19的平方,所以我设置了一个Pool对象,它有4个进程。我传入需要计算的函数和需要计算的迭代对象。然后数据出来了。
:此处使用了Pool的映射功能。
#当遇到一个大数据和一个复杂的计算时,使用并行性可以大大提高计算效率。
从多处理导入池
导入时间
从tqdm导入tqdm
def myf(x):
时间.睡眠(1)
返回x * x
if __name__==__main__ :
打印(-开始运行-)
value_x=range(20)
P=池(进程=4)
value_y=P.map(func=myf,iterable=value_x)
Print(value_y)其实计算还是很快的,但是我中间有个情况,就是不知道这个计算到什么程度。是算到50%还是80%?这个不清楚。
我想展示计算的进步。要显示多核计算的进度条,您可以这样做:
:这里使用了Pool的imap_unordered函数。
#相比02,这款可以有变化的进度条,进度条不断衍生,但缺点也很明显,比如缺少进度完成。
从多处理导入池
导入时间
从tqdm导入tqdm
def myf(x):
时间.睡眠(1)
返回x * x
if __name__==__main__ :
#打印(-开始运行-)
value_x=range(20)
P=池(进程=4)
value _ y=list(tqdm(p . IMAP _ unordered(func=myf,iterable=value_x)))
打印(值y)
这个虽然可以看到进度条不断的完成,但是没有办法看到完成率。
展示计算完成率:这里使用的是泳池的应用_异步函数
这里的应用_异步需要的就不是一个迭代对象了,和上面的地图、imap _无序就有明显的差距,我们需要把一个迭代对象拿出来,放到应用_异步里面,然后再将应用_异步放到列表里面,然后在把这个东西从列表里面取出来。通过得到调用他。
从多处理导入池
从管理导入管理
导入时间
def myf(x):
时间。睡眠(1)
返回x * x
if __name__==__main__ :
值x=范围(200)
P=池(进程=4)
# 这里计算很快
res=[P.apply_async(func=myf,args=(i,),for i in value_x]
# 主要是看这里
结果=[tqdm(RES)中我的I . get(超时=2)]
打印(结果)当然,我看还有人用应用_异步的回收参数,和上面的对比,感觉就有点麻烦,这里展示给各位:
从多处理导入池
从管理导入管理
导入时间
def myf(x):
时间。睡眠(1)
返回x * x
if __name__==__main__ :
值x=范围(200)
P=池(进程=20)
pbar=tqdm(total=len(value_x))
# 这里计算很快
res=[P.apply_async(func=myf,args=(i,),callback=lambda _:pbar。在value _ x]中为I更新(1))
# 主要是看这里
result=[I . get(time out=2)for I in RES]
打印(结果)处理异常多核计算最经常遇到的就是遇到一个错误,然后就跳出来,这怎么可以忍。就拿最常见的错误来说,函数运行超时怎么解决?
:这里使用的是泳池的应用_异步函数和得到来解决
来自多处理导入池,超时错误
导入时间
从管理导入管理
def myf(x):
如果x % 5==0:
时间。睡眠(20.2)
否则:
时间。睡眠(0.3)
返回x * x
def safely_get(value,timeout=2):
尝试:
data=value.get(timeout=timeout)
除了超时错误:
数据=0
返回数据
if __name__==__main__ :
P=池(进程=10)
值=范围(100)
pbar=tqdm(total=len(value))
#方式2
RES _ temp=[p . apply _ async(func=myf,args=(i,),callback=lambda _:pbar。值中I的更新(1))
# result=[RES . get(time out=3)for RES in RES _ temp]
结果=[对于资源_温度中的res,safely_get(res,time out=1)]
#方式一
# RES _ temp=[p . apply _ async(func=myf,args=(i,),for i in value]
# result=[safely_get(res,time out=1)for RES in tqdm(RES _ temp)]
时间。睡眠(1)
打印(结果)我把这个myf函数做了简单的处理,如果是5的倍速,就需要设置休息20.2秒,但是我的容忍度是每个函数运行时间不能超过一秒,所以我写了一饿过安全_获取函数,这个函数里面有试试看,可以破获错误,如果超时,那么整个函数也不跳出,并且把结果返回为0。
多个进程修改同一个数据来都来了,也把这个问题说一下:
由于大蟒的GIL,导致同一时间,不同的进程不能同时修改同一个数据。但是使用多重处理包的经理的字典,列表之类的就可以。
下面这两个代码就是用来将子进程的相关信息保存到一个列表里面。然后保存到熊猫里面。
使用过程做的:代码如下:
#多子进程将数据写入一个数据(python字典)
从多重处理导入流程,经理
导入操作系统
导入时间
进口熊猫作为螺纹中径
从管理导入管理
定义工人(id,save_data):
时间。睡眠(1)
save_data[id]={
子进程:[os.getpid()],
父进程:[os.getppid()],
进程id: [id]
}
if __name__==__main__ :
finaldata=Manager().字典()
subprocess_list=[]
对于管理中的我(范围(200)):
p=进程(target=worker,args=(i,finaldata))
subprocess _ list.append
开始()
[p . join()for p in tqdm(subprocess _ list)]
finaldata=pd.concat([pd .finaldata.items()])中(键,值)的数据框架(值)
打印(最终数据)使用泳池做的:
#多子进程将数据写入一个数据(python字典)
#使用池
从多处理导入池,经理
导入操作系统
导入时间
进口熊猫作为螺纹中径
从管理导入管理
定义工人(id,save_data):
时间。睡眠(1)
save_data[id]={
子进程:[os.getpid()],
父进程:[os.getppid()],
进程id: [id]
}
if __name__==__main__ :
finaldata=Manager().字典()
P=池(进程=20)
# reslist=[]
#对于管理中的我(范围(200)):
# res=P.apply_async(func=worker,args=(i,finaldata))
# reslist.append(res)
reslist=[p . apply _ async(func=worker,args=(i,finaldata)) for i in range(200)]
[对于tqdm(结果列表)中的res,RES . get(超时=200)]
finaldata=pd.concat([pd .finaldata.items()])中(键,值)的数据框架(值)
打印(最终数据)技术交流欢迎转载、收藏、有所收获点赞支持一下!数据、代码可以找我获取
目前开通了技术交流群,群友已超过2000人,添加时最好的备注方式为:来源兴趣方向,方便找到志同道合的朋友
方式、微信搜索公众号:Python学习与数据挖掘,后台回复:加群
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。