0%

将python中同步函数转换成多线程非阻塞函数

使用一个装饰器来将python中的同步函数转化成多线程函数

python由于GIL锁的原因,不支持真正的多线程,但是可以利用多线程来利用由于IO引起的阻塞问题,也算是曲线多线程了。

缘由

目前我正在写一个基于Requests的多线程图像下载库框架,可以查看这里,目前已经基本写完。爬虫可以异步来实现高并发以充分利用网络IO的等待,但是众所周知,Requests库是不支持异步的,但是可以通过多线程的方式来为其实现异步,不过使用线程的开销会稍微大一些。

实现方式

实现方式很简单,只需要一个装饰器即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

import asyncio
import requests


def run_async(callback):
def inner(func):
def wrapper(*args, **kwargs):
def __exec():
out = func(*args, **kwargs)
callback(out)
return out
return asyncio.get_event_loop().run_in_executor(None, __exec)
return wrapper
return inner


def _callback(*args):
print(args)

@run_async(_callback)
def get(url):
return requests.get(url)


get("https://baidu.com")
print("done!") # 注意这里是无阻塞的

只需要这样实现一个装饰器就能实现requests.get()函数的并发。

使用线程池来进行改进

但是在时间过程中,上述代码不能长时间运行,否则内存占用会不断飙升。

原因是每次都创建出一个开销比较高的线程,但是没有进行回收,所以不断驻留在内存中导致占用过高。

不过我们可以使用线程池来修改它,将其资源限制在一定范围内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(16)

def run_async(callback):
def inner(func):
def wrapper(*args, **kwargs):
def __exec():
out = func(*args, **kwargs)
callback(out)
return out
return pool.submit(__exec)
return wrapper
return inner

这样每次都会从线程池拿一个线程出来进行处理。

使其能用于类方法

上述代码虽然解决了内存占用问题,但是还有一个问题没有解决。

比如我定义了一个Downloader类,里面有callback(self, *args)download(self, *args)函数。其中download(self, *args)函数负责下载任务,然后返回下载结果并交给callback(self, *args)处理,由于下载是并发的,所以装饰器会在download(self, *args)头上,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

class Donwloader:
def __init__(self,**kwargs):
...

@run_async(callback)
def download(self, *args):
# 下载一些东西
...

def callback(self, *args):
# 回调函数
...

但这样是不能正常工作的,因为self没有被正常传递

改成下面这样就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

from concurrent.futures import ThreadPoolExecutor
from anime_crawler.settings import MAX_CONCURRENT_REQUESTS

pool = ThreadPoolExecutor(MAX_CONCURRENT_REQUESTS)


def run_async_c(callback):
def inner(func):
def wrapper(*args, **kwargs):
def __exec():
out = func(*args, **kwargs)
callback(args[0], out) # args[0]是self
return out
return pool.submit(__exec)
return wrapper
return inner


class Donwloader:
def __init__(self,**kwargs):
...

@run_async_c(callback)
def download(self, *args):
# 下载一些东西
...

def callback(self, *args):
# 回调函数
...