對python并發編程的思考

NealSorenso 6年前發布 | 33K 次閱讀 Python 并發 Python開發

為了提高系統密集型運算的效率,我們常常會使用到多個進程或者是多個線程,python中的 Threading 包實現了線程, multiprocessing 包則實現了多進程。而在3.2版本的python中,將進程與線程進一步封裝成 concurrent.futures 這個包,使用起來更加方便。我們以請求網絡服務為例,來實際測試一下加入多線程之后的效果。

首先來看看不使用多線程花費的時間:

import time
import requests

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

# 獲取網絡請求結果
def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

# 開始時間
start = time.time()

for num in NUMBERS:
    result = fetch(num)
    print('fetch({}) = {}'.format(num, result))
# 計算花費的時間
print('cost time: {}'.format(time.time() - start))

執行結果如下:

fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
cost time: 6.952988862991333

再來看看加入多線程之后的效果:

import time
import requests
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

start = time.time()
# 使用線程池(使用5個線程)
with ThreadPoolExecutor(max_workers=5) as executor:
  # 此處的map操作與原生的map函數功能一樣
    for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
        print('fetch({}) = {}'.format(num, result))
print('cost time: {}'.format(time.time() - start))

執行結果如下:

fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
cost time: 1.9467740058898926

只用了近2秒的時間,如果再多加幾個線程時間會更短,而不加入多線程需要接近7秒的時間。

不是說python中由于全局解釋鎖的存在,每次只能執行一個線程嗎,為什么上面使用多線程還快一些?

確實,由于python的解釋器(只有cpython解釋器中存在這個問題)本身不是線程安全的,所以存在著全局解釋鎖,也就是我們經常聽到的GIL,導致一次只能使用一個線程來執行Python的字節碼。但是對于上面的I/O操作來說,一個線程在等待網絡響應時,執行I/O操作的函數會釋放GIL,然后再運行一個線程。

所以,執行I/O密集型操作時,多線程是有用的,對于CPU密集型操作,則每次只能使用一個線程。那這樣說來,想執行CPU密集型操作怎么辦?

答案是使用多進程,使用concurrent.futures包中的 ProcessPoolExecutor 。這個模塊實現的是真正的并行計算,因為它使用ProcessPoolExecutor 類把工作分配給多個 Python 進程處理。因此,如果需要做 CPU密集型處理,使用這個模塊能繞開 GIL,利用所有可用的 CPU 核心。

說到這里,對于I/O密集型,可以使用多線程或者多進程來提高效率。我們上面的并發請求數只有5個,但是如果同時有1萬個并發操作,像淘寶這類的網站同時并發請求數可以達到千萬級以上,服務器每次為一個請求開一個線程,還要進行上下文切換,這樣的開銷會很大,服務器壓根承受不住。一個解決辦法是采用分布式,大公司有錢有力,能買很多的服務器,小公司呢。

我們知道系統開進程的個數是有限的,線程的出現就是為了解決這個問題,于是在進程之下又分出多個線程。所以有人就提出了能不能用 同一線程來同時處理若干連接 ,再往下分一級。于是 協程 就出現了。

協程在實現上試圖用一組少量的線程來實現多個任務,一旦某個任務阻塞,則可能用同一線程繼續運行其他任務,避免大量上下文的切換,而且,各個協程之間的切換,往往是用戶通過代碼來顯式指定的,不需要系統參與,可以很方便的實現異步。

協程本質上是異步非阻塞技術,它是將事件回調進行了包裝,讓程序員看不到里面的事件循環。說到這里,什么是異步非阻塞?同步異步,阻塞,非阻塞有什么區別?

借用知乎上的一個例子,假如你打電話問書店老板有沒有《分布式系統》這本書,如果是同步通信機制,書店老板會說,你稍等,”我查一下",然后開始查啊查,等查好了(可能是5秒,也可能是一天)告訴你結果(返回結果)。而異步通信機制,書店老板直接告訴你我查一下啊,查好了打電話給你,然后直接掛電話了(不返回結果)。然后查好了,他會主動打電話給你。在這里老板通過“回電”這種方式來回調。

而阻塞與非阻塞則是你打電話問書店老板有沒有《分布式系統》這本書,你如果是阻塞式調用,你會一直把自己“掛起”,直到得到這本書有沒有的結果,如果是非阻塞式調用,你不管老板有沒有告訴你,你自己先一邊去玩了, 當然你也要偶爾過幾分鐘check一下老板有沒有返回結果。在這里阻塞與非阻塞與是否同步異步無關。跟老板通過什么方式回答你結果無關。

總之一句話,阻塞和非阻塞,描述的是一種狀態,而同步與非同步描述的是行為方式。

回到協程上。

類似于 Threading 包是對線程的實現一樣,python3.4之后加入的 asyncio 包則是對協程的實現。我們用asyncio改寫文章開頭的代碼,看看使用協程之后能花費多少時間。

import asyncio
import aiohttp
import time

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
# 這里的代碼不理解沒關系
# 主要是為了證明協程的強大
async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']

start = time.time()
loop = asyncio.get_event_loop()
tasks = [fetch_async(num) for num in NUMBERS]
results = loop.run_until_complete(asyncio.gather(*tasks))

for num, results in zip(NUMBERS, results):
    print('fetch({}) = ()'.format(num, results))

print('cost time: {}'.format(time.time() - start))

執行結果:

fetch(0) = ()
fetch(1) = ()
fetch(2) = ()
fetch(3) = ()
fetch(4) = ()
fetch(5) = ()
fetch(6) = ()
fetch(7) = ()
fetch(8) = ()
fetch(9) = ()
fetch(10) = ()
fetch(11) = ()
cost time: 0.8582110404968262

不到一秒!感受到協程的威力了吧。

asyncio的知識說實在的有點難懂,因為它是用異步的方式在編寫代碼。上面給出的asyncio示例不理解也沒有關系,之后的文章會詳細的介紹一些asyncio相關的概念。

 

來自:https://segmentfault.com/a/1190000012794872

 

 本文由用戶 NealSorenso 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!