Python多进程通信,Python进程间通信

  Python多进程通信,Python进程间通信

  本文主要介绍python中进程间的通信以及设置状态量控制的其他进程。文章围绕题目,详细介绍了内容,有一定的参考价值,有需要的朋友可以参考一下。

  00-1010一、python中的进程间通信二。设置状态量以控制另一个过程

  

目录

  在业务场景:,当前遇到的业务场景中,我们需要启动一个区间任务,该任务运行一个算法,然后处理算法的结果并将它们存储起来。目前任务间隔一小时,算法运行50多分钟,留给结果处理的时间不多,可能会超时。目前优化方向在算法上会更合理,因为结果处理不需要太多时间。但是,在这个业务场景中,实际上可以将结果处理时间无限压缩到零。说是压缩到零,其实就是在算法执行之后启动一个进程,这样算法的运行就不会因为需要数据处理而受到影响。算法和结果处理分为两个独立的过程进行处理。在最初的程序中,算法运算和结果处理被视为一个周期,而现在算法运算和结果处理被分为两个周期。

  技术实现方案:

  启动两个进程,其中一个运行算法。算法运行后,向另一个进程发送一个状态值,另一个进程收到状态量后开始数据处理。只要这两个过程互不影响。实际上相当于算法进程控制了数据处理过程。

  测试场景构造代码:

  从多重处理导入过程,管道

  导入时间

  导入系统

  导入操作系统

  定义发送消息(连接):

  对于范围(1000):内的I

  打印( send_message:%d%i )

  print(os.getpid())

  连接发送(一)

  时间.睡眠(3)

  定义发送消息1(连接):

  #对于范围(1000):内的I

  print(conn.recv())

  而True:

  if conn.recv() % 5==0:

  打印(“今天天气不错”)

  时间.睡眠(1)

  if __name__==__main__:

  #创建过程通信管道

  左、右=管道()

  t1=进程(target=send_message,args=(left,))

  t2=进程(target=send_message1,args=(right,))

  t1.start()

  t2.start()

  在这个案例场景下有一些需要注意的点:

  第一,time.sleep()的问题,总是在你睡眠到指定时间的时候出错。错误的具体原因到现在也没有找到。这是原问题,这里没有长测,可能不会发生。但是,需要注意的是,代码实现和上面的描述有一些不同。例如,没有启用计划任务,但只启动了间隔运行的任务。第三,数据处理过程一直在处理空闲运行状态,会造成资源的浪费(形成阻塞状态比较合理,但是对阻塞状态的结构缺乏了解,所以先牺牲资源。第四,上述需求中,算法运算和数据处理的最后节点有一个调度任务在控制之下,这里不体现。事实上,定时任务和数据处理应该分开作为两个周期,以更好地满足上述要求。

  

一、python中进程间通信

  在业务场景:,当前遇到的业务场景中,我们需要启动一个区间任务,该任务运行一个算法,然后处理算法的结果并将它们存储起来。目前任务间隔一小时,算法运行50多分钟,留给结果处理的时间不多,可能会超时。目前优化方向在算法上会更合理,因为结果处理不需要太多时间。但是,在这个业务场景中,实际上可以将结果处理时间无限压缩到零。说是压缩到零,其实就是在算法执行之后启动一个进程,这样算法的运行就不会因为需要数据处理而受到影响。算法和结果处理分为两个独立的过程进行处理。在最初的程序中,算法运算和结果处理被视为一个周期,而现在算法运算和结果处理被分为两个周期。

  上述解决方案仅涉及启用两个进程来运行两个任务,但不涉及启用计时器。

  务框架,所以可能会显得和上述的业务场景不一致,所以在这里重新解决一下。上面也是没有问题的,只是把定时任务框架也作为一个任务去处理即可。然后在定时任务运行完程后,向另外一个进程传入一个参数,作为启动另一个进程的状态量即可。当然,在这里,两个进程还是完全占满的,即处理阻塞状态。对于资源的利用还是没有完全达到最好。后续再考虑使用进程池的方式,看是否可以让其中的一个进程运行完后直接释放资源。

  技术解决方案如下:

  

from multiprocessing import Process,Pipe

  import time

  from apscheduler.schedulers.background import BackgroundScheduler

  from apscheduler.schedulers.blocking import BlockingScheduler

  from apscheduler.schedulers.asyncio import AsyncIOScheduler

  # schedule = BackgroundScheduler()

  schedule = BlockingScheduler(timezone="Asia/Shanghai")

  # schedule = AsyncIOScheduler(timezone="Asia/Shanghai")

  def algorithm(conn):

      print(start_run)

      conn.send(please run)

      # time.sleep(5)

  def worth_result(conn):

      while True:

          if conn.recv() == please run:

              print(conn.recv() + very nice!)

  def time_job(conns):

      schedule.add_job(func=algorithm,trigger=interval,seconds=5,args=(conns,))

      schedule.start()

  if __name__ == __main__:

      left,right = Pipe()

      t1 = Process(target=time_job,args=(left,))

      t2 = Process(target=worth_result,args=(right,))

      t1.start()

      t2.start()

  在这里还有一些点需要说明,定时任务选择那一种类型其实都没有关系,阻塞和非阻塞其实没有关系,因为我们在这里是直接启了两个进程,每个进程间是相互独立的,并非是在定时任务下启用的两个进程,所以不会影响的。

  关于这个解决方案还有的问题:

  

  • 一、上述所说,两个进程是占满的,所以对于资源来说,两个进程的利用率一直很高
  • 二、扩展性不足,如果在这个程序中还有其他需要处理的过程,就需要再添加进程,或者把他添加到当前的进程之下,代码重构会比较麻烦一些
  • 三、整个任务的控制不足,需要加以完善。比如对于运行状态一些控制及查看,一般程序如果运行时间较长的话,我们应该添加这样的接口,否则启动后如果没有出结果,我们是不知道其运行状态,有一点被动
  • 四、关于三,使用logging库,应该是可以直接去输出其日志,但是日志库作为第三方库,相当于是对整个运行状态进行监控,会不会再占用一个进程,这个需要去测试
  • 五、完备性及容灾处理,如果程序由于资源等其他问题挂掉后,会有一些数据冗余下来,也就是一些算法未进行处理,这个时候需要考虑怎么样去补数据?原始文件如果没有保留下来呢?而且如果这些数据是极重要的数据该怎么处理?如果程序挂掉后,应该如何快速的去处理呢?直接重启吗?
  • 六、如果数据处理的进程所用的时间比算法还多,那该怎么办?目前的业务来看,是远低于的,但是如果是远高于呢?可否将处理工作进行分配,利用多台机器来处理,然后再把结果合并起来?

  分布式处理的思想越来越浓。

  到此这篇关于python中进程间通信及设置状态量控制另一个进程的文章就介绍到这了,更多相关python进程通信内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: