python实现异步的几种方式,python 异步调用

  python实现异步的几种方式,python 异步调用

  我一直对asyncio这个图书馆很感兴趣。毕竟是官网推荐的高并发模块。python在python 3.4中也引入了协同学的概念。通过这样的安排,我对这个模块的使用有了更深入的了解。

  阿辛西奥是做什么的?

  异步网络操作并发进程python3.0时代,标准库中的异步网络模块:select(极低级)python3.0时代,第三方异步网络库:Tornado python3.4时代,asyncio: TCP支持,子进程

  目前已经支持asyncio的很多模块:AIO HTTP,AIO DNS,AIO EDIS等。支持的内容列在https://github.com/aio-libs,这里,并不断更新。

  当然,到目前为止,不仅仅是asyncio、tornado和gevent实现了协作流程。

  关于asyncio的一些关键词的解释:

  Event_loop事件循环:程序开始一个无限循环,在事件循环上注册一些函数,遇到事件时,调用对应的协程协程协程协程:协程对象引用async关键字定义的函数,其调用不会立即执行该函数,而是返回一个协程对象。协同流程对象需要在事件循环中注册,并由事件循环调用。任务:一个协作对象是一个可以被挂起的原生函数,一个任务是一个协作过程的进一步的包,它包含了任务未来的各种状态:代表任务未来要执行或不要执行的结果。它和任务没有本质区别。async/await关键字:python3.5用于定义一个协同学,async定义一个协同学,await用于挂起被阻塞的异步调用接口。看完以上关键词,你可能会扭头就走。其实一开始对asyncio的理解和学习是有冲突的,也不知道为什么。这也导致了很长一段时间这个模块本身基本上没人关注和使用。但是,随着python在工作中遇到的各种性能问题,我告诉自己,还是要好好学习这个模块。

  所谓“异步IO”,就是你发起了一个IO操作,却不用等它结束。你可以继续做其他的事情,结束的时候会通知你。

  Asyncio是一种并发方式。对于Python来说,也可以通过线程和多处理来实现并发。

  Asyncio并没有带来真正的并行。当然,由于GIL(全局解释器锁)的存在,Python的多线程并不能带来真正的并行性。

  可以分配给asyncio的任务称为协程。一个进程可以放弃执行并把机会让给其他进程(即放弃或等待)。

  要定义协同学的定义,需要使用async def语句。

  进口异步

  导入时间

  从线程导入线程

  异步定义do_some_work(x):

  打印(等待字符串(x))

  awaasyncio . sleep(x)do _ some _ work是一个协作过程。确切的说,do_some_work是一个协整函数,可以用asyncio.iscoroutinefunction来验证:

  打印(asyncio。iscoroutinefunction(do _ some _ work))这个协作过程什么都没做。我们让它休眠几秒钟,以模拟实际工作负载:

  异步定义do_some_work(x):

  打印(等待字符串(x))

  Awaasyncio.sleep (x)在解释await之前,有必要解释一下协同过程可以做什么。解成能:

  *等待未来的终结

  *等待另一个协程(产生一个结果,或者抛出一个异常)

  *向等待它的进程产生一个结果。

  *向正在等待它的协程抛出一个异常。

  Asyncio.sleep也是一个协同过程,所以wait asyncio.sleep(x)只是在等待另一个协同过程。

  运行协程调用协程函数,协程不会开始运行,只是返回一个协程对象,可以通过asyncio.iscoroutine验证:

  print(asyncio . isco routine(do _ some _ work(3)))# True

  这里还会提出一个警告:

  async1.py:16: RuntimeWarning:协程“do_some_work”从未被等待

  print(asyncio . isco routine(do _ some _ work(3)))

  要让这个协程对象运行的话,有两种方式:

  * 在另一个已经运行的协程中用`等待等待它

  * 通过`确保_未来函数计划它的执行

  简单来说,只有环运行了,协程才可能运行。

  下面先拿到当前线程缺省的循环,然后把协程对象交给循环运行直到完成,协程对象随后会在环里得到运行。

  通过异步非同步(异步)关键字定义一个协程(协程),当然协程不能直接运行,需要将协程加入到事件循环环中

  asyncio.get_event_loop:创建一个事件循环,然后使用运行直到完成将协程注册到事件循环,并启动事件循环

  loop=asyncio.get_event_loop()#获取一个事件_循环循环。run _ until _ complete(do _ some _ work(3))。阻塞直到所有的任务完成

  运行直到完成是一个阻塞(阻塞)调用,直到协程运行结束,它才返回。这一点从函数名不难看出。

  运行直到完成的参数是一个未来,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查,

  通过确保_未来函数把协程对象包装(换行)成了未来。所以,我们可以写得更明显一些:

  循环。run _ until _ complete(asyncio。assure _ future(do _ some _ work(3)))完整代码:

  进口异步

  导入时间

  异步定义做一些工作(x):

  打印(等待字符串(十))

  等待异步睡眠(十)

  loop=asyncio.get_event_loop()

  循环。run _ until _ complete(asyncio。assure _ future(do _ some _ work(3)))

  运行结果:

  等待3

  三秒钟后程序结束

  创建任务和确保_未来

  协程对象不能直接运行,在注册事件循环的时候,其实是运行直到完成方法将协程包装成为了一个任务(任务)对象。工作对象是将来的类的子类,保存了协程运行后的状态,用于未来获取协程的结果,来返回将来的的执行结果

  进口异步

  导入时间

  now=lambda: time.time()

  异步定义做一些工作(x):

  打印(等待:,x)

  开始=现在()

  协程=做一些工作(2)

  loop=asyncio.get_event_loop()

  任务=循环。创建任务(协同例程)

  打印(任务)

  循环运行直到完成(任务)

  打印(任务)打印(任务。result())打印( Time:,now()-start)结果为:

  任务待定coro=do_some_work()运行于/app/py _ code/study _ asyn CIO/simple _ ex2。py:13

  等待:2

  任务完成coro=do_some_work()完成,定义在/app/py _ code/study _ asyn CIO/simple _ ex2。py:13结果=无

  时间:0.000。56860.88868888681创建工作后,在工作加入事件循环之前为悬而未决的状态,当完成后,状态为完成

  关于上面通过循环。创建任务(协同例程)创建任务,同样的可以通过asyncio.ensure _ future(协程)创建工作

  task=asyncio.ensure _ future(协同例程)

  确保_未来方法内部其实也是调用的工作的创建任务方法,然后返回一个工作对象放入环队列中协程嵌套(等等,集合)以及已完成日期:使用异步非同步(异步)可以定义协程,协程用于耗时的超正析象管操作,我们也可以封装更多的超正析象管操作过程,这样就实现了嵌套的协程,即一个协程中等待了另外一个协程,如此连接起来。

  进口异步

  导入时间

  now=lambda: time.time()

  异步定义做一些工作(x):

  打印(等待:,x)

  等待异步睡眠(十)

  返回"在{}s后完成"。格式(十)

  异步定义main():

  协程1=做一些工作(1)

  协程2=做一些工作(2)

  协程3=做一些工作(4)

  任务=[

  asyncio.ensure _ future(协程1),

  asyncio.ensure _ future(协程2),

  asyncio.ensure _ future(协程3)

  ]

  dones,pendings=await asyncio.wait(任务)

  对于已完成的任务:

  打印(任务返回:,task.result())

  # results=wait asyncio。收集(*任务)

  #对于结果中的结果:

  #打印(任务返回:,结果)

  开始=现在()

  loop=asyncio.get_event_loop()

  loop.run_until_complete(main())

  打印( Time:,now()-start)视图代码

  如果我们把上面代码中的:

  dones,pendings=await asyncio.wait(任务)

  对于已完成的任务:

  打印(任务返回:,task.result())替换为:

  结果=等待阿辛西奥。聚集(*任务)

  对于结果中的结果:

  打印(任务返回:,结果)这样得到的就是一个结果的列表

  不在主要的协程函数里处理结果,直接返回等待的内容,那么最外层的运行直到完成将会返回主要的协程的结果。将上述的代码更改为:

  进口异步

  导入时间

  now=lambda: time.time()

  异步定义做一些工作(x):

  打印(等待:,x)

  等待异步睡眠(十)

  返回"在{}s后完成"。格式(十)

  异步定义main():

  协程1=做一些工作(1)

  协程2=做一些工作(2)

  协程3=做一些工作(4)

  任务=[

  asyncio.ensure _ future(协程1),

  asyncio.ensure _ future(协程2),

  asyncio.ensure _ future(协程3)

  ]

  返回等待asyncio . gather(*任务)

  开始=现在()

  loop=asyncio.get_event_loop()

  结果=循环。run _ until _ complete(main()

  对于结果中的结果:

  打印(任务返回:,结果)

  打印( Time:,now()-start)视图代码

  或者返回使用阿辛西奥,等等方式挂起协程。

  将代码更改为:

  进口异步

  导入时间

  now=lambda: time.time()

  异步定义做一些工作(x):

  打印(等待:,x)

  等待异步睡眠(十)

  返回"在{}s后完成"。格式(十)

  异步定义main():

  协程1=做一些工作(1)

  协程2=做一些工作(2)

  协程3=做一些工作(4)

  任务=[

  asyncio.ensure _ future(协程1),

  asyncio.ensure _ future(协程2),

  asyncio.ensure _ future(协程3)

  ]

  return await asyncio.wait(任务)

  开始=现在()

  loop=asyncio.get_event_loop()

  完成,挂起=循环。run _ until _ complete(main()

  对于已完成的任务:

  打印(任务返回:,task.result())

  打印( Time:,now()-start)视图代码

  也可以使用异步超正析象管(Image Orthicon)的已完成方法

  进口异步

  导入时间

  now=lambda: time.time()

  异步定义做一些工作(x):

  打印(等待:,x)

  等待异步睡眠(十)

  返回"在{}s后完成"。格式(十)

  异步定义main():

  协程1=做一些工作(1)

  协程2=做一些工作(2)

  协程3=做一些工作(4)

  任务=[

  asyncio.ensure _ future(协程1),

  asyncio.ensure _ future(协程2),

  asyncio.ensure _ future(协程3)

  ]

  对于asyncio.as_completed中的任务(任务):

  结果=等待任务

  打印(任务返回:{} 。格式(结果))

  开始=现在()

  loop=asyncio.get_event_loop()

  loop.run_until_complete(main())

  打印( Time:,now()-start)视图代码

  聚集另外一个作用:

  #聚集和等待的区别

  #聚集更加高级的

  第1组=[范围(2)中我的get_html(http://projectsedu.com )

  group 2=[get _ html( http://www。百度一下。com )对于范围(2)中的I]

  group1=asyncio.gather(*group1)

  group2=asyncio.gather(*group2)

  group2.cancel()

  循环。run _ until _ complete(asyncio。聚集(第1组,第2组))查看代码

  回调假如协程是一个超正析象管(图片Orthicon)的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往将来的添加回调来实现。

  def done_callback(futu):

  打印("完成")

  futu=asyncio。保证未来(做一些工作(3))

  futu.add_done_callback(完成_回调)

  循环。运行_直到_完成(富图)

  多个协程实际项目中,往往有多个协程,同时在一个环里运行。为了把多个协程交给循环,需要借助asyncio.gather函数。

  循环。run _ until _ complete(asyncio。gather(做一些工作(1),做一些工作(3))

  或者先把协程存在列表里:

  科罗斯=[做一些工作(1),做一些工作(3)]

  循环。run _ until _ complete(asyncio。收集(* coros))

  运行结果:

  等待3

  等待一

  等待三秒钟

  完成的

  这两个协程是并发运行的,所以等待的时间不是1 3=4 秒,而是以耗时较长的那个协程为准。

  参考函数聚集的文档:

  gather(*coros_or_futures,loop=None,return_exceptions=False)

  从给定的协程或将来的返回一个将来的聚合结果。

  发现也可以传未来给它:

  futus=[asyncio。assure _ future(do _ some _ work(1)),

  阿辛西奥。assure _ future(do _ some _ work(3))]

  循环。run _ until _ complete(asyncio。收集(* futus))

  聚集起聚合的作用,把多个未来包装成单个未来,因为循环运行直到完成只接受单个未来。

  运行直到完成和永远奔跑开启事件循环有两种方法,一种方法就是通过调用运行直到完成,另外一种就是调用永远奔跑。运行直到完成内置添加完成回调,使用永远奔跑的好处是可以通过自己自定义添加完成回调;

  我们一直通过运行直到完成来运行循环,等到将来的完成,运行,直到完成也就返回了。

  异步定义做一些工作(x):

  打印(等待字符串(十))

  等待异步睡眠(十)

  打印("完成")

  loop=asyncio.get_event_loop()

  科罗=做一些工作(3)

  循环运行直到完成(科罗)

  输出:

  等待3

  等待三秒钟

  完成的

  程序退出现在改用永远奔跑:

  异步定义做一些工作(x):

  打印(等待字符串(十))

  等待异步睡眠(十)

  打印("完成")

  loop=asyncio.get_event_loop()

  科罗=做一些工作(3)

  阿辛西奥。确保未来(coro)

  loop.run_forever()

  等待3

  等待三秒钟

  完成的

  程序没有退出

  三秒钟过后,未来结束,但是程序并不会退出跑吧_永远会一直运行,直到停下来被调用,但是你不能像下面这样调停止:

  loop.run_forever()

  loop.stop()

  永远奔跑不返回,停止永远也不会被调用。所以,只能在协程中调停止:

  异步定义do_some_work(循环,x):

  打印(等待字符串(十))

  等待异步睡眠(十)

  打印("完成")

  loop.stop()

  这样并非没有问题,假如有多个协程在环里运行:

  阿辛西奥。assure _ future(do _ some _ work(loop,1))

  阿辛西奥。assure _ future(do _ some _ work(loop,3))

  loop.run_forever()第二个协程没结束,循环就停止了——被先结束的那个协程给停掉的。

  要解决这个问题,可以用聚集把多个协程合并成一个未来,并添加回调,然后在回调里再去停止循环。

  异步定义do_some_work(循环,x):

  打印(等待字符串(十))

  等待异步睡眠(十)

  打印("完成")

  def done_callback(loop,futu):

  loop.stop()

  loop=asyncio.get_event_loop()

  futus=asyncio。gather(do_some_work(循环,1),do_some_work(循环,3))

  futus。add _ done _ callback(func工具。partial(done _ callback,loop))

  loop.run_forever()其实这基本上就是运行直到完成的实现了,运行,直到完成在内部也是调用永远奔跑。

  协程的停止:

  未来对象有几个状态:

  PendingRunningDoneCacelled创建将来的的时候,任务为待定,事件循环调用执行的时候当然就是跑步,调用完毕自然就是搞定了,如果需要停止事件循环,就需要先把工作取消。可以使用阿辛西奥。工作获取事件循环的工作

  进口异步

  导入时间

  now=lambda :time.time()

  异步定义做一些工作(x):

  打印(等待:,x)

  等待异步睡眠(十)

  返回"在{}s后完成"。格式(十)

  协程1=做一些工作(1)

  协程2=做一些工作(2)

  协程3=做一些工作(2)

  任务=[

  asyncio.ensure _ future(协程1),

  asyncio.ensure _ future(协程2),

  asyncio.ensure _ future(协程3),

  ]

  开始=现在()

  loop=asyncio.get_event_loop()

  尝试:

  循环。run _ until _ complete(asyncio。等待(任务))

  除了键盘中断为e:

  打印(阿辛西奥.Task.all_tasks()

  对于异步IO中的任务Task.all_tasks().

  print(task.cancel())

  loop.stop()

  loop.run_forever()

  最后:

  loop.close()

  打印( Time:,now()-start)视图代码

  启动事件循环之后,马上ctrl c,会触发运行直到完成的执行异常KeyBorardInterrupt。然后通过循环阿辛西奥。工作取消未来。可以看到输出如下:

  等待:1

  等待:2

  等待:2

  ^C{任务已完成coro=do_some_work()已完成,定义于/app/py _ code/study _ asyn CIO/simple _ ex10。py:13结果=后完成,任务待定coro=do_some_work()正在运行

  at/app/py _ code/study _ asyn CIO/simple _ ex10。py:15 wait _ for=未来待定CB=[Task ._wakeup()] cb=[_wait .当地人/usr/local/lib/python 3.5/asyn CIO/tasks处的. com _ on _ completion()。py:428],任务待定

  coro=do_some_work()运行于/app/py _ code/study _ asyn CIO/simple _ ex10 . py:15 wait _ for=Future pending CB=[Task。_wakeup()] cb=[_wait。当地人。_ on _ completion()at/usr/local/lib/python 3.5/asyncio/tasks . py:428],

  Task pending coro=wait()运行在/usr/local/lib/python 3.5/asyncio/tasks . py:361 wait _ for=Future pending CB=[Task。_wakeup()] }

  错误的

  真实的

  真实的

  真实的

  时间:一分钟。59660.88868688686

  True cannel成功了。循环停止后,需要再次打开事件循环,最后关闭,否则会抛出异常。

  循环任务,逐个取消是一种方案,但就像上面一样,我们把任务列表封装在主函数里,在主函数外调用事件循环。这个时候main就相当于最外向的任务,你就可以处理包装好的main函数了。

  通话(延时功能):

  Call_soon()被立即执行(等待队列中的下一个)

  Call_later()指定运行时间(执行前多长时间)

  Call_at指定运行的时间点。

  调用线程安全的线程安全方法

  进口异步

  导入时间

  进口异步

  定义回调(sleep_times,loop):

  打印(成功时间{} 。format(loop.time()))

  loop.stop()

  # def stoploop(循环):

  # loop.stop()

  #稍后打电话,打电话给

  if __name__==__main__ :

  loop=asyncio.get_event_loop()

  now=loop.time()

  # loop.call_at(现在是2,回调,2,循环)

  # loop.call_at(现在1,回调,1,循环)

  loop.call_at(现在3,回调,3,循环)

  #提前注册我们的任务,也可以根据返回的句柄取消。

  # loop.call_soon(回调,4,循环)

  # loop.call_later(3,回调,5,循环)

  # loop.call_soon(停止循环,循环)

  # loop.call_soon(回调,4,循环)

  loop.run_forever()

  loop.close()视图代码

  闭环?上面的例子都没有调用loop.close,看起来没有问题。你想调整loop.close吗?

  简单来说,只要不关闭,循环就可以再次运行。

  loop . run _ until _ complete(do _ some _ work(loop,1))

  loop . run _ until _ complete(do _ some _ work(loop,3))

  Loop.close()但是如果它被关闭,它就不能再运行了:

  loop . run _ until _ complete(do _ some _ work(loop,1))

  loop.close()

  loop . run _ until _ complete(do _ some _ work(loop,3)) #此处异常

  建议调用loop.close彻底清理循环对象,防止误用。

  和阻塞等待:

  Async可用于定义协作对象,await可用于暂停耗时的操作。就像发电机的产量一样,函数放弃了控制。当一个协同学遇到wait时,事件循环将挂起该协同学并执行其他协同学,直到其他协同学被挂起或执行,然后执行下一个协同学。

  耗时的操作一般都是IO操作,比如网络请求、文件读取等。我们使用asyncio.sleep函数来模拟io操作。该过程的目的是使这些IO操作异步。

  进口异步

  导入时间

  now=lambda :time.time()

  异步定义do_some_work(x):

  打印(等待:,x)

  # await后面是调用耗时的操作。

  等待异步睡眠(x)

  返回“在{}s后完成”。格式(x)

  开始=现在()

  协程=do_some_work(2)

  loop=asyncio.get_event_loop()

  task=asyncio.ensure _ future(协同例程)

  循环运行直到完成(任务)

  print(任务返回:,task.result())

  Print(Time:,now()-start)处于await asyncio.sleep(x)中,因为它已经在这里休眠了,模拟一个阻塞或耗时的操作,此时它会放弃控制权。即当遇到阻塞调用的函数时,使用await方法放弃对协程的控制,这样循环就可以调用其他协程。

  不同线程的事件循环:

  很多时候,我们的事件循环是用来注册协程的,有些协程需要动态添加到事件循环中。一个简单的方法是使用多线程。当前线程创建一个事件循环,然后创建一个新线程,在新线程中启动事件循环。当前线程将不会被阻塞。

  进口异步

  从线程导入线程

  导入时间

  now=lambda :time.time()

  def start_loop(循环):

  asyncio.set_event_loop(循环)

  loop.run_forever()

  def more_work(x):

  打印(更多工作{} )。格式(x))

  睡眠时间(x)

  打印(完成更多工作{} )。格式(x))

  开始=现在()

  new _ loop=asyncio。新事件循环()

  t=线程(target=start_loop,args=(new_loop,)

  启动()

  打印(时间:{} 。format(time.time() - start))

  new _ loop。call _ soon _ threadsafe(more _ work,6)

  new _ loop。call _ soon _ threadsafe(more _ work,3)查看代码

  启动上述代码之后,当前线程不会被块,新线程中会按照顺序执行call_soon_threadsafe方法注册的更多_工作方法,后者因为时间。睡眠操作是同步阻塞的,因此运行完毕更多_工作需要大致6 3

  新线程协程

  进口异步

  导入时间

  从线程导入线程

  now=lambda :time.time()

  定义开始_循环(循环):

  asyncio.set_event_loop(循环)

  loop.run_forever()

  异步定义做一些工作(x):

  打印(等待{} )。格式(十))

  等待异步睡眠(十)

  打印(在{}s后完成)。格式(十))

  def more_work(x):

  打印(更多工作{} )。格式(十))

  睡眠时间(十)

  打印(完成更多工作{} )。格式(十))

  开始=现在()

  new _ loop=asyncio。新事件循环()

  t=线程(target=start_loop,args=(new_loop,)

  启动()

  打印(时间:{} 。format(time.time() - start))

  阿辛西奥。run _ coroutine _ thread safe(do _ some _ work(6),new_loop)

  阿辛西奥。run _ coroutine _ thread safe(do _ some _ work(4),new_loop)查看代码

  述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环。主线程通过运行协程线程安全新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被街区。一共执行的时间大概在6s左右。

  动态添加协程:

  在实战之前,我们要先了解下在异步超正析象管(Image Orthicon)中如何将协程态添加到事件循环中的。这是前提。

  如何实现呢,有两种方法:

  主线程是同步的

  进口异步

  导入时间

  从线程导入线程

  now=lambda :time.time()

  定义开始_循环(循环):

  # 一个在后台永远运行的事件循环

  asyncio.set_event_loop(循环)

  loop.run_forever()

  def do_some_work(x):

  打印(等待{} )。格式(十))

  #等待asyncio.sleep(x)

  睡眠时间(十)

  打印(在{}s后完成)。格式(十))

  定义停止循环(循环):

  loop.stop()

  开始=现在()

  new _ loop=asyncio。新事件循环()

  # 定义一个线程,并传入一个事件循环对象

  t=线程(target=start_loop,args=(new_loop,)

  启动()

  打印(时间:{} 。format(time.time() - start))

  # 动态添加两个协程

  # 这种方法,在主线程是同步的

  new _ loop。call _ soon _ threadsafe(do _ some _ work,6)

  new _ loop。call _ soon _ threadsafe(do _ some _ work,4)

  new_loop.call_soon(停止循环,新循环)视图代码

  时间:0.002000093460083008

  等待6

  6s后完成

  等待四

  4s后完成

  进程已结束,退出代码0

  主线程是异步的,这是重点,一定要掌握。

  进口异步

  导入时间

  从线程导入线程

  now=lambda :time.time()

  定义开始_循环(循环):

  # 一个在后台永远运行的事件循环

  asyncio.set_event_loop(循环)

  loop.run_forever()

  异步定义做一些工作(x):

  打印(等待{} )。格式(十))

  等待异步睡眠(十)

  打印(在{}s后完成)。格式(十))

  定义停止循环(循环):

  loop.stop()

  开始=现在()

  new _ loop=asyncio。新事件循环()

  # 定义一个线程,并传入一个事件循环对象

  t=线程(target=start_loop,args=(new_loop,)

  启动()

  # 动态添加两个协程

  # 这种方法,在主线程是同步的

  阿辛西奥。run _ coroutine _ thread safe(do _ some _ work(6),new_loop)

  阿辛西奥。run _ coroutine _ thread safe(do _ some _ work(4),new_loop)

  打印(时间:{} 。format(time.time() - start))

  时间:0.002000093460083008

  等待6

  等待四

  4s后完成

  6s后完成

  实战:利用存储实现动态添加任务;对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似主工人的方式,主人主要用户获取队列的味精,工人用户处理消息。

  为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用存储的队列。主线程中有一个是无限循环,用户消费队列。

  导入时间

  导入存储

  进口异步

  从队列导入队列

  从线程导入线程

  定义开始_循环(循环):

  # 一个在后台永远运行的事件循环

  asyncio.set_event_loop(循环)

  loop.run_forever()

  异步定义do_sleep(x,队列):

  等待异步睡眠(十)

  queue.put(ok )

  def get_redis():

  connection_pool=redis .连接池(主机=127.0.0.1 ,数据库=0)

  返回redis .Redis(连接池=连接池)

  定义消费者():

  虽然正确:

  task=rcon.rpop(队列)

  如果不是任务:

  时间。睡眠(1)

  继续

  阿辛西奥。run _ coroutine _ threadsafe(do _ sleep(int(task),queue),new_loop)

  if __name__==__main__ :

  print(time.ctime())

  new _ loop=asyncio。新事件循环()

  # 定义一个线程,运行一个事件循环对象,用于实时接收新任务

  loop _ Thread=Thread(target=start _ loop,args=(new_loop,))

  loop_thread.setDaemon(True)

  loop_thread.start()

  # 创建存储连接

  rcon=get_redis()

  queue=Queue()

  # 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务

  消费者线程=线程(目标=消费者)

  consumer_thread.setDaemon(True)

  消费者线程启动()

  虽然正确:

  msg=queue.get()

  打印(协程运行完.)

  打印(当前时间:,time.ctime())查看代码

  稍微讲下代码

  循环线程:单独的线程,运行着一个事件对象容器,用于实时接收新任务。

  消费者线程:单独的线程,实时接收来自使用心得的消息队列,并实时往事件对象容器中添加新任务。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: