0%

将python中的同步函数转换成协程异步函数(gevent)

使用gevent库来将python中的同步函数转化成多线程函数

将requests库转换成异步

之前使用了多线程的方式将requests库转换成异步等待,但是由于线程开销较高,且本质上还是在空等,所以使用协程来实现异步等待会更好。

gevent库

gevent库是一个网络库,它比asyncio标准库还要快一点。而且可以将同步函数直接转换成协程函数(其内置了loop)。

猴子补丁

它主要使用的方式就是猴子补丁,原理大致如下:

1
2
3
4
5

import liba

import libb as liba

这样liba就被替换成了libb了,只要libb在接口上保证和liba的接口一致,那么就可以直接替换。

猴子补丁就是实现上述过程的东西。下面是猴子补丁使用

1
2
3
4
5
6
7
8

import socket
print(socket.socket) # <class 'socket.socket'>

from gevent import monkey
monkey.patch_socket() # monkey.patch_all()是把所有库打上补丁
print(socket.socket) # <class 'gevent._socket3.socket'>

就是gevent底层是libev,它从其之上再造了一个socket模块,具有跟标准库socket兼容的接口,而gevent中的socket是运行在event loop中的(标准库的socket不带)。

这样打完补丁后,socket库就是协程实现的了。

而requests基于urllib3, urllib3基于socket,补丁是对socket的封装,所以requests库此时也成为协程异步的了。

API

下面有一些gevent常用的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 创建一个普通的Greenlet对象并启动
gevent.spawn()
# 延时创建一个普通的Greenlet对象并启动
gevent.spawn_later(seconds=3)
# 创建的协程对象属于一个组
gevent.spawn_raw()
# 返回当前正在执行的greenlet
gevent.getcurrent()
# 将协程任务添加到事件循环,接收一个任务列表
gevent.joinall(jobs)
# 可以替代join函数等待循环结束,也可以传入协程对象列表
gevent.wait()
# 杀死一个协程
gevent.kill()
# 杀死一个协程列表里的所有协程
gevent.killall()
# 会自动将python的一些标准模块替换成gevent框架
monkey.patch_all()

使用gevent.spawn()创建了一个协程会自动执行,如果想要获取该协程的结果最好先gevent.join()

另外,gevent本身也是基于greenlet的,所以也可以使用greenlet对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# Greenlet对象
from gevent import Greenlet

# Greenlet对象创建
job = Greenlet(target0, 3)
Greenlet.spawn() # 创建一个协程并启动
Greenlet.spawn_later(seconds=3) # 延时启动

# 协程启动
job.start() # 将协程加入循环并启动协程
job.start_later(3) # 延时启动

# 等待任务完成
job.join() # 等待任务完成
job.get() # 获取协程返回的值

# 任务中断和判断任务状态
job.dead() # 判断协程是否死亡
job.kill() # 杀死正在运行的协程并唤醒其他的协程,这个协程将不会再执行,可以
job.ready() # 任务完成返回一个真值
job.successful() # 任务成功完成返回真值,否则抛出错误

# 获取属性
job.loop # 时间循环对象
job.value # 获取返回的值

# 捕捉异常
job.exception # 如果运行有错误,获取它
job.exc_info # 错误的详细信息

# 设置回调函数
job.rawlink(back) # 普通回调,将job对象作为回调函数的参数
job.unlink() # 删除回调函数
# 执行成功的回调函数
job.link_value(back)
# 执行失败的回调函数
job.link_exception(back)

当然,与线程类似,协程也存在协程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 等待直到有一个协程有结果
pool.wait_available()
# 向进程池添加一个方法并跟踪,非阻塞
pool.dd(greenlet)
# 停止跟踪某个协程
pool.discard(greenlet)
# 加入并启动协程
pool.start(greenlet)
# 阻塞等待结束
pool.join()
# 杀死所有跟踪的协程
pool.kill()
# 杀死一个协程
pool.killone(greenlet)

协程池启动一个协程时,也是非阻塞的。

异步requests

使用requests库时,先创建协程对象,然后使用gevent.joinall()来等待所有协程对象完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

from gevent import monkey
monkey.patch_all()
import requests
import gevent
import time


def f(url,i):
data = requests.get(url)
print(f"{i} done")

start = time.time()
url_list = []
for i in range(5):
# 创建协程对象
url_list.append(gevent.spawn(f, 'https://www.baidu.com/',i))

# 直到所有协程对象完成为止
gevent.joinall(url_list)
print(time.time() - start)

使用装饰器来创建协程函数

当然使用一个装饰器来隐式的完成创建协程对象会更方便

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

from gevent import monkey, pool, Greenlet
monkey.patch_all()
import time
import requests

p = pool.Pool(5)

# 使用协程池来运行协程的装饰器
def async_d(func):
def warp_(*args,**kargs):
p.start(Greenlet(func, *args, **kargs))
return warp_

@async_d
def f(url,i):
data = requests.get(url)
print(f"{i} done")

start = time.time()
for i in range(5):
f('https://www.baidu.com/',i)
p.join()
print(time.time() - start)