python多进程multiprocessing,python多核并行处理

  python多进程multiprocessing,python多核并行处理

  平时我只使用python的数据处理包进行数据处理,很少使用python的一些其他功能。之前一直在尝试理解python自带的多线程/多进程包,看了很多遍都没办法用。

  今天在家好好学习了一番,物有所值,终于可以用了:这个包我大概知道怎么用了,按照我的想法怎么用。我总是看python的多重处理包。这也是我个人的学习积累。记得收藏,喜欢,关注。

  【注意】完整的代码、数据和技术交流,在本文最后获得。

  什么是多处理器系统?官方文档是这么说的:多处理是一个支持使用类似线程模块的API生成进程的包。多处理包提供了本地和远程并发操作,通过使用子进程而不是线程有效地绕过了全局解释器锁。因此,多处理模块允许程序员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。

  多处理模块还引入了线程模块中没有的API。一个主要的例子是Pool对象,它提供了一种快速的方法来赋予函数并行处理一系列输入值的能力,并且可以将输入数据分发到不同的进程进行处理(数据并行)。下面的示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了池。

  其实在我眼里,我并不在乎多处理是多线程还是多进程,只要能充分利用cpu的多核性能,提高自己的计算效率就行。

  使用多重处理的目的:我们想用这个软件包实现什么功能?

  我想计算得更快;我想保存计算结果;我想把计算过程的进度反馈给我;如果遇到异常,可以不终止程序继续运行吗?如果程序超时,可以终止吗?以上是我的目的,也是我的需求;如果都达到了,那我觉得就完了。

  创建场景我们现在有一个非常耗时的函数,就是计算一个值的平方:2=2 * 2。我们设置一个函数,每次计算都要休息1秒。那么可以用python这样写:

  def myf(x):

  时间.睡眠(1)

  返回x * x如果我们不用熊猫,numpy包。我们可能需要这样写:

  #不使用numpy时进行简单的并行计算

  导入时间

  从tqdm导入tqdm

  def myf(x):

  时间.睡眠(1)

  返回x * x

  if __name__==__main__ :

  listx=range(10)

  listy=[]

  对于tqdm(listx)中的I:

  temp_value=myf(x=i)

  listy.append(临时值)

  打印(最终y:)

  打印(listy)一个简单的并行计算我们使用多处理池。池对象可以自动将数据共享给不同的进程。什么都不用管,把数据传进去就行了。在下面的代码中,我需要计算0到19的平方,所以我设置了一个Pool对象,它有4个进程。我传入需要计算的函数和需要计算的迭代对象。然后数据出来了。

  :此处使用了Pool的映射功能。

  #当遇到一个大数据和一个复杂的计算时,使用并行性可以大大提高计算效率。

  从多处理导入池

  导入时间

  从tqdm导入tqdm

  def myf(x):

  时间.睡眠(1)

  返回x * x

  if __name__==__main__ :

  打印(-开始运行-)

  value_x=range(20)

  P=池(进程=4)

  value_y=P.map(func=myf,iterable=value_x)

  Print(value_y)其实计算还是很快的,但是我中间有个情况,就是不知道这个计算到什么程度。是算到50%还是80%?这个不清楚。

  我想展示计算的进步。要显示多核计算的进度条,您可以这样做:

  :这里使用了Pool的imap_unordered函数。

  #相比02,这款可以有变化的进度条,进度条不断衍生,但缺点也很明显,比如缺少进度完成。

  从多处理导入池

  导入时间

  从tqdm导入tqdm

  def myf(x):

  时间.睡眠(1)

  返回x * x

  if __name__==__main__ :

  #打印(-开始运行-)

  value_x=range(20)

  P=池(进程=4)

  value _ y=list(tqdm(p . IMAP _ unordered(func=myf,iterable=value_x)))

  打印(值y)

  这个虽然可以看到进度条不断的完成,但是没有办法看到完成率。

  展示计算完成率:这里使用的是泳池的应用_异步函数

  这里的应用_异步需要的就不是一个迭代对象了,和上面的地图、imap _无序就有明显的差距,我们需要把一个迭代对象拿出来,放到应用_异步里面,然后再将应用_异步放到列表里面,然后在把这个东西从列表里面取出来。通过得到调用他。

  从多处理导入池

  从管理导入管理

  导入时间

  def myf(x):

  时间。睡眠(1)

  返回x * x

  if __name__==__main__ :

  值x=范围(200)

  P=池(进程=4)

  # 这里计算很快

  res=[P.apply_async(func=myf,args=(i,),for i in value_x]

  # 主要是看这里

  结果=[tqdm(RES)中我的I . get(超时=2)]

  打印(结果)当然,我看还有人用应用_异步的回收参数,和上面的对比,感觉就有点麻烦,这里展示给各位:

  从多处理导入池

  从管理导入管理

  导入时间

  def myf(x):

  时间。睡眠(1)

  返回x * x

  if __name__==__main__ :

  值x=范围(200)

  P=池(进程=20)

  pbar=tqdm(total=len(value_x))

  # 这里计算很快

  res=[P.apply_async(func=myf,args=(i,),callback=lambda _:pbar。在value _ x]中为I更新(1))

  # 主要是看这里

  result=[I . get(time out=2)for I in RES]

  打印(结果)处理异常多核计算最经常遇到的就是遇到一个错误,然后就跳出来,这怎么可以忍。就拿最常见的错误来说,函数运行超时怎么解决?

  :这里使用的是泳池的应用_异步函数和得到来解决

  来自多处理导入池,超时错误

  导入时间

  从管理导入管理

  def myf(x):

  如果x % 5==0:

  时间。睡眠(20.2)

  否则:

  时间。睡眠(0.3)

  返回x * x

  def safely_get(value,timeout=2):

  尝试:

  data=value.get(timeout=timeout)

  除了超时错误:

  数据=0

  返回数据

  if __name__==__main__ :

  P=池(进程=10)

  值=范围(100)

  pbar=tqdm(total=len(value))

  #方式2

  RES _ temp=[p . apply _ async(func=myf,args=(i,),callback=lambda _:pbar。值中I的更新(1))

  # result=[RES . get(time out=3)for RES in RES _ temp]

  结果=[对于资源_温度中的res,safely_get(res,time out=1)]

  #方式一

  # RES _ temp=[p . apply _ async(func=myf,args=(i,),for i in value]

  # result=[safely_get(res,time out=1)for RES in tqdm(RES _ temp)]

  时间。睡眠(1)

  打印(结果)我把这个myf函数做了简单的处理,如果是5的倍速,就需要设置休息20.2秒,但是我的容忍度是每个函数运行时间不能超过一秒,所以我写了一饿过安全_获取函数,这个函数里面有试试看,可以破获错误,如果超时,那么整个函数也不跳出,并且把结果返回为0。

  多个进程修改同一个数据来都来了,也把这个问题说一下:

  由于大蟒的GIL,导致同一时间,不同的进程不能同时修改同一个数据。但是使用多重处理包的经理的字典,列表之类的就可以。

  下面这两个代码就是用来将子进程的相关信息保存到一个列表里面。然后保存到熊猫里面。

  使用过程做的:代码如下:

  #多子进程将数据写入一个数据(python字典)

  从多重处理导入流程,经理

  导入操作系统

  导入时间

  进口熊猫作为螺纹中径

  从管理导入管理

  定义工人(id,save_data):

  时间。睡眠(1)

  save_data[id]={

  子进程:[os.getpid()],

  父进程:[os.getppid()],

  进程id: [id]

  }

  if __name__==__main__ :

  finaldata=Manager().字典()

  subprocess_list=[]

  对于管理中的我(范围(200)):

  p=进程(target=worker,args=(i,finaldata))

  subprocess _ list.append

  开始()

  [p . join()for p in tqdm(subprocess _ list)]

  finaldata=pd.concat([pd .finaldata.items()])中(键,值)的数据框架(值)

  打印(最终数据)使用泳池做的:

  #多子进程将数据写入一个数据(python字典)

  #使用池

  从多处理导入池,经理

  导入操作系统

  导入时间

  进口熊猫作为螺纹中径

  从管理导入管理

  定义工人(id,save_data):

  时间。睡眠(1)

  save_data[id]={

  子进程:[os.getpid()],

  父进程:[os.getppid()],

  进程id: [id]

  }

  if __name__==__main__ :

  finaldata=Manager().字典()

  P=池(进程=20)

  # reslist=[]

  #对于管理中的我(范围(200)):

  # res=P.apply_async(func=worker,args=(i,finaldata))

  # reslist.append(res)

  reslist=[p . apply _ async(func=worker,args=(i,finaldata)) for i in range(200)]

  [对于tqdm(结果列表)中的res,RES . get(超时=200)]

  finaldata=pd.concat([pd .finaldata.items()])中(键,值)的数据框架(值)

  打印(最终数据)技术交流欢迎转载、收藏、有所收获点赞支持一下!数据、代码可以找我获取

  目前开通了技术交流群,群友已超过2000人,添加时最好的备注方式为:来源兴趣方向,方便找到志同道合的朋友

  方式、微信搜索公众号:Python学习与数据挖掘,后台回复:加群

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: