python异步编程实战,深入理解python异步编程
Yyds干货库存
Python 3.5引入了两个新的关键字:async和await。这些看似神奇的关键词,完全可以在没有任何线程的情况下实现线程式并发。在本教程中,我们将介绍异步编程的原因,并通过构建我们自己的小型异步类框架来解释Python的async/await关键字如何在内部工作。
为什么要异步编程?要理解异步编程的动机,我们必须首先理解是什么限制了我们代码的速度。理想情况下,我们希望我们的代码以光速运行,并立即跳过我们的代码,没有任何延迟。然而,由于两个因素,代码实际上运行得要慢得多:
CPU时间(处理器执行指令的时间)IO时间(等待网络请求或存储读/写的时间)当我们的代码在等待IO时,CPU基本上是空闲的,等待某个外部设备响应。通常,内核会检测到这一点,并立即切换到执行系统中的其他线程。因此,如果我们希望加快一组IO密集型任务的处理速度,我们可以为每个任务创建一个线程。当其中一个线程停止等待IO时,内核会切换到另一个线程继续处理。
这在实践中效果很好,但是它有两个缺点:
存在线程开销(尤其是在Python中)。我们无法控制内核何时选择在线程间切换。例如,如果我们想要执行10,000个任务,我们要么必须创建10,000个线程,这将占用大量RAM,要么我们需要创建较少数量的工作线程,并以较少的并发性执行任务。此外,这些线程的初始生成将占用CPU时间。
因为内核可以随时选择在线程之间切换,所以我们的代码随时都可能存在相互竞争的情况。
在基于同步线程的传统代码中引入异步,内核必须检测线程何时被IO绑定,并选择随意在线程之间切换。使用Python异步,程序员使用关键字await来确认声明IO绑定的代码行,并确认被授予执行其他任务的权限。例如,考虑以下执行Web请求的代码:
异步定义请求_google():
reader,writer=await asyncio . open _ connection( Google . com ,80)
writer.write(bGET/HTTP/2\n\n )
等待writer.drain()
response=await reader.read()
Return response.decode()在这里,在这里,我们看到代码在两个地方等待。所以在等待我们的字节发送到服务器(writer.drain())的同时,在等待服务器回复一些字节(reader.read())的同时,我们知道其他代码可能会执行,全局变量可能会改变。但是,从函数开始到第一次等待,我们可以保证代码逐行运行,而不会切换到正在运行的程序中的其他代码。这就是异步的美妙之处。
Asyncio是一个标准库,允许我们用这些异步函数做一些有趣的事情。例如,如果我们想同时向Google发出两个请求,我们可以:
异步定义请求_google_twice():
response_1,response _ 2=await asyncio . gather(request_google(),request _ Google())
返回response_1,response_2当我们调用request_google_twice()时,神奇的asyncio.gather会启动一个函数调用,但当我们调用await writer.drain()时,它会启动第二个函数调用,这样两个请求就并行发生了。然后,它等待第一个或第二个请求的writer.drain()调用完成,并继续执行该函数。
最后,忽略了一个重要的细节:asyncio.run为了从常规[同步] Python函数中实际调用异步函数,我们将调用包装在asyncio.run(.):
异步定义async_main():
r1,r2=await request_google_twice()
打印(回答一:,r1)
打印(响应二:,r2)
返回12
return _ val=async io . run(async_main())请注意,如果我们只调用async _ main()而不调用await.或者asyncio.run(.),什么都不会发生。这仅仅受到异步工作模式的性质的限制。
那么,异步是如何工作的,这些神奇的asyncio.run和asyncio.gather函数的作用是什么?详情请阅读下文。
异步是如何工作的?要理解async的神奇之处,我们首先需要了解一个更简单的Python构造:generator(前面《生成器和协程》,如果你没看过,可以去我的主页看看这篇文章。回来学这个就容易了)。
生成器是一个Python函数,它逐个返回一系列值(迭代)。例如:
def get_numbers()。
打印( get_numbers begin )
print( get_numbers给出1 . )
产量1
print( get_numbers给出2 . )
产量2
print( get_numbers给出3 . )
产量3
打印( get_numbers end )
打印( 表示开始)
对于get_numbers()中的数字:
print(f“ Got { number }”。)
打印( 表示结束)表示开始
获取编号开始
get_numbers给1.
得到了1。
get_numbers给2.
得到了2。
get_numbers给3.
得了3。
获取号码结束
for end因此,我们看到,对于for循环的每次迭代,我们只在生成器中执行一次。我们可以使用Python的next()函数更明确地执行这种迭代:
In [3]: generator=get_numbers()
在[4]:下一个(生成器)
获取编号开始
get_numbers给1.
Out[4]: 1
在[5]:下一个(生成器)
get_numbers给2.
Out[5]: 2
在[6]:下一个(生成器)
get_numbers给3.
Out[6]: 3
在[7]:下一个(生成器)
获取号码结束
-
StopIteration Traceback(最近一次调用)
模块中的ipython-input-154-323ce5d717bb
- 1下一个(发电机)
StopIteration:这非常类似于异步函数的行为。就像异步函数从函数开始一直执行代码到第一次等待一样,当我们第一次调用next()时,生成器会从函数的顶部执行到第一条yield语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但是返回不同的内容,使用生成器创建类似异步的函数。
使用生成器异步让我们使用生成器来创建我们自己的小型异步框架。
然而,为了简单起见,让我们用sleep(即time.sleep)来代替实际的IO。让我们考虑一个需要定期发送更新的应用程序:
def send_updates(计数:int,间隔_秒:float):
对于范围内的I(1,计数1):
time.sleep(间隔_秒)
Print ([{}] SendingUpdate {}/{} . 。格式(interval _ seconds,I,count))。因此,如果我们调用send_updates(3,1.0),它将输出这三条消息,每条消息间隔1秒钟:
[1.0]发送更新1/3。
[1.0]发送更新2/3。
[1.0]发送更新3/3。现在,假设我们想同时运行几个不同的时间间隔。比如send_updates(10,1.0),send_updates(5,2.0)和send_updates(4,3.0)。我们可以使用线程来做到这一点,如下所示:
线程=[
穿线。Thread(target=send_updates,args=(10,1.0))
穿线。线程(target=send_updates,args=(5,2.0)),
穿线。线程(target=send_updates,args=(4,3.0))
]
对于螺纹中的I:
i.start()
对于螺纹中的I:
I.join()这个是可行的,大概12秒就完成了,但是使用的线程有前面提到的缺点。让我们用发电机来建造同样的东西。
在演示生成器的例子中,我们返回了一个整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述IO等待的对象。在我们的例子中,我们的“IO”只是一个计时器,它会等待一段时间。因此,让我们为此创建一个计时器对象:
类异步计时器:
def __init__(self,duration: float):
self . done _ time=time . time()duration现在,让我们从我们的函数中生成它,而不是调用time.sleep:
def send_updates(计数:int,间隔_秒:float):
对于范围内的I(1,计数1):
产出异步计时器(间隔_秒)
Print ([{}] SendingUpdate {}/{} . 。Format (interval _ seconds,I,count))现在,每次我们调用next(.)当我们调用send_updates(.),我们得到一个AsyncTimer对象,它告诉我们应该等待到什么时候:
generator=send_updates(3,1.5)
timer=next(generator) # [1.5]发送更新1/3。
print(timer . done _ time-time . time())# 1.498.因为我们的代码实际上并不调用time.sleep now,所以我们现在可以同时执行另一个send_updates调用。
因此,为了将所有这些放在一起,我们需要后退一步,认识到一些事情:
生成器就像一个部分执行的函数,等待一些IO(定时器)。每个部分执行的函数都有一些IO(定时器),在继续执行之前等待。因此,我们程序的当前状态是由每个部分执行的成对函数(生成器)和函数正在等待的IO(定时器)的列表。现在,要运行我们的程序,我们只需要等到一个IO准备好了(也就是我们的一个定时器超时了),然后向前一步执行相应的函数,得到一个阻塞该函数的新IO。这个实现逻辑为我们提供了以下信息:
#用定时器0初始化每个生成器,以便它立即执行
generator_timer_pairs=[
(send_updates(10,1.0),AsyncTimer(0)),
(send_updates(5,2.0),AsyncTimer(0)),
(发送更新(4,3.0),异步计时器(0))
]
而发电机计时器对:
pair=min(generator_timer_pairs,key=lambda x: x[1])。done_time)
生成器,min_timer=pair
#等到计时器准备好
time.sleep(max(0,min_timer.done_time - time.time()))
del generator _ timer _ pairs[generator _ timer _ pairs . index(pair)]
try: #再执行一步这个函数
new_timer=next(生成器)
generator _ timer _ pairs . append((generator,new_timer))
函数完成时,StopIteration除外:#
至此,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它将引发StopIteration,当我们不再有部分执行的函数(生成器)时,我们的函数就完成了。
现在,我们将它包装在一个函数中,我们得到类似于asyncio.run Run与asyncio.gather的内容:
def async _ run _ all(*生成器):
generator_timer_pairs=[
(生成器,异步定时器(0))
对于发电机中发电机
]
而发电机计时器对:
pair=min(generator_timer_pairs,key=lambda x: x[1])。done_time)
生成器,min_timer=pair
time.sleep(max(0,min_timer.done_time - time.time()))
del generator _ timer _ pairs[generator _ timer _ pairs . index(pair)]
尝试:
new_timer=next(生成器)
generator _ timer _ pairs . append((generator,new_timer))
除了StopIteration:
及格
异步运行全部(
发送更新(10,1.0),
send_updates(5,2.0),
发送更新(4,3.0)
)
Async/await用于异步实现。asyncio的caveman版本的最后一步是支持Python 3.5中引入的async/await语法。Await的行为类似yield,只是它不直接返回提供的值,而是返回next((.).__await__())。异步函数返回“协程”,其行为类似于生成器,但需要使用。发送(无)而不是下一个()。(请注意,就像生成器在最初被调用时不会返回任何东西一样,异步函数在逐步执行之前也不会做任何事情,这就解释了我们前面提到的内容)。
因此,鉴于这些信息,我们可以将我们的示例转换为async/await,只需做一些调整。以下是最终结果:
类异步计时器:
def __init__(self,duration: float):
self.done_time=time.time()持续时间
def __await__(self):
屈服自我
异步定义send_updates(count: int,interval_seconds: float):
对于范围内的I(1,计数1):
等待异步计时器(间隔_秒)
print([{}]发送更新{}/{}。。格式(间隔_秒,I,计数))
定义等待直到io就绪(ios):
min_timer=min(ios,key=lambda x: x.done_time)
time.sleep(max(0,min_timer.done_time - time.time()))
返回ios.index(min_timer)
def async _ run _ all(*协同例程):
协程_io_pairs=[
(协同程序,AsyncTimer(0))
对于协程中的协程
]
while协程io对:
IOs=[cor的io,协程io对中的io]
就绪索引=_等待_直到_ io _就绪(ios)
协程,_=协程_io_pairs.pop(ready_index)
尝试:
new_io=coroutine.send(无)
协程_io_pairs.append((协程,新_io))
除了StopIteration:
及格
异步运行全部(
发送更新(10,1.0),
send_updates(5,2.0),
发送更新(4,3.0)
)我们有了它,并且使用async/await完成了我们的迷你异步示例。现在,你可能注意到了,我把timer重命名为IO,并把寻找最小计时器的逻辑提取到一个名为_wait_until_io_ready的名字中。这是这个例子和上一个话题:真实IO的有意联系。
这里,我们已经使用async/await完成了我们的小型异步示例。现在,你可能已经注意到了,我把timer重命名为io,把用来寻找最小计时器的逻辑提取到一个名为_wait_until_io_ready的函数中。这是把这个例子和上一个话题:真实IO联系起来。
Real IO(不仅仅是timer)所以,所有这些例子都很棒,但是它们和real asyncio有什么关系呢?我们要在真实IO上等待TCP套接字和文件读/写?好吧,美妙之处在于那个_wait_until_io_ready函数。为了让真正的IO正常工作,我们所要做的就是用类似于AsyncTimer的文件描述符创建一些新对象。然后,AsyncReadFile我们等待的对象集对应于一组文件描述符。最后,我们可以使用函数(syscall) select()来等待其中一个文件描述符准备就绪。因为TCP/UDP套接字是使用文件描述符实现的,所以这也包括网络请求。
所以,所有这些例子都很好,但是它们与真正的异步IO有什么关系呢?我们要等待实际的IO,比如TCP套接字和文件读/写?嗯,它的优势在于_wait_until_io_ready函数。要使真正的IO工作,我们需要做的就是创建一些新的AsyncReadFile,类似于AsyncTimer,它包含一个文件描述符。然后,我们等待的一组AsyncReadFile对象对应一组文件描述符。最后,我们可以使用函数(syscall) select()来等待其中一个文件描述符准备就绪。因为TCP/UDP套接字是使用文件描述符实现的,所以这也包括网络请求。
总结有了它,Python异步从头开始。虽然我们已经深入研究过了,但还是有很多细微之处没有涉及到。例如,要从另一个生成器函数调用一个类似生成器异步的函数,我们将使用yield from,并且我们可以通过将参数传递给。发送(.).关于异步IO的具体构造还有很多其他的话题,还有很多其他的微妙之处,比如异步生成器和任务取消,但是我们还是留给大家去详细研究吧。
原创作品来自程,
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。