深入理解asyncio(一)

论坛 期权论坛 金融     
吴宇   2022-7-6 22:42   19248   20
本文首发于微信公众号「Python之美」 https://mp.weixin.qq.com/s/kxWmO6Q_VYt749OhAoTEUA
前言

这几天看asyncio相关的pycon视频又重温了asyncio 的官方文档,收获很多。之前asyncio被吐槽的一点就是文档写的不好,Python 3.7 时 asyncio 的官方文档被 Andrew Svetlov 以及 Yury Selivanov 等核心开发者重写了,新的版本我觉得已经好很多了。在这里记录一下我对asyncio的一些理解。
核心概念

asyncio里面主要有4个需要关注的基本概念
Eventloop

Eventloop可以说是asyncio应用的核心,是中央总控。Eventloop实例提供了注册、取消和执行任务和回调的方法。
把一些异步函数(就是任务,Task,一会就会说到)注册到这个事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。
Coroutine

协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程:
cat coro1.py
import asyncio


async def a():
    print('Suspending a')
    await asyncio.sleep(0)
    print('Resuming a')


async def b():
    print('In b')


async def main():
    await asyncio.gather(a(), b())


if __name__ == '__main__':
    asyncio.run(main())这里面有4个重要关键点:

  • 协程要用async def声明,Python 3.5时的装饰器写法已经过时,我就不列出来了。
  • asyncio.gather用来并发运行任务,在这里表示协同的执行a和b2个协程
  • 在协程a中,有一句await asyncio.sleep(0),await表示调用协程,sleep 0并不会真的sleep(因为时间为0),但是却可以把控制权交出去了。
  • asyncio.run是Python 3.7新加的接口,要不然你得这么写:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()好了,我们先运行一下看看:
python coro1.py
Suspending a
In b
Resuming a看到了吧,在并发执行中,协程a被挂起又恢复过。
Future

接着说Future,它代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个Future对象上。Future是对协程的封装,不过日常开发基本是不需要直接用这个底层Future类的。我在这里只是演示一下:
In : def c():
...:     print('Inner C')
...:     return 12
...:

In : future = loop.run_in_executor(None, c)  # 这里没用await,None 表示默认的 executor
Inner C

In : future  # 虽然c已经执行了,但是状态还是 pending。
Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>

In : future.done()  # 还没有完成
Out: False

In : for a in dir(future):
...:     if not a.startswith('_'):
...:         print(a)
...:
add_done_callback
cancel
cancelled
done
exception
get_loop
remove_done_callback
result
set_exception
set_result可以对这个Future实例添加完成后的回调(add_done_callback)、取消任务(cancel)、设置最终结果(set_result)、设置异常(如果有的话,set_exception)等。现在我们让Future完成:
In : await future
Out: 12

In : future
Out: <Future finished result=12>

In : future.done()
Out: True

In : future.result()
Out: 12看到了吧,await之后状态成了finished。这里顺便说一下,一个对象怎么样就可以被await(或者说怎么样就成了一个awaitable对象)呢?给类实现一个__await__方法,Python版本的Future的实现大概如下:
def __await_(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()这样就可以await future了,那为什么await future后Future的状态就能改变呢,这是因为用loop.run_in_executor创建的Future注册了一个回调(通过asyncio.futures.wrap_future,加了一个_call_set_state回调, 有兴趣的可以通过延伸阅读链接2找上下文)。
__await__里面的yield self不要奇怪,主要是为了兼容__iter__,给旧的yield from用:
In : future = loop.run_in_executor(None, c)
Inner C

In : future
Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>

In : def spam():
...:     yield from future
...:

In : s = spam()

In : next(s)
Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>新的替代yield from的用法await必须在异步函数(用 async def申明)中使用:
In : def spam():
...:     await future
...:
  File "cell_name", line 5
SyntaxError: 'await' outside async functionTask

Eventloop除了支持协程,还支持注册Future和Task2种类型的对象,那为什么要存在Future和Task这2种类型呢?
先回忆前面的例子,Future是协程的封装,Future对象提供了很多任务方法(如完成后的回调、取消、设置任务结果等等),但是开发者并不需要直接操作Future这种底层对象,而是用Future的子类Task协同的调度协程以实现并发。
Task非常容易创建和使用:
# 或者用task = loop.create_task(a())
In : task = asyncio.ensure_future(a())

In : task
Out: <Task pending coro=<a() running at /Users/dongwm/mp/2019-05-22/coro1.py:4>>

In : task.done()
Out: False

In : await task
Suspending a
Resuming a

In : task
Out: <Task finished coro=<a() done, defined at /Users/dongwm/mp/2019-05-22/coro1.py:4> result=None>

In : task.done()
Out: Trueasyncio并发的正确/错误姿势

在代码中使用async/await是不是就能发挥asyncio的并发优势么,其实是不对的,我们先看个例子:
async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')


async def s1():
    await a()
    await b()有2个协程a和b,分别sleep1秒和3秒,如果协程可以并发执行,那么执行时间应该是sleep最大的那个值(3秒),现在它们都在s1协程里面被调用。大家先猜一下s1会运行几秒?
我们写个小程序验证一下:
def show_perf(func):
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')大家注意我这个时间计数用的方法,没有用time.time,而是用了Python 3.3新增的time.perf_counter它是现在推荐的用法。我们在IPython里面验证下:
In : from coro2 import *

In : show_perf(s1)
********************
Suspending a
Resuming a
Suspending b
Resuming b
s1 Cost: 4.009796932999961看到了吧,4秒!!!,相当于串行的执行了(sleep 3 + 1)。这是错误的用法,应该怎么用呢,前面的asyncio.gather就可以:
async def c1():
    await asyncio.gather(a(), b())

In : show_perf(c1)
********************
Suspending a
Suspending b
Resuming b
Resuming a
c1 Cost: 3.002452698999832看到了吧,3秒!另外一个是asyncio.wait:
async def c2():
    await asyncio.wait([a(), b()])

In : show_perf(c2)
...
c2 Cost: 3.0066957049998564同样是3秒。先别着急,gather和wait下篇文章还会继续对比。还有一个方案就是用asyncio.create_task:
async def c3():
    task1 = asyncio.create_task(a())
    task2 = asyncio.create_task(b())
    await task1
    await task2


async def c4():
    task = asyncio.create_task(b())
    await a()
    await task

In : show_perf(c3)
...
c3 Cost: 3.002332438999929

In : show_perf(c4)
...
c4 Cost: 3.002270970000154都是3秒。asyncio.create_task相当于把协程封装成Task。不过大家要注意一个错误的用法:
async def s2():
    await asyncio.create_task(a())
    await asyncio.create_task(b())

In : show_perf(s2)
...
s2 Cost: 4.004671427999938**直接await task不会对并发有帮助。asyncio.create_task是Python 3.7新增的高阶API,*是推荐的用法,其实你还可以用asyncio.ensure_future和loop.create_task:
async def c5():
    task = asyncio.ensure_future(b())
    await a()
    await task


async def c6():
    loop = asyncio.get_event_loop()
    task = loop.create_task(b())
    await a()
    await task

In : show_perf(c5)
...
c5 Cost: 3.0033873750003295

In : show_perf(c6)
...
c6 Cost: 3.006120122000084到这里,我们一共看到2种错误的,6种正确的写法。你学到了么?
代码目录

本文代码可以在 mp项目 找到
延伸阅读


  • https://www.python.org/dev/peps/pep-0492/
  • https://github.com/python/cpython/blob/3.7/Lib/asyncio/futures.py#L365
分享到 :
0 人收藏
萍水相逢,尽是他乡之客

20 个回复

倒序浏览
2#
重新开始的蜗牛  3级会员 | 2022-7-6 22:42:38 发帖IP地址来自 北京
这个库还是有点难的感觉
3#
不兹道0212  4级常客 | 2022-7-6 22:42:50 发帖IP地址来自 北京
js早就有了
4#
你最后的救赎  1级新秀 | 2022-7-6 22:43:39 发帖IP地址来自 中国
比yield进步很多了
5#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:44:32 发帖IP地址来自 中国
其实挺期待 async 版本的 requests
6#
S汌餂  1级新秀 | 2022-7-6 22:44:44 发帖IP地址来自 中国
期待你更新这个模块的文章
7#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:45:12 发帖IP地址来自 湖北
[捂脸]有没有讲asyncio基础应用的文章
8#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:45:54 发帖IP地址来自 北京
现在网上关于asyncio的原创文章太少了,感谢分享[捂脸]
9#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:46:22 发帖IP地址来自 北京
aiohttp?
10#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:47:20 发帖IP地址来自 中国
刚看完go的协程再回头来看Python的 文档还是头痛 看了这篇才明白
[棒]
11#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:47:59 发帖IP地址来自 北京
httpx?
12#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:48:44 发帖IP地址来自 北京
requests_html 可以异步 from requests_html import AsyncHTMLSession , 就是效率不知道如何
13#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:49:44 发帖IP地址来自 中国
asyncio.create_task方法是将一个协程包装成一个任务对象,**并且将该任务任务绑定到事件循环上** 是绑定,然后看时机等待被事件循环调度,这些都是非阻塞的,返回任务对象本身,如果你对任何一个可等待对象直接进行了await操作,那么一定得等到该可等待对象的状态确定之后才能返回,否则一定会被阻塞在此处,所以你不要再所有的地方都进行await操作,你只需在需要等待的地方进行await操作,以避免阻塞,并且你的每一个协程任务应该尽可能的小,让他们只做好一件事,如果是cpu密集型的任务,尽量与你的异步代码分离,因为你每个时刻只能干一件事,如果cpu密集型的任务占用了太多的cpu,你就没办法处理更多的IO了,异步IO是为了用更少的资源更方便处理更多的多的IO,在python中,协程不能对cpu密集型的任务进行加速
14#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:50:18 发帖IP地址来自 中国
Coroutine这种过时的方式是指,asyncio.coroutine这种返回生成器的装饰器吧?廖雪峰的教程里介绍的还是这种方式。 我看了下官方文档,说这个方式在3.10里面不再支持了。
15#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:50:38 发帖IP地址来自 福建
最好的方式还先看看官方文档,跑完之后再看看别人的讲解
16#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:51:12 发帖IP地址来自 北京
以后肯定会慢慢抛弃历史遗留问题,不过企业要把py换成3.10,估计最少得2-3年以后了吧
17#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:51:48 发帖IP地址来自 中国
async def s3():
    task = asyncio.create_task(b())
    await task
    await a()
这样写也是4s
18#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:52:45 发帖IP地址来自 北京
如果是CPU密集型的任务...就不该去考虑异步非阻塞, 因为没有任何意义, 且无论Python还是其他语言, 协程这个东西本身也不是用来加速的, 而是用来提高CPU的利用率的, 防止CPU有过多的空转, CPU密集型...就不该有空转
19#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:53:38 发帖IP地址来自 北京
写法源于原理, 内嵌的Future当然跟阻塞一模一样, 只有把内嵌的Future变成同级的Future才会出异步效果
20#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:54:00 发帖IP地址来自 中国
在早期, Python中的greenlet以及gevent都是有栈协程的实现, 而后期也就是Python 3.5 之后, 新的协程实现是无栈协程的实现, 两种实现方式在写法上的差异是比较大的, async/await更加规范标准, 而且个人也更加喜欢无栈协程(只要用async/await的写法基本都是无栈协程)
21#
吴宇  管理员  伦敦金丝雀码头交易员 | 2022-7-6 22:54:56 发帖IP地址来自 北京
是啊,能不能加速,只是看问题的角度不同,几乎可以同时处理多个io,相对来说不就是加速了io吗,虽然单个io没什么差别,但io多了就是另外一种视角了,对吧
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:38337
帖子:3370
精华:36
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP