0%

asyncio小记

关于python中asyncio使用的一些记录

asyncio

asyncio是python的一种异步实现方法,可以理解成基于n = yield m这种方式实现的用户级线程,更加轻量,通过运行在一个事件循环里,可以有效的缩减IO耗时。

可以使用async def fun():的方式来创建一个协程函数,直接执行它会类似于生成器函数一样,得到一个协程对象而不是执行函数本身。

另外,通过关键词await + [async func | corortine | task]可以实现当前函数放权并进入事件循环的调度中,从而完成并发:

  • await + async func:会直接调度进目标协程内部,并执行此协程的代码,直到再次被调度为止
  • await + corortine:与上类似
  • await + task:需要先通过asyncio.create_task(func())创建出一个任务,再通过await调度进入事件循环,且task被创建出来后相当于被注册,不会立刻执行,一般是多个任务的首选方式,更好的利用并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio


async def func1():
print(1)


async def func2():
print(2)


async def func3():
print(3)


async def main():
corortine2 = func2()
task3 = asyncio.create_task(func3())
# 第一种方法:直接加async def()
await func1()
# 第二种方法:直接加协程对象
await corortine2
# 第三种方法:加任务
await task3

asyncio.run(main())

事件循环是单线程的,事件循环内的任务都是单线程运行的,只有每个任务主动放弃执行权(阻塞),事件循环才会调度下一个任务来执行。

可以查看python协程与asyncio

每个任务可以使用cancel()方法来取消,也可以使用done()方法来判断是否完成,使用result()方法来获取结果。

但是cancel()方法是非阻塞的,任务被取消的异常只能从await中抛出。

也可以使用asyncio.wait_for()函数来设置超时时间,如果超时则抛出asyncio.TimeoutError异常,它会在任务超时后自动取消任务,如果不希望超时取消,可以使用asyncio.shield()函数来保护任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio


async def func1():
await asyncio.sleep(2)
print("func1")


async def main():
# cancel取消后只能在await抛出异常
task1 = asyncio.create_task(func1())
task1.cancel()
try:
await task1
except asyncio.CancelledError:
print("task1 is cancelled")

# wait_for设置超时会自动取消任务
task2 = asyncio.create_task(func1())
try:
await asyncio.wait_for(task2, timeout=1)
except asyncio.TimeoutError:
print("task2 is timeout")

# shield保护任务不被取消
task3 = asyncio.create_task(func1())
try:
await asyncio.wait_for(asyncio.shield(task3), timeout=1)
except asyncio.TimeoutError:
print("task3 is timeout")
await task3


asyncio.run(main())

使用asyncio.sleep来模拟真实任务的原因

大部分教程都会使用asyncio.sleep函数来模拟真实的IO阻塞,并且考虑到async是具有传递性,所以一定会有最终的一个函数会真正陷入到系统的IO中。

但如果需要真的去实现这样一个“最底层”的async函数,那么需要与事件循环、操作系统交互,难度系数较高。所以一般使用asyncio.sleep来模拟IO阻塞,而在真正的业务中也是直接使用大神们在底层写好的异步库来使用。(比如gevent可以把所有同步IO转异步IO、aiohttp可以实现异步网络请求、aiofile实现异步磁盘IO等)

awaitable、future

协程、任务、future都是awaitable对象,所以在写类型注解的时候可以使用typing.Awaitable来表示。

future对象是一种当前还没有值,但是未来会有值的对象,可以通过asyncio.Future()来创建,一般不写轮子用不到。

另外,可以使用asyncio.get_running_loop()来获取当前的事件循环,使用asyncio.get_event_loop()来获取默认的事件循环,使用asyncio.set_event_loop()来设置默认的事件循环。并且可以在asyncio.run()函数中传入一个协程函数来运行,它会自动创建一个事件循环并运行,同时asyncio.run()可以传入debug=True来进入调试模式,它会在程序执行事件较长时发出提示信息。

asyncio与非阻塞套接字

socket套接字有非阻塞模式,它会立刻返回,但是如果套接字数据还没有到位就调用非阻塞方法,会抛出异常;而如果通过捕获异常的方式又会陷入自旋。

不过asyncio提供了asyncio.sock_connect()asyncio.sock_accept()asyncio.sock_recv()asyncio.sock_sendall()等方法来实现非阻塞套接字的操作,它们会自动处理异常,不会陷入自旋。这些方法是事件循环自带的,可以直接使用,他们是绑定在事件循环中的基于socket的方法,可以理解成socket对应的方法的异步版本,并且socket必须设置为非阻塞。(相对的,事件循环还有create_connection等方法不需要绑定socket,但是使用起来更加复杂。)

但是正常情况下不需要自己基于事件循环来写socket吧,主要是因为异常处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import socket


async def echo(conn, loop):
while data := await loop.sock_recv(conn, 1024):
print(data)
await loop.sock_sendall(conn, data)


async def listen(sock, loop):
while True:
conn, addr = await loop.sock_accept(sock)
asyncio.create_task(echo(conn, loop))


async def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

sock.setblocking(False)
sock.bind(('127.0.0.1', 8080))

sock.listen(5)

await listen(sock, asyncio.get_running_loop())


asyncio.run(main())

asyncio的异常处理

任务异常

当一个任务抛出异常时,它被视为已完成,结果为异常。并且此异常会在我们对这个任务进行await时抛出,如果不await,那么就看不到异常。可以使用try-except-finally来捕获异常,在finally里面做异常的最后回收工作,这样虽然看不到异常,但是资源都能被正确回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio


async def func1():
raise Exception("func1")
print("func1")


async def main():
task1 = asyncio.create_task(func1())
try:
await task1
except Exception as e:
print(e)
finally:
print("finally")


asyncio.run(main())

事件循环异常

事件循环可能收到信号量而导致关闭,可以在事件循环中使用add_signal_handler()函数定义收到信号量时对应的回调函数(仅用于unix系统)。

但是,当事件循环被关闭的时候,还需要考虑等待每一个任务结束。可以用wait_for()函数来等待每个正在运行的协程一定时间后超时关闭,但是此时需要新建一个时间循环而不是使用asyncio.run(),因为asyncio.run()会自动取消所有任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 设置信号量回调函数
loop.add_signal_handler(signal.SIGINT, shutdown)
# 若回调函数是async def
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown()))


# 等待所有任务结束
async def clean(loop):
waiters = [asyncio.wait_for(t, 1) for t in asyncio.all_task (loop)]
for task in waiters:
try:
await task
except asyncio.TimeoutError:
...

asyncio等待多个任务

当创建了多个任务并需要等待其完成时可以使用一些内置的api来完成,务必不要在列表推导式中使用await,会导致阻塞问题,并且非常难以处理异常

gather等待所有的任务完成与异常处理

可以使用asyncio.gather(*tasks)来等待多个任务,它会返回一个Future对象,当对他执行await时,会等待所有对象全部完成,并且是按照传入的顺序来返回的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio


async def func1(t):
await asyncio.sleep(t)
return t


async def main():
tasks = [asyncio.create_task(func1(i)) for i in range(1, 4)]
results = await asyncio.gather(*tasks)
print(results)
# 输出 [1, 2, 3]

asyncio.run(main())

但是,如果其中一个任务抛出异常,那么gather立刻取消所有任务并抛出异常,这样就无法获取到已经完成的任务的结果了。可以使用return_exceptions=True来忽略异常,这时协程出现的异常会被当做结果返回,而不会影响到其他正常协程的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


async def func1(t, exception=0):
if exception:
raise Exception("func1")
await asyncio.sleep(t)
return t


async def main():
# 当设置为True时,即使其中一个任务出现异常,也不会影响其他任务的执行
tasks = [asyncio.create_task(func1(i, i % 2)) for i in range(1, 4)]
results = await asyncio.gather(*tasks, return_exceptions=True)
print(results) # [Exception('func1'), 2, Exception('func1')]

# 当设置为False时,如果其中一个任务出现异常,会立即终止其他任务的执行
tasks = [asyncio.create_task(func1(i, i % 2)) for i in range(1, 4)]
try:
results = await asyncio.gather(*tasks, return_exceptions=False)
print(results)
except Exception as e:
print(e) # 只会输出一个异常结果

asyncio.run(main())

使用gather()函数时会等待所有的任务完成,并且需要在结果中手动过滤掉异常。

as_completed等待任意一个任务完成

as_completed()函数会返回一个迭代器,当对它执行await时,会等待其中任意一个任务完成,并且返回一个Future对象,这个对象包含了任务的结果,当迭代器中的所有任务都完成时,迭代器会停止迭代。此时不能保证返回顺序,但是一旦有了返回结果可以立即执行,并且as_completed()不需要解包裹。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio


async def func1(t):
await asyncio.sleep(t)
return t


async def main():
tasks = [func1(3), func1(1), func1(2)]
# as_completed()不需要解包裹
for task in asyncio.as_completed(tasks):
res = await task
print(res)

asyncio.run(main())

as_completed()函数也可以设置超时,超时后它在被await时抛出异常,但是它不会像wait_for()那样取消任务,它返回无须同时不会自动取消任务导致它在出现异常时难以确定哪些任务是未完成的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio


async def func1(t, excpetion):
await asyncio.sleep(t)
if excpetion:
raise Exception("error")
return t


async def main():
tasks = [func1(3, 0), func1(1, 0), func1(2, 1)]
for task in asyncio.as_completed(tasks):
try:
print(await task)
except Exception as e:
print(e)

asyncio.run(main())

使用gather()函数时会返回迭代器,但是结果是按时间顺序输出的,可以单独去处理每个任务的异常,只是在处理异常的时候难以确定哪些任务是仍在执行的(不方便取消仍在运行的任务)。

wait与三种等待模式

wait()函数可以实现更加细致的控制,它通过'return_when'控制三种模式:

  • return_when=ALL_COMPLETED:默认模式,会等待所有任务完成,与gather()类似。
  • return_when=FIRST_COMPLETED:等待任意一个任务完成,与as_completed()类似,注意异常也算是完成。
  • return_when=FIRST_EXCEPTION:等待任意一个任务出现异常。

wait()函数会返回两个列表,第一个列表包含了已经完成的任务,第二个列表包含了未完成的任务。也可以通过time_out来设置超时,超时后不会取消任务。由于第二个未完成列表的存在,可以快速取消未完成的任务

同样可以设置超时,超时后会返回两个列表,并且不会取消任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import asyncio


async def func1(t, excpetion):
await asyncio.sleep(t)
if excpetion:
raise Exception("error")
return t


async def main():
# ALL_COMPLETED模式,等待所有任务完成
tasks = [func1(3, 0), func1(1, 0), func1(2, 1)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
for task in done:
print(task.result() if not task.exception() else task.exception())

# FIRST_COMPLETED模式,等待第一个任务完成
tasks = [func1(3, 0), func1(1, 0), func1(2, 1)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"done counts {len(done)}") # 1个任务完成
print(f"pending counts {len(pending)}") # 2个未完成
# 然后可以遍历done,或者继续等待pending中的任务完成

# FIRST_EXCEPTION模式,等待第一个任务出现异常或全部完成
tasks = [func1(3, 0), func1(1, 0), func1(2, 1)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
print(f"done counts {len(done)}") # 2个任务完成,包含一个异常
print(f"pending counts {len(pending)}") # 1个未完成

# 超时
tasks = [func1(3, 0), func1(1, 0), func1(2, 1)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED, timeout=2)
print(f"done counts {len(done)}") # 2个任务完成,包含一个异常
print(f"pending counts {len(pending)}") # 1个未完成

asyncio.run(main())

对于ALL_COMPLETEDFIRST_EXCEPTION模式,如果不出现异常,那么都会等待所有任务全部完成。使用FIRST_EXCEPTION模式可以在出现异常后快速取消其他任务。

对于FIRST_COMPLETED模式,出现异常或者任务完成都可以算作完成,有异常出现不会取消其他任务。可以通过while pending来观测每一步的完成情况。

1
2
3
4
5
pending = [tasks]
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
print(await task)

另外需要注意,如果传给wait()函数的是协程对象,而不是任务(也就是没有使用create_task包装),那么是不能比较的,因为pending中返回的是任务,而不是协程。

1
2
3
4
5
6
7
8
9
10
11
12
corortines = [...]
tasks = [asyncio.create_task(c) for c in corortines]

# 正确,当pending中第一个与tasks中第一个相等时,会执行print语句
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
if pending[0] == tasks[0]:
print("pending[0] == tasks[0]")

# 错误,这个if永远不会成立
done, pending = await asyncio.wait(corortines, return_when=asyncio.FIRST_COMPLETED)
if pending[0] == corortines[0]:
print("pending[0] == corortines[0]")

通过asyncio管理线程池和进程池

asyncio中的run_in_executor()函数可以接受一个事件循环,这个事件循环不仅仅是协程,也可以是线程池执行器或进程池执行器,从而可以像使用协程一样去管理线程池和进程池。

另外,run_in_executor()的默认参数是None,也就是默认使用ThreadPoolExecutor,如果想要使用ProcessPoolExecutor,需要显式传入。可以用于把同步函数转化成在另一个线程里执行的异步函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
from time import sleep


def func1(t, excpetion):
sleep(t)
if excpetion:
raise Exception("error")
return t


async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
tasks = [
loop.run_in_executor(pool, partial(func1, i, 0)) for i in range(1, 5)
]
results = await asyncio.gather(*tasks)
print(results)

asyncio.run(main())

当然,如果仅仅只是为了在线程里执行同步函数,那么使用asyncio.to_thread()会更加简单,且不需要使用partial传递参数了。

1
tasks = [asyncio.to_thread(func1, i, 0) for i in range(1, 5)]

另外,python的多线程由于GIL的存在,本质上还是单线程,但是由于一些c库,比如numpy之类的,他们在纯C代码中释放了GIL锁,所以当设计到这些库的时候可以在python里使用多线程进行CPU密集任务的并行与加速。关于多线程的锁的问题,参考之前的Python小技巧(10)--并发

而对于使用asyncio管理进程池的方式类似,只不过每个进程内还可以有属于自己的事件循环,也就是每个进程可以同时运行多个协程,这会增加一些复杂性。同时,多进程的共享数据自带锁,不需要像多线程那样需要手动设置一个锁。

单线程并发与asyncio同步

单线程事件循环也存在竞态,类似下面这种,主要是因为把数据读入临时变量后在写回前放权所导致的,可以使用asyncio模块中的各种锁来避免。

1
2
3
4
temp_data = data
temp_data += 1
await asyncio.sleep(1)
data = temp_data

协程的锁与多线程的锁类似,有锁、信号量、事件和条件。各个锁之间的使用也十分类似,其中锁是最基本的;信号量可以控制并发数目;事件通知所有等待该事件的协程,直到事件被clear,且事件被重复设置时只生效一次;条件变量可以设置为只唤醒一个协程或者等待特定的条件。

另外,锁不能创建在全局空间中,因为在创建锁的时候会自动创建出一个事件循环。这个特性会在更高版本中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
from asyncio import sleep, Lock, Semaphore, Condition, Event


async def worker1(lock):
print('worker1 start')
async with lock:
print('worker1 got lock')
await sleep(1)


async def worker2(lock):
print('worker2 start')
async with lock:
print('worker2 got lock')
await sleep(1)


async def main():
lock = Lock()
tasks = [asyncio.create_task(worker1(lock)),
asyncio.create_task(worker2(lock))]
await asyncio.gather(*tasks)

asyncio.run(main())

异步队列

在生产者-消费者模型中,生产者与消费者的速度不匹配,希望有一个管道(队列),能满足以下要求:

  • 当生产者速度比消费者快时,如果队列满了,当生产者对队列执行push方法时,会自动阻塞,直到队列中数目小于最大数目
  • 当消费者速度比生产者快时,如果队列空了,当消费者对队列执行pop方法时,会自动阻塞,直到队列中数目大于0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
from asyncio import Queue


async def consumer(queue: Queue):
while True:
item = await queue.get()
await asyncio.sleep(2)
print(f'Consumer get {item}')


async def producer(queue: Queue):
for i in range(10):
await queue.put(i)
print(f'Producer put {i}')
await asyncio.sleep(1)
queue.task_done()


async def main():
queue = Queue(2)
consumer_task = asyncio.create_task(consumer(queue))
producer_task = asyncio.create_task(producer(queue))
await asyncio.gather(consumer_task, producer_task, queue.join())


asyncio.run(main())

队列可以使用join()方法等待完成,此时需要task_done()方法来通知队列已完成。同时,队列还有get_nowait()put_nowait()方法,当队列满了或者空了时,会抛出asyncio.QueueFull或者asyncio.QueueEmpty异常,这些方法与get()put()方法的区别在于,它们不会阻塞,而是直接抛出异常。

此外,asyncio模块还有优先队列PriorityQueue,它的get()方法会返回优先级最高的元素,而不是最先进入队列的元素。优先级通过put()方法的priority参数指定,优先级越高,越先被get()方法获取。和Queue类似,PriorityQueue也有get_nowait()put_nowait()方法。

除了优先队列外,还有LIFO队列LifoQueue,它的get()方法会返回最后一个进入队列的元素,而不是最先进入队列的元素。和Queue类似,LifoQueue也有get_nowait()put_nowait()方法。

这两种特殊的异步队列在使用上与正常的队列基本一致。

其他一些特性

异步上下文管理器

只要实现了__aenter____aexit__方法的对象就是异步上下文管理器,可以使用async with来使用。

aiohttp就是通过异步上下文管理器来实现的,它的ClientSession类实现了__aenter____aexit__方法,所以可以使用async with来使用。

而gevent库则是通过monkey patch来实现的,它会将标准库中的threadingsocket等模块替换成自己的模块,内置了event loop来实现非阻塞,可以把requests库直接变为异步的(requests基于urllib,而urllib又基于socket,gevent是实现了非阻塞的socket)

异步生成器

就是在一个async def里使用yield关键字。它类似于生成器,但是可以使用await关键字,而且返回的是一个异步迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio


async def generate():
for i in range(10):
await asyncio.sleep(1)
yield i


async def main():
async for i in generate():
print(i)


asyncio.run(main())

上下文变量

与线程的local变量类似,协程也有自己的local变量,也就是上下文变量。它可以在协程中共享,但是不能在不同的协程中共享。

1
2
3
from contextvars import ContextVar

...

协程调度

可以使用await asyncio.sleep(0)来让出CPU,让其他协程有机会执行。

sleep(0)有很多作用,本质都是因为大家都不希望cpu在空等,所以基本上所有语言的sleep函数都会进行调度。

在协程中的体现为让出CPU权限,去执行其他的协程。在如果是正常的线程中使用sleep函数,则有较高的几率触发垃圾回收。