并发----来源于cookbook第十二章
记录第十二章中比较有意思的部分
线程与线程池
可以通过python内置库来创建线程执行任务,为了高效,通常是以线程池的方式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from threading import Threadfrom time import sleepdef count (n ): while n > 0 : print (f"now count {n} " ) n -= 1 sleep(1 ) def main (): t = Thread(target=count, args=(10 ,)) t.start() print (f"start target, is alive? {t.is_alive()} " ) t.join() main() from concurrent.futures import ThreadPoolExecutorfrom time import sleepdef count (n, id ): while n > 0 : print (f"{id } now count {n} " ) n -= 1 sleep(1 ) return id def main (): pool = ThreadPoolExecutor(2 ) a = pool.submit(count, 10 , "a" ) b = pool.submit(count, 5 , "b" ) c = pool.submit(count, 4 , "c" ) d = pool.submit(count, 3 , "d" ) print (a.result(), b.result(), c.result(), d.result()) main()
在python的线程都是系统级线程 ,想要实现用户级线程需要用yield来实现。
此处的线程只能设置成守护线程或检测线程是否存活,除此之外的终止线程、向线程发信号、调整线程调度属性的操作都没有,需要自己去构建。
线程池还可以使用Queue来实现,Queue的实现是线程安全的 。
最后,对于线程而言,可以使用Thread.local()
来创建线程私有变量 ,主要用于线程需要使用某种系统资源时(如套接字、文件这类)
线程的事件、条件和信号
可以同样使用python的标准库来实现线程中的信号量、事件等功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 from threading import Thread, Eventfrom time import sleepdef count (n, start_env ): print ("thread start" ) sleep(2 ) start_env.set () while n > 0 : print (f"now count {n} " ) n -= 1 sleep(1 ) def main (): start_env = Event() t = Thread(target=count, args=(10 , start_env)) t.start() start_env.wait() print ("now set env" ) main() from threading import Thread, Conditionfrom time import sleepdef count (n, timer ): print ("thread start" ) while n > 0 : timer.wait_for_tick() print (f"now count {n} " ) n -= 1 sleep(1 ) class Timer : def __init__ (self, intervel ) -> None : self._intervel = intervel self._flag = 0 self._cv = Condition() def start (self ): t = Thread(target=self.run) t.daemon = True t.start() def run (self ): while True : sleep(self._intervel) with self._cv: self._flag ^= 1 self._cv.notify_all() def wait_for_tick (self ): with self._cv: last_flag = self._flag while last_flag == self._flag: self._cv.wait() def main (): timer = Timer(2 ) timer.start() Thread(target=count, args=(10 , timer)).start() main() from threading import Thread, Semaphorefrom time import sleepdef worker (id_, sema ): sema.acquire() print (f"{id_} working" ) def main (): sema = Semaphore(0 ) for i in range (10 ): t = Thread(target=worker, args=(i, sema)) t.start() for i in range (10 ): sleep(0.5 ) sema.release() main()
Event对象最好是一次性使用 ,虽然可以使用clear
来清除事件,但是容易造成死锁。
如果打算一遍一遍重复通知某个事件 ,最好使用Condition对象
Event可以唤醒所有等待线程,若只想从中唤醒一个线程 ,最好使用信号量或Condition对象
线程间通信
可以使用queue模块中的Queue对象进行线程间通信,也可以通过锁来手动实现一个多线程队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from queue import Queuefrom threading import Thread, Eventfrom time import sleepdef producer (out_q ): i = 1 while 1 : env = Event() out_q.put((i, env)) i += 1 env.wait() def consumer (in_q ): while 1 : i, env = in_q.get(timeout=1 ) print (f"get {i} " ) sleep(1 ) env.set () def main (): q = Queue(10 ) t1 = Thread(target=producer, args=(q,)) t2 = Thread(target=consumer, args=(q,)) t1.start() t2.start() main()
Queue对象可以设置容量,且put和get方法都支持超时机制和阻塞机制
如果用于多线程,则不要依赖Queue的len、empty这些方法
锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from threading import Lockclass Counter : def __init__ (self, init_val=0 ) -> None : self._value = init_val self._value_lock = Lock() def incr (self ): with self._value_lock: self._value += 1 def decr (self ): with self._value_lock: self._value -= 1
除了基本的Lock对象外,还有RLock可重入锁。