Python并發編程之進程

bpqhxxybpr 7年前發布 | 30K 次閱讀 Python 并發 Python開發

一、理論概念

1、定義

進程(Process 也可以稱為重量級進程)是程序的一次執行。在每個進程中都有自己的地址空間、內存、數據棧以及記錄運行的輔助數據,它是系統進行資源分配和調度的一個獨立單位。

2、并行和并發

并行:并行是指多個任務同一時間執行;

并發:是指在資源有限的情況下,兩個任務相互交替著使用資源;

Python并發編程之進程

3、同步和異常

同步是指多個任務在執行時有一個先后的順序,必須是一個任務執行完成另外一個任務才能執行;

異步是指多個任務在執行時沒有先后順序,多個任務可以同時執行;

4、同步/異步/阻塞/非阻塞/

同步阻塞:這個阻塞的形成效率是最低的;比如你在下載一個東西是,你一直盯著下載進度條,到達100%時下載完成;

同步體現在:你等待下載完成通知;

阻塞體現在:等待下載的過程中,不能做別的事情

同步非阻塞:你在下載東西時,你把任務提交后就去干別的事情了,只是每過一段時間就看一下是不是下載完成;

同步體現在:等待下載完成通知;

非阻塞提現在:等待下載完成通知過程中,去干別的事情了,只是時不時會瞄一眼進度條;

異步阻塞:你在下載東西時換了一個現在使用的客戶端比如迅雷,下載完成后會有一個提示音,但是這時候你仍然一直在等待那個完成后的提示音;

異步體現在:下載完成時有提示音;

阻塞體現在:等待下載完成提示音時,不做任何事情;

異步非阻塞:你然然使用的是迅雷下載軟件,這時候你把下載任務提交后就去干別的事情去了,當你聽到‘叮’以后就知道下載完成;

異步體現在:下載完成叮一聲完成通知

非阻塞體現在:等待下載完成“叮”一聲通知過程中,去干別的任務了,只需要接收“叮”聲通知即可;

二、進程的創建與結束

multiprocessing模塊:multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。由于提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創建進程部分,進程同步部分,進程池部分,進程之間數據共享。

Process模塊的各種方法介紹

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

強調:

  1. 需要使用關鍵字的方式來指定參數
  2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹: group參數未使用,值始終為None target表示調用對象,即子進程要執行的任務 args表示調用對象的位置參數元組,args=(1,2,'egon',) kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} name為子進程的名稱 p.start():啟動進程,并調用該子進程中的p.run() p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖 p.is_alive():如果p仍然運行,返回True p.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程 </code></pre>

在windows中使用process注意事項:

在Windows操作系統中由于沒有fork(linux操作系統中創建進程的機制),在創建子進程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創建子進程報錯。所以必須把創建子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。

process模塊創建進程:

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

import time from multiprocessing import Process def func(name): print('hello %s'%name) print('我是子進程')

if name == 'main': p = Process(target=func,args=('caoyf',)) #在實例化時候,args的參數必須是一個元祖形式(注冊一個子進程) p.start() #啟動一個子進程 time.sleep(3) print('執行主進程內容了')

創建第一個進程</code></pre>

多個進程同時運行,子進程的執行順序不是根據啟動的順序來決定的;

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

import time from multiprocessing import Process def func(name): print('hello %s'%name) time.sleep(2)

if name == 'main': p_lst = [] for i in range(10): p = Process(target=func, args=('caoyf',)) p.start() p_lst.append(p) for p in p_lst: p.join() # 是感知一個子進程的結束,將異步的程序改為同步 print('父進程在運行')

多個進程同時運行</code></pre>

另一種開啟進程的方法,繼承process的形式

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

import time import os from multiprocessing import Process class Func(Process): def init(self,name): super().init() self.name = name def run(self): print(os.getpid()) print('%s正在和小明聊天'%self.name)

if name == 'main': p1 = Func('caoyf') p2 = Func('Zhao') p1.start() p2.start() p1.join() p2.join()

繼承的方式開啟進程</code></pre>

守護進程:會隨著主進程的結束而結束,進程之間是相互獨立的,主進程的代碼運行結束,守護進程也會隨即結束

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

import time import os from multiprocessing import Process def foo(): print('start123') time.sleep(2) print('end123')

def func(): print('start456') time.sleep(5) print('end456') if name == 'main': p1 = Process(target=foo) p2 = Process(target=func) p1.daemon = True p1.start() p2.start() time.sleep(0.1) print('main------------')#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,因為主進程打印main---

# -時,p1也執行了,但是隨即被終止.

守護進程</code></pre>

三、進程同步(multiprocessing.Lock\Spemaphore\Event)

鎖(Lock):

資源是有限的,多個進程如果對同一個對象進行操作,則有可能造成資源的爭用,甚至導致死鎖,在并發進程中就可以利用鎖進行操作來避免訪問的沖突;

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,但是速度就變慢了,但犧牲了速度卻保證了數據安全。

雖然可以用文件共享數據實現進程間通信,但問題是:

1.效率低(共享數據基于文件,而文件是硬盤上的數據)

                        2.需要自己加鎖處理

我們可以模擬一個火車搶票的過程,當過個客戶同時對一個程序發起訪問時,假設此時有5張票,有10個人搶

from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩余票數%s\033[0m' %dic['count'])

def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m')

def task(lock): search() lock.acquire() get() lock.release()

if name == 'main': lock = Lock() for i in range(100): #模擬并發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()

搶火車票</code></pre>

信號量:

信號量Semaphore是同時允許一定數量的線程更改數據 。

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

import time import random from multiprocessing import Semaphore from multiprocessing import Process def f(i,a): a.acquire() print('%s走進了房間'%i) time.sleep(random.randint(1,5)) print('%s走出了房間'%i) a.release() if name == 'main': a = Semaphore(5) for i in range(10): p = Process(target=f,args=(i,a)) p.start()

信號量</code></pre>

事件:

用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

                                                clear:將“Flag”設置為False                                                 set:將“Flag”設置為True</code></pre>

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

from multiprocessing import Event,Process import random import time def cars(a,i): if not a.is_set(): print('car%s在等待'%i) a.wait() print('\033[31mcar%s通過\033[0m' % i) def f(a): while True: if a.is_set(): a.clear() print('\033[31m紅燈亮了\033[0m')

    else:
        a.set()
        print('\033[32m綠燈亮了\033[0m')
    time.sleep(2)

if name == 'main': a = Event() p = Process(target=f,args=(a,)) p.start() for i in range(20): car = Process(target=cars,args=(a,i)) car.start() time.sleep(random.random())

事件/紅綠燈實例</code></pre>

四、進程間通信---隊列和管道

隊列Queue:適用于多線程編程的先進先出數據結構,可以用來安全的傳遞多線程信息。

通過隊列實現了 主進程與子進程的通信 子進程與子進程之間的通信

q=Queue(10)     #實例化一個對象,允許隊列對多10個元素
q.put()         #放入隊列
q.get()         #從隊列中取出

假設現在有一個隊伍,隊伍里最多只能站5個人,但是有15個人想要進去

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

from multiprocessing import Process from multiprocessing import Queue def getin(q): #進入隊伍的子進程 for i in range(15): q.put(i)

    # print(q)

def getout(q): #離開隊伍的子進程 for i in range(6): print(q.get()) if name=='main': q=Queue(5) #隊伍內最多可以容納的人數 p=Process(target=getin,args=(q,)) #進入隊伍的進程 p.start() p2=Process(target=getout,args=(q,)) #離開隊伍的進程 p2.start()

隊列實例</code></pre>

管道(Pipes)

#!/usr/bin/env python

-- coding:utf-8 --

Author: caoyf

from multiprocessing import Process,Pipe,Manager,Lock import time import random

管道 進程之間創建的一條管道,默認是全雙工模式,兩頭都可以進和出,

注意 必須在產生Process對象之前產生管道

如果在Pipe括號里面填寫False后就變成了單雙工,

左邊的只能收,右邊的只能發,recv(接收),send(發送)

如果沒有消息可以接收,recv會一直阻塞,如果連接的另外一段關閉后,

recv會拋出EOFError錯誤

close 關閉連接

下面的實例是在Pipe的括號里填寫和不填寫False的區別

from multiprocessing import Process,Pipe

def func(pro):

pro.send('hello')

pro.close()

#

if name=='main':

con,pro = Pipe(False)

p = Process(target=func,args=(pro,))

p.start()

print(con.recv())

p.join()

模擬recv阻塞情況

def func(con,pro):

con.close()

while 1:

try:

print(pro.recv())

except EOFError:

pro.close()

break

# #

if name=='main':

con,pro = Pipe()

p = Process(target=func,args=(con,pro,))

p.start()

pro.close()

con.send('aaaaa')

con.close()

p.join()

利用管道實現生產者和消費者

def sc(con,pro,name,food):

con.close()

for i in range(5):

time.sleep(random.random())

f = '%s生產了%s%s'%(name,food,i)

print(f)

pro.send(f)

def xf(con,pro,name):

pro.close()

while 1:

try:

baozi = con.recv()

print('%s消費了%s'%(name,baozi))

except EOFError:

break

if name=='main':

con,pro = Pipe()

p1 = Process(target=sc,args=(con,pro,'caoyf','包子'))

c1 = Process(target=xf,args=(con,pro,'zhoaf'))

p1.start()

c1.start()

con.close()

pro.close()

p1.join()

管道</code></pre>

數據共享:

隊列和管道只是實現了數據的傳遞,還沒有實現數據的共享,如實現數據共享,就要用到Managers( 注:進程間通信應該盡量避免使用共享數據的方式

from multiprocessing import Process,Manager
import os

def f(dict1,list1): dict1[os.getpid()] = os.getpid() # 往字典里放當前PID list1.append(os.getpid()) # 往列表里放當前PID print(list1)

if name == "main": with Manager() as manager: d = manager.dict() #生成一個字典,可在多個進程間共享和傳遞 l = manager.list(range(5)) #生成一個列表,可在多個進程間共享和傳遞 p_list = []
for i in range(10): p = Process(target=f,args=(d,l)) p.start() p_list.append(p) # 存進程列表
for res in p_list:
res.join() print('\n%s' %d) #若要保證數據安全,需要加鎖lock=Lock()   </code></pre>

進程池

對于需要使用幾個甚至十幾個進程時,我們使用Process還是比較方便的,但是如果要成百上千個進程,用Process顯然太笨了,multiprocessing提供了Pool類,即現在要講的進程池,能夠將眾多進程放在一起,設置一個運行進程上限,每次只運行設置的進程數,等有進程結束,再添加新的進程

  • Pool(processes =num):設置運行進程數,當一個進程運行完,會添加新的進程進去
  • apply_async:異步,串行
  • apply:同步,并行
  • close():關閉pool,不能再添加新的任務
import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i += 1

if name == 'main': p = Pool(5) # 創建了5個進程 start = time.time() p.map(func,range(1000))
p.close() # 是不允許再向進程池中添加任務 p.join() #阻塞等待 執行進程池中的所有任務直到執行結束 print(time.time() - start) start = time.time() l = [] for i in range(1000): p = Process(target=func,args=(i,)) # 創建了一百個進程 p.start() l.append(p) [i.join() for i in l] print(time.time() - start)

回調函數:

import os import time from multiprocessing import Pool

參數 概念 回調函數

def func(i): # 多進程中的io多,分出去一部分 print('子進程%s:%s'%(i,os.getpid())) return i''

def call(arg): # 回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值 print('回調 :',os.getpid()) print(arg)

if name == 'main': print('主進程',os.getpid()) p = Pool(5) for i in range(10): p.apply_async(func,args=(i,),callback=call) #callback 回調函數 :主進程執行 參數是子進程執行的函數的返回值 p.close() p.join()

</code></pre>

來自:http://www.cnblogs.com/caoyf1992/p/8687352.html?utm_source=tuicool&utm_medium=referral

 本文由用戶 bpqhxxybpr 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!