使用一个装饰器来将python中的同步函数转化成多线程函数
python由于GIL锁的原因,不支持真正的多线程,但是可以利用多线程来利用由于IO引起的阻塞问题,也算是曲线多线程了。
缘由
目前我正在写一个基于Requests
的多线程图像下载库框架,可以查看这里,目前已经基本写完。爬虫可以异步来实现高并发以充分利用网络IO的等待,但是众所周知,Requests
库是不支持异步的,但是可以通过多线程的方式来为其实现异步,不过使用线程的开销会稍微大一些。
实现方式
实现方式很简单,只需要一个装饰器即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import asyncio import requests
def run_async(callback): def inner(func): def wrapper(*args, **kwargs): def __exec(): out = func(*args, **kwargs) callback(out) return out return asyncio.get_event_loop().run_in_executor(None, __exec) return wrapper return inner
def _callback(*args): print(args)
@run_async(_callback) def get(url): return requests.get(url)
get("https://baidu.com") print("done!")
|
只需要这样实现一个装饰器就能实现requests.get()
函数的并发。
使用线程池来进行改进
但是在时间过程中,上述代码不能长时间运行,否则内存占用会不断飙升。
原因是每次都创建出一个开销比较高的线程,但是没有进行回收,所以不断驻留在内存中导致占用过高。
不过我们可以使用线程池来修改它,将其资源限制在一定范围内。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(16)
def run_async(callback): def inner(func): def wrapper(*args, **kwargs): def __exec(): out = func(*args, **kwargs) callback(out) return out return pool.submit(__exec) return wrapper return inner
|
这样每次都会从线程池拿一个线程出来进行处理。
使其能用于类方法
上述代码虽然解决了内存占用问题,但是还有一个问题没有解决。
比如我定义了一个Downloader
类,里面有callback(self, *args)
和download(self, *args)
函数。其中download(self, *args)
函数负责下载任务,然后返回下载结果并交给callback(self, *args)
处理,由于下载是并发的,所以装饰器会在download(self, *args)
头上,如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class Donwloader: def __init__(self,**kwargs): ...
@run_async(callback) def download(self, *args): ...
def callback(self, *args): ...
|
但这样是不能正常工作的,因为self没有被正常传递
改成下面这样就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| from concurrent.futures import ThreadPoolExecutor from anime_crawler.settings import MAX_CONCURRENT_REQUESTS
pool = ThreadPoolExecutor(MAX_CONCURRENT_REQUESTS)
def run_async_c(callback): def inner(func): def wrapper(*args, **kwargs): def __exec(): out = func(*args, **kwargs) callback(args[0], out) return out return pool.submit(__exec) return wrapper return inner
class Donwloader: def __init__(self,**kwargs): ...
@run_async_c(callback) def download(self, *args): ...
def callback(self, *args): ...
|