2.2.加強(qiáng)篇
其實(shí)以前的 linux中是沒(méi)有線程這個(gè)概念的, windows程序員經(jīng)常使用線程,這一看~方便啊,然后可能是當(dāng)時(shí)程序員偷懶了,就把進(jìn)程模塊改了改(這就是為什么之前說(shuō)linux下的多進(jìn)程編程其實(shí)沒(méi)有win下那么“重量級(jí)”),弄了個(gè)精簡(jiǎn)版進(jìn)程==> 線程(內(nèi)核是分不出 進(jìn)程和線程的,反正 pcb個(gè)數(shù)都是一樣)
多線程和多進(jìn)程最大的不同在于,多進(jìn)程中,同一個(gè)變量,各自有一份拷貝存在于每個(gè)進(jìn)程中,互不影響,而多線程中,所有變量都由所有線程共享(全局變量和堆 ==> 線程間共享。進(jìn)程的棧 ==> 線程平分而獨(dú)占)
還記得通過(guò) current_thread()獲取的線程信息嗎?難道線程也沒(méi)個(gè)id啥的?一起看看:(通過(guò) ps-Lfpid來(lái)查看LWP)

回顧:進(jìn)程共享的內(nèi)容:(回顧:http://www.cnblogs.com/dotnetcrazy/p/9363810.html)
代碼(.text)文件描述符(fd)內(nèi)存映射(mmap)2.2.1.線程同步~互斥鎖Lock
線程之間共享數(shù)據(jù)的確方便,但是也容易出現(xiàn)數(shù)據(jù)混亂的現(xiàn)象,來(lái)看個(gè)例子:
立即學(xué)習(xí)“Python免費(fèi)學(xué)習(xí)筆記(深入)”;
代碼語(yǔ)言:JavaScript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from multiprocessing.dummy import Threadingnum = 0 # def global numdef test(i): print(f"子進(jìn)程:{i}") global num for i in range(100000): num += 1def main(): p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num) # 應(yīng)該是500000,發(fā)生了數(shù)據(jù)混亂,結(jié)果少了很多if __name__ == '__main__': main()
輸出:(應(yīng)該是 500000,發(fā)生了數(shù)據(jù)混亂,只剩下 358615)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
子進(jìn)程:0子進(jìn)程:1子進(jìn)程:2子進(jìn)程:3子進(jìn)程:4452238
Lock案例
共享資源+CPU調(diào)度==>數(shù)據(jù)混亂==解決==>線程同步 這時(shí)候 Lock就該上場(chǎng)了
互斥鎖是實(shí)現(xiàn)線程同步最簡(jiǎn)單的一種方式,讀寫都加鎖(讀寫都會(huì)串行)
先看看上面例子怎么解決調(diào):
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from multiprocessing.dummy import threading, Locknum = 0 # def global numdef test(i, lock): print(f"子進(jìn)程:{i}") global num for i in range(100000): with lock: num += 1def main(): lock = Lock() p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num)if __name__ == '__main__': main()
輸出: time python31.thread.2.py
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
子進(jìn)程:0子進(jìn)程:1子進(jìn)程:2子進(jìn)程:3子進(jìn)程:4500000real 0m2.846suser 0m1.897ssys 0m3.159s
優(yōu)化下
lock設(shè)置為全局或者局部,性能幾乎一樣。循環(huán)換成map后性能有所提升(測(cè)試案例在Code中)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from multiprocessing.dummy import Pool as ThreadPool, Locknum = 0 # def global numlock = Lock()def test(i): print(f"子進(jìn)程:{i}") global num global lock for i in range(100000): with lock: num += 1def main(): p = ThreadPool() p.map_async(test, list(range(5))) p.close() p.join() print(num)if __name__ == '__main__': main()
輸出:
time Python31.thread.2.py
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
子進(jìn)程:0子進(jìn)程:1子進(jìn)程:3子進(jìn)程:2子進(jìn)程:4500000real 0m2.468suser 0m1.667ssys 0m2.644s
本來(lái)多線程訪問(wèn)共享資源的時(shí)候可以并行,加鎖后就部分串行了(沒(méi)獲取到的線程就阻塞等了)
【項(xiàng)目中可以多次加鎖,每次加鎖只對(duì)修改部分加(盡量少的代碼) 】(以后會(huì)說(shuō)協(xié)程和Actor模型)
補(bǔ)充:以前都是這么寫的,現(xiàn)在支持 with托管了(有時(shí)候還會(huì)用到,所以了解下):【net是直接 lock大括號(hào)包起來(lái)】
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
#### 以前寫法:lock.acquire() # 獲取鎖try: num += 1finally: lock.release() # 釋放鎖#### 等價(jià)簡(jiǎn)寫with lock: num += 1
擴(kuò)展知識(shí):(GIL在擴(kuò)展篇會(huì)詳說(shuō))
GIL的作用:多線程情況下必須存在資源的競(jìng)爭(zhēng),GIL是為了保證在解釋器級(jí)別的線程唯一使用共享資源(cpu)。同步鎖的作用:為了保證解釋器級(jí)別下的自己編寫的程序唯一使用共享資源產(chǎn)生了同步鎖
2.2.2.線程同步~遞歸鎖RLock
看個(gè)場(chǎng)景:小明欠小張2000,欠小周5000,現(xiàn)在需要同時(shí)轉(zhuǎn)賬給他們:(規(guī)定:幾次轉(zhuǎn)賬加幾次鎖)

小明啥也沒(méi)管,直接擼起袖子就寫Code了:(錯(cuò)誤Code示意)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明轉(zhuǎn)賬2000給小張 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明轉(zhuǎn)賬5000給小周 xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
小明寫完代碼就出去了,這可把小周和小張等急了,打了N個(gè)電話來(lái)催,小明心想啥情況?
一看代碼楞住了,改了改代碼,輕輕松松把錢轉(zhuǎn)出去了:
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000# 小明轉(zhuǎn)賬2000給小張def a_to_b(lock): global xiaoming global xiaozhang with lock: xiaoming -= 2000 xiaozhang += 2000# 小明轉(zhuǎn)賬5000給小周def a_to_c(lock): global xiaoming global xiaozhou with lock: xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(a_to_b, args=(lock, )) p.apply_async(a_to_c, args=(lock, )) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
輸出:
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
[還錢前]小明8000,小張3000,小周5000[還錢后]小明1000,小張5000,小周10000
就這么算了嗎?不不不,不符合小明性格,于是小明研究了下,發(fā)現(xiàn)~還有個(gè)遞歸鎖 RLock呢,正好解決他的問(wèn)題:
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from multiprocessing.dummy import Pool as ThreadPool, RLock # 就把這邊換了下xiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明轉(zhuǎn)賬2000給小張 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明轉(zhuǎn)賬5000給小周 xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}") lock = RLock() # 就把這邊換了下 p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
RLock內(nèi)部維護(hù)著一個(gè) Lock和一個(gè)counter變量, counter記錄了acquire的次數(shù),從而使得資源可以被多次 require。直到一個(gè)線程所有的 acquire都被release,其他的線程才能獲得資源
2.2.3.死鎖引入1.多次獲取導(dǎo)致死鎖
小明想到了之前說(shuō)的(互斥鎖 Lock讀寫都加鎖)就把代碼拆分研究了下:
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
print("[開始]小明轉(zhuǎn)賬2000給小張")lock.acquire() # 獲取鎖xiaoming -= 2000xiaozhang += 2000print("[開始]小明轉(zhuǎn)賬5000給小周")lock.acquire() # 獲取鎖(互斥鎖第二次加鎖)xiaoming -= 5000xiaozhou += 5000lock.release() # 釋放鎖print("[結(jié)束]小明轉(zhuǎn)賬5000給小周")lock.release() # 釋放鎖print("[開始]小明轉(zhuǎn)賬2000給小張")
輸出發(fā)現(xiàn):(第二次加鎖的時(shí)候,變成阻塞等了【死鎖】)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
[還錢前]小明8000,小張3000,小周5000[開始]小明轉(zhuǎn)賬2000給小張[開始]小明轉(zhuǎn)賬5000給小周
這種方式,Python提供的RLock就可以解決了
2.常見的死鎖
看個(gè)場(chǎng)景:小明和小張需要流水帳,經(jīng)常互刷~ 小明給小張轉(zhuǎn)賬1000,小張給小明轉(zhuǎn)賬1000
一般來(lái)說(shuō),有幾個(gè)共享資源就加幾把鎖(小張、小明就是兩個(gè)共享資源,所以需要兩把 Lock)
先描述下然后再看代碼:
正常流程 小明給小張轉(zhuǎn)1000:小明自己先加個(gè)鎖==>小明-1000==>獲取小張的鎖==>小張+1000==>轉(zhuǎn)賬完畢
死鎖情況 小明給小張轉(zhuǎn)1000:小明自己先加個(gè)鎖==>小明-1000==>準(zhǔn)備獲取小張的鎖。可是這時(shí)候小張準(zhǔn)備轉(zhuǎn)賬給小明,已經(jīng)把自己的鎖獲取了,在等小明的鎖(兩個(gè)人相互等,于是就一直死鎖了)
代碼模擬一下過(guò)程:
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock() # 小明的鎖z_lock = Lock() # 小張的鎖# 小明轉(zhuǎn)賬1000給小張def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock with m_lock: xiaoming -= 1000 sleep(0.01) with z_lock: xiaozhang += 1000# 小張轉(zhuǎn)賬1000給小明def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000def main(): print(f"[還錢前]小明{xiaoming},小張{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[還錢后]小明{xiaoming},小張{xiaozhang}")if __name__ == '__main__': main()
輸出:(卡在這邊了)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
[轉(zhuǎn)賬前]小明5000,小張8000
項(xiàng)目中像這類的情況,一般都是這幾種解決方法:(還有其他解決方案,后面會(huì)繼續(xù)說(shuō))
按指定順序去訪問(wèn)共享資源在訪問(wèn)其他鎖的時(shí)候,先把自己鎖解了trylock的重試機(jī)制得不到全部鎖就先放棄已經(jīng)獲取的資源
比如上面的情況,我們?nèi)绻?guī)定,不管是誰(shuí)先轉(zhuǎn)賬,先從小明開始,然后再小張,那么就沒(méi)問(wèn)題了。或者誰(shuí)錢多就誰(shuí)(權(quán)重高的優(yōu)先)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock() # 小明的鎖z_lock = Lock() # 小張的鎖# 小明轉(zhuǎn)賬1000給小張def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock # 以上次代碼為例,這邊只修改了這塊 with z_lock: # 小張權(quán)重高,大家都先獲取小張的鎖 xiaozhang += 1000 sleep(0.01) with m_lock: xiaoming -= 1000# 小張轉(zhuǎn)賬1000給小明def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000def main(): print(f"[轉(zhuǎn)賬前]小明{xiaoming},小張{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[轉(zhuǎn)賬后]小明{xiaoming},小張{xiaozhang}")if __name__ == '__main__': main()
輸出:
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
[轉(zhuǎn)賬前]小明5000,小張8000[轉(zhuǎn)賬后]小明5000,小張8000
2.2.4.線程同步~條件變量Condition
條件變量一般都不是鎖,只能能阻塞線程,從而減少不必要的競(jìng)爭(zhēng),Python內(nèi)置了 RLock(不指定就是RLock)
看看源碼:
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
class Condition: """ 實(shí)現(xiàn)條件變量的類。????條件變量允許一個(gè)或多個(gè)線程等到另一個(gè)線程通知它們?yōu)橹????如果給出了lock參數(shù)而不是None,那必須是Lock或RLock對(duì)象作底層鎖。 否則,一個(gè)新的RLock對(duì)象被創(chuàng)建并用作底層鎖。 """ def __init__(self, lock=None): if lock is None: lock = RLock() self._lock = lock # 設(shè)置lock的acquire()和release()方法 self.acquire = lock.acquire self.release = lock.release
再看看可不可以進(jìn)行with托管:(支持)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
def __enter__(self): return self._lock.__enter__()def __exit__(self, *args): return self._lock.__exit__(*args)
看個(gè)生產(chǎn)消費(fèi)者的簡(jiǎn)單例子:(生產(chǎn)完就通知消費(fèi)者)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from multiprocessing.dummy import Pool as ThreadPool, Conditions_list = []con = Condition()def Shop(i): global con global s_list # 加鎖保護(hù)共享資源 for x in range(5): with con: s_list.append(x) print(f"[生產(chǎn)者{i}]生產(chǎn)商品{x}") con.notify_all() # 通知消費(fèi)者有貨了def User(i): global con global s_list while True: with con: if s_list: print(f"列表商品:{s_list}") name = s_list.pop() # 消費(fèi)商品 print(f"[消費(fèi)者{i}]消費(fèi)商品{name}") print(f"列表剩余:{s_list}") else: con.wait()def main(): p = ThreadPool() # 兩個(gè)生產(chǎn)者 p.map_async(Shop, range(2)) # 五個(gè)消費(fèi)者 p.map_async(User, range(5)) p.close() p.join()if __name__ == '__main__': main()
輸出:(list之類的雖然可以不加global標(biāo)示,但是為了后期維護(hù)方便,建議加上)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
[生產(chǎn)者0]生產(chǎn)商品0[生產(chǎn)者0]生產(chǎn)商品1列表商品:[0, 1][消費(fèi)者0]消費(fèi)商品1列表剩余:[0]列表商品:[0][消費(fèi)者0]消費(fèi)商品0列表剩余:[][生產(chǎn)者0]生產(chǎn)商品2列表商品:[2][消費(fèi)者1]消費(fèi)商品2列表剩余:[][生產(chǎn)者0]生產(chǎn)商品3[生產(chǎn)者1]生產(chǎn)商品0[生產(chǎn)者0]生產(chǎn)商品4列表商品:[3, 0, 4][消費(fèi)者1]消費(fèi)商品4列表剩余:[3, 0][生產(chǎn)者1]生產(chǎn)商品1[生產(chǎn)者1]生產(chǎn)商品2[生產(chǎn)者1]生產(chǎn)商品3[生產(chǎn)者1]生產(chǎn)商品4列表商品:[3, 0, 1, 2, 3, 4][消費(fèi)者2]消費(fèi)商品4列表剩余:[3, 0, 1, 2, 3]列表商品:[3, 0, 1, 2, 3][消費(fèi)者0]消費(fèi)商品3列表剩余:[3, 0, 1, 2]列表商品:[3, 0, 1, 2][消費(fèi)者1]消費(fèi)商品2列表剩余:[3, 0, 1]列表商品:[3, 0, 1][消費(fèi)者3]消費(fèi)商品1列表剩余:[3, 0]列表商品:[3, 0][消費(fèi)者3]消費(fèi)商品0列表剩余:[3]列表商品:[3][消費(fèi)者3]消費(fèi)商品3列表剩余:[]
通知方法:
notify() :發(fā)出資源可用的信號(hào),喚醒任意一條因 wait()阻塞的進(jìn)程notifyAll() :發(fā)出資源可用信號(hào),喚醒所有因wait()阻塞的進(jìn)程
2.2.5.線程同步~信號(hào)量Semaphore(互斥鎖的高級(jí)版)
記得當(dāng)時(shí)在分析 multiprocessing.Queue源碼的時(shí)候,有提到過(guò)(點(diǎn)我回顧)
同進(jìn)程的一樣, semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,每當(dāng)調(diào)用 acquire()時(shí)內(nèi)置函數(shù) -1,每當(dāng)調(diào)用 release()時(shí)內(nèi)置函數(shù) +1
通俗講就是:在互斥鎖的基礎(chǔ)上封裝了下,實(shí)現(xiàn)一定程度的并行
舉個(gè)例子,以前使用互斥鎖的時(shí)候:(廁所就一個(gè)坑位,必須等里面的人出來(lái)才能讓另一個(gè)人上廁所)

使用信號(hào)量之后:廁所坑位增加到5個(gè)(自己指定),這樣可以5個(gè)人一起上廁所了==>實(shí)現(xiàn)了一定程度的并發(fā)
舉個(gè)例子:(Python在語(yǔ)法這點(diǎn)特別爽,不用你記太多異同,功能差不多基本上代碼也就差不多)
代碼語(yǔ)言:javascript代碼運(yùn)行次數(shù):0運(yùn)行復(fù)制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Semaphoresem = Semaphore(5) # 限制最大連接數(shù)為5def goto_wc(i): global sem with sem: print(f"[線程{i}]上廁所") sleep(0.1)def main(): p = ThreadPool() p.map_async(goto_wc, range(50)) p.close() p.join()if __name__ == '__main__': main()
輸出:

可能看了上節(jié)回顧的會(huì)疑惑:源碼里面明明是 BoundedSemaphore,搞啥呢?
其實(shí) BoundedSemaphore就比 Semaphore多了個(gè)在調(diào)用 release()時(shí)檢查計(jì)數(shù)器的值是否超過(guò)了計(jì)數(shù)器的初始值,如果超過(guò)了將拋出一個(gè)異常