0%

Python小技巧(10)--并发

并发----来源于cookbook第十二章

记录第十二章中比较有意思的部分


线程与线程池

可以通过python内置库来创建线程执行任务,为了高效,通常是以线程池的方式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

#################### 线程 ####################
from threading import Thread
from time import sleep

def count(n):
while n > 0:
print(f"now count {n}")
n -= 1
sleep(1)

def main():
# 创建线程
t = Thread(target=count, args=(10,))
# 开启线程
t.start()
# 主线程不被阻塞
print(f"start target, is alive? {t.is_alive()}")
# 等待线程结束
t.join()

main()

#################### 线程池 ####################
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def count(n, id):
while n > 0:
print(f"{id} now count {n}")
n -= 1
sleep(1)
return id

def main():
# 创建线程池
pool = ThreadPoolExecutor(2)
# 向线程池提交任务
a = pool.submit(count, 10, "a")
b = pool.submit(count, 5, "b")
c = pool.submit(count, 4, "c")
d = pool.submit(count, 3, "d")
# 获取任务的返回值
print(a.result(), b.result(), c.result(), d.result())

main()

在python的线程都是系统级线程,想要实现用户级线程需要用yield来实现。

此处的线程只能设置成守护线程或检测线程是否存活,除此之外的终止线程、向线程发信号、调整线程调度属性的操作都没有,需要自己去构建。

线程池还可以使用Queue来实现,Queue的实现是线程安全的

最后,对于线程而言,可以使用Thread.local()来创建线程私有变量,主要用于线程需要使用某种系统资源时(如套接字、文件这类)

线程的事件、条件和信号

可以同样使用python的标准库来实现线程中的信号量、事件等功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

#################### 事件 ####################
from threading import Thread, Event
from time import sleep

def count(n, start_env):
print("thread start")
sleep(2)
# 线程设置事件
start_env.set()
while n > 0:
print(f"now count {n}")
n -= 1
sleep(1)

def main():
start_env = Event()
t = Thread(target=count, args=(10, start_env))
t.start()

# 等待事件被设置,会阻塞
start_env.wait()
print("now set env")

main()

#################### 条件 ####################
from threading import Thread, Condition
from time import sleep

def count(n, timer):
print("thread start")
while n > 0:
# 重复等待信号到来
timer.wait_for_tick()
print(f"now count {n}")
n -= 1
sleep(1)

class Timer:
def __init__(self, intervel) -> None:
self._intervel = intervel
self._flag = 0
self._cv = Condition()

def start(self):
t = Thread(target=self.run)
t.daemon = True
t.start()

def run(self):
while True:
sleep(self._intervel)
# with的写法是锁的上下文管理协议
with self._cv:
self._flag ^= 1
# 会唤醒所有线程,notify_all本身不会释放锁
self._cv.notify_all()

def wait_for_tick(self):
# with的写法是锁的上下文管理协议
with self._cv:
last_flag = self._flag
while last_flag == self._flag:
# 调用wait会阻塞线程并释放锁
# 被唤醒后需要重新请求锁
self._cv.wait()

def main():
timer = Timer(2)
timer.start()
Thread(target=count, args=(10, timer)).start()

main()

#################### 信号量 ####################
from threading import Thread, Semaphore
from time import sleep

def worker(id_, sema):
# 等待信号量
sema.acquire()
print(f"{id_} working")

def main():
sema = Semaphore(0)
for i in range(10):
t = Thread(target=worker, args=(i, sema))
t.start()
for i in range(10):
sleep(0.5)
# 释放信号量
sema.release()

main()

Event对象最好是一次性使用,虽然可以使用clear来清除事件,但是容易造成死锁。

如果打算一遍一遍重复通知某个事件,最好使用Condition对象

Event可以唤醒所有等待线程,若只想从中唤醒一个线程,最好使用信号量或Condition对象

线程间通信

可以使用queue模块中的Queue对象进行线程间通信,也可以通过锁来手动实现一个多线程队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

from queue import Queue
from threading import Thread, Event
from time import sleep

def producer(out_q):
i = 1
while 1:
env = Event()
out_q.put((i, env))
i += 1
env.wait()

def consumer(in_q):
while 1:
i, env = in_q.get(timeout=1)
print(f"get {i}")
sleep(1)
env.set()

def main():
# 可以设置队列容量
q = Queue(10)
t1 = Thread(target=producer, args=(q,))
t2 = Thread(target=consumer, args=(q,))
t1.start()
t2.start()


main()

Queue对象可以设置容量,且put和get方法都支持超时机制和阻塞机制

如果用于多线程,则不要依赖Queue的len、empty这些方法

可以使用Lock对象来对临界区加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

from threading import Lock

class Counter:
def __init__(self, init_val=0) -> None:
self._value = init_val
self._value_lock = Lock()

def incr(self):
# 锁存在上下文管理协议,可以直接with使用
with self._value_lock:
self._value += 1

def decr(self):
with self._value_lock:
self._value -= 1

除了基本的Lock对象外,还有RLock可重入锁。