python 异步io并发,

  python 异步io并发,

  本文详细阐述了Python并发编程的IO模型,并通过实例代码进行了详细介绍。对大家的学习或者工作都有一定的参考价值,有需要的朋友可以参考一下。

  

五种IO模型

  为了更好地理解IO模型,我们需要提前复习一下:同步、异步、阻塞和非阻塞。

  同步)IO异步)IO阻塞IO非阻塞IO五种I/O模式包括阻塞I/O、非阻塞)IO、信号驱动I/O(不常用)、I/O复用和异步I/O,其中前四种称为同步I/O。

  后五种型号的阻塞程度由低到高:阻塞I/O非阻塞I/O多路复用I/O信号驱动I/O异步I/O,所以它们的效率由低到高。

  

1、阻塞I/O模型

  在linux中,默认情况下所有套接字都是阻塞的。除非另有说明,几乎所有的I/O接口(包括socket接口)都是阻塞的。

  如果你面对的是成千上万甚至上万个并发的客户端请求,“线程池”或者“连接池”或许可以缓解一部分压力,但并不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但是面对大规模的服务请求,多线程模型也会遇到瓶颈,可以使用非阻塞接口来尝试解决这个问题。

  

2、非阻塞I/O模型

  在非阻塞I/O中,用户进程实际上需要询问内核数据是否准备好了。但是非阻塞I/O模型绝不被推荐。

  不堵不等。例如,当您创建一个套接字来连接到一个地址并获取接收到的数据的recv时,在执行后续操作之前,您将默认等待(成功连接或接收到的数据)。

  如果设置了setblocking(False ),上述两个进程将不再等待,但会报告一个错误BlockingIOError,只要它被捕获。

  异步,通知,自动执行回调函数或者执行完成后自动执行某些操作(通知)。比如,给一个地址百度一个爬虫。Com发送请求,并在请求执行后执行回调函数。

  

3、多路复用I/O模型(事件驱动)

  基于事件周期的异步非阻塞框架3360,如Twisted框架、scrapy框架(单线程完成并发)。

  检查多个socket是否有变化(连接是否成功/数据是否已采集)(读/写)IO复用?

  系统有三种模式来检测插座是否改变:

  选择:1024插座;最多;环路检测。轮询:监控插座数量不限;环路检测(水平触发)。Epoll:无限数量的监控插座;回调模式(边沿触发)。Python模块:

  Select.selectselect.epoll基于IO复用套接字非阻塞,实现并发请求(每个线程100个请求)。

  导入插座

  #创建套接字

  客户端=socket.socket()

  #将原始锁定位置更改为非锁定位置(报告错误)

  client.setblocking(False)

  #百度创建连接3360屏蔽

  尝试:

  #已执行但报告错误

  client.connect((www.baidu.com ,80))

  除了阻塞错误为e:

  及格

  #检测到连接已经成功。

  #问百度我要什么?

  client.sendall(bGET /s?wd=Alex HTTP/1.0 \ r \ n host : www . Baidu.com \ r \ n \ r \ n )

  #我在等百度的回复。

  chunk_list=[]

  而True:

  #将原始锁定位置更改为非锁定位置(报告错误)

  chunk=client.recv(8096)

  如果不是chunk:

  破裂

  chunk_list.append(组块)

  body=b 。联接(chunk_list)

  print(body.decode(utf-8 ))

  

selectors模块

  #服务器

  从套接字导入*

  导入选择器

  sel=选择器。默认选择器()

  def accept(server_fileobj,mask):

  conn,addr=server_fileobj.accept()

   sel.register(conn,selectors.EVENT_READ,read)

  def read(conn,mask):

   try:

   data=conn.recv(1024)

   if not data:

   print(closing,conn)

   sel.unregister(conn)

   conn.close()

   return

   conn.send(data.upper()+b_SB)

   except Exception:

   print(closing, conn)

   sel.unregister(conn)

   conn.close()

  server_fileobj=socket(AF_INET,SOCK_STREAM)

  server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

  server_fileobj.bind((127.0.0.1,8088))

  server_fileobj.listen(5)

  server_fileobj.setblocking(False) #设置socket的接口为非阻塞

  sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄

   #server_fileobj,并且绑定了一个回调函数accept

  while True:

   events=sel.select() #检测所有的fileobj,是否有完成wait data的

   for sel_obj,mask in events:

   callback=sel_obj.data #callback=accpet

   callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

  #客户端

  from socket import *

  c=socket(AF_INET,SOCK_STREAM)

  c.connect((127.0.0.1,8088))

  while True:

   msg=input(>>: )

   if not msg:continue

   c.send(msg.encode(utf-8))

   data=c.recv(1024)

   print(data.decode(utf-8))

  

4、异步I/O

  asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。

  asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

  用asyncio实现Hello world代码如下:

  

import asyncio

  @asyncio.coroutine

  def hello():

   print("Hello world!")

   # 异步调用asyncio.sleep(1):

   r = yield from asyncio.sleep(1)

   print("Hello again!")

  # 获取EventLoop:

  loop = asyncio.get_event_loop()

  # 执行coroutine

  loop.run_until_complete(hello())

  loop.close()

  @asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。

  hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。

  把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。

  我们用Task封装两个coroutine试试:

  

import threading

  import asyncio

  @asyncio.coroutine

  def hello():

   print(Hello world! (%s) % threading.currentThread())

   yield from asyncio.sleep(1)

   print(Hello again! (%s) % threading.currentThread())

  loop = asyncio.get_event_loop()

  tasks = [hello(), hello()]

  loop.run_until_complete(asyncio.wait(tasks))

  loop.close()

  观察执行过程:

  

Hello world! (<_MainThread(MainThread, started 140735195337472)>)

  Hello world! (<_MainThread(MainThread, started 140735195337472)>)

  (暂停约1秒)

  Hello again! (<_MainThread(MainThread, started 140735195337472)>)

  Hello again! (<_MainThread(MainThread, started 140735195337472)>)

  由打印的当前线程名称可以看出,两个coroutine是由同一个线程并发执行的。

  如果把asyncio.sleep()换成真正的IO操作,则多个coroutine就可以由一个线程并发执行。

  我们用asyncio的异步网络连接来获取sina、sohu和163的网站首页:

  

import asyncio

  @asyncio.coroutine

  def wget(host):

   print(wget %s... % host)

   connect = asyncio.open_connection(host, 80)

   reader, writer = yield from connect

   header = GET / HTTP/1.0\r\nHost: %s\r\n\r\n % host

   writer.write(header.encode(utf-8))

   yield from writer.drain()

   while True:

   line = yield from reader.readline()

   if line == b\r\n:

   break

   print(%s header > %s % (host, line.decode(utf-8).rstrip()))

   # Ignore the body, close the socket

   writer.close()

  loop = asyncio.get_event_loop()

  tasks = [wget(host) for host in [www.sina.com.cn, www.sohu.com, www.163.com]]

  loop.run_until_complete(asyncio.wait(tasks))

  loop.close()

  执行结果如下:

  

wget www.sohu.com...

  wget www.sina.com.cn...

  wget www.163.com...

  (等待一段时间)

  (打印出sohu的header)

  www.sohu.com header > HTTP/1.1 200 OK

  www.sohu.com header > Content-Type: text/html

  ...

  (打印出sina的header)

  www.sina.com.cn header > HTTP/1.1 200 OK

  www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT

  ...

  (打印出163的header)

  www.163.com header > HTTP/1.0 302 Moved Temporarily

  www.163.com header > Server: Cdn Cache Server V2.0

  ...

  可见3个连接由一个线程通过coroutine并发完成。

  

async/await

  用asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。

  为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

  请注意,asyncawait是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  

  • @asyncio.coroutine替换为async
  • yield from替换为await

  让我们对比一下上一节的代码:

  

@asyncio.coroutine

  def hello():

   print("Hello world!")

   r = yield from asyncio.sleep(1)

   print("Hello again!")

  用新语法重新编写如下:

  

async def hello():

   print("Hello world!")

   r = await asyncio.sleep(1)

   print("Hello again!")

  剩下的代码保持不变。

  

小结

  asyncio提供了完善的异步IO支持;

  异步操作需要在coroutine中通过yield from完成;

  多个coroutine可以封装成一组Task然后并发执行。

  到此这篇关于Python并发编程之IO模型的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: