利用 Python yield 創建協程將異步編程同步化

jopen 9年前發布 | 59K 次閱讀 Python Python開發

在 Lua 和 Python 等腳本語言中,經常提到一個概念: 協程。也經常會有同學對協程的概念及其作用比較疑惑,本文今天就來探討下協程的前世今生。

0、首先回答兩個大家最關心的問題:

0.1 什么是協程?

本質上協程就是用戶空間下的線程。

0.2 協程的好處是什么?

通俗易懂的回答:

  • 讓原來要使用 異步 + 回調 方式寫的非人類代碼,可以用看似同步的方式寫出來。

    </li> </ul>

    • 無需線程上下文切換、原子操作鎖定及同步的開銷。

      </li> </ul>

      1、回顧同步與異步編程

      同步編程即線性化編程,代碼按照既定順序執行,上一條語句執行完才會執行下一條,否則就一直等在那里。

      但是許多實際操作都是CPU 密集型任務和 IO 密集型任務,比如網絡請求,此時不能讓這些任務阻塞主線程的工作,于是就會采用異步編程。

      利用 Python yield 創建協程將異步編程同步化

      異步的標準元素就是回調函數(Callback, 后來衍生出Promise/Deferred概念),主線程發起一個異步任務,讓其自己到一邊去工作,當其完成后,會通過執行預先指定的回調函數完成后續任務,然后返回主線程。在異步任務執行過程中,主線程無需等待和阻塞,可以繼續處理其他任務。

      下例大家并不陌生,是jQuery標準發送http異步請求的方式。

      $.ajax({
          url:"/echo/json/",
          success: function(response)
          {
             console.info(response.name);
          }
      });

      而并發的核心思想在于,大的任務可以分解成一系列的子任務,后者可以被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換

      2、回顧多線程編程

      當主線程發起異步任務,這個任務跑到哪里去工作了呢?這就說到多線程(包括多進程)編程,一個主線程可以主動創建多個子線程,然后將任務交給子線程,每個子線程擁有自己的堆棧空間。操作系統可以通過分時的方式讓同一個CPU輪流調度各個線程,編程人員無需關心操作系統是如何工作的。

      但是如果需要在多個線程之間通信,則需要編程人員自己寫代碼來控制線程之間的協作(利用鎖或信號量)以及通信(利用管道、隊列等)

      2.1 經典的Producer-Consumer問題

      利用 Python yield 創建協程將異步編程同步化

      這個問題說的是有兩方進行通信和協作,一方只負責生產內容,另一方只負責消費內容。消費者并不知道,也無需知道生產者何時生產,只是當有內容生產出來負責消費即可,沒有內容時就等待。這是一個經典的異步問題。

      2.1.1 Threading/Queue方案

      傳統的解決方案即是采用多線程來實現,生產者和消費者分別處于不同的線程或進程中,由操作系統進行調度。來看一篇經典的多線程教程中的例子,是不是很像Java風格?——啰嗦。

      import threading
      import time
      import logging
      import random
      import Queue

      logging.basicConfig(level=logging.DEBUG,                     format='(%(threadName)-9s) %(message)s',)

      BUF_SIZE = 10 q = Queue.Queue(BUF_SIZE)

      class ProducerThread(threading.Thread):     def init(self, group=None, target=None, name=None,                  args=(), kwargs=None, verbose=None):         super(ProducerThread,self).init()         self.target = target         self.name = name

          def run(self):         while True:             if not q.full():                 item = random.randint(1,10)                 q.put(item)                 logging.debug('Putting ' + str(item)                                 + ' : ' + str(q.qsize()) + ' items in queue')                 time.sleep(random.random())         return

      class ConsumerThread(threading.Thread):     def init(self, group=None, target=None, name=None,                  args=(), kwargs=None, verbose=None):         super(ConsumerThread,self).init()         self.target = target         self.name = name         return

          def run(self):         while True:             if not q.empty():                 item = q.get()                 logging.debug('Getting ' + str(item)                                + ' : ' + str(q.qsize()) + ' items in queue')                 time.sleep(random.random())         return

      if name == 'main':          p = ProducerThread(name='producer')     c = ConsumerThread(name='consumer')

          p.start()     time.sleep(2)     c.start()     time.sleep(2)</pre>

      2.1.2 MessageQueue方案

      基于多線程方案,這個問題已經演變成消息中介模式(有些公司喜歡稱之為”郵局”),有各種的商業MQ方案可以直接使用。

      這里以RabbitMQ開源方案為例,Producer一方向名為隊列中發送”Hello World!”內容,而Consumer一方則監聽隊列,當有內容進入隊列時,就執行callback函數來收取并處理內容。發送與收取的動作是異步執行的,互不干擾。

      ###### Producer ########

      import pika connection = pika.BlockingConnection(pika.ConnectionParameters(         host='localhost')) channel = connection.channel()

      channel.queue_declare(queue='hello')

      channel.basic_publish(exchange='',                       routing_key='hello',                       body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()

      # Consumer 

      import pika connection = pika.BlockingConnection(pika.ConnectionParameters(         host='localhost')) channel = connection.channel()

      channel.queue_declare(queue='hello')

      print ' [*] Waiting for messages. To exit press CTRL+C'

      def callback(ch, method, properties, body):     print " [x] Received %r" % (body,)

      channel.basic_consume(callback,                       queue='hello',                       no_ack=True)

      channel.start_consuming()</pre>

      3、yield與協程

      3.1 何為協程(Coroutine)及yield

      python采用了GIL(Global Interpretor Lock,全局解釋器鎖),默認所有任務都是在同一進程中執行的。(當然,可以借助多進程多線程來實現并行化。)我們調用一個普通的Python函數時,一般是從函數的第一行代碼開始執行,結束于return語句、異常或者函數結束(可以看作隱式的返回None)。一旦函數將控制權交還給調用者,就意味著全部結束。函數中做的所有工作以及保存在局部變量中的數據都將丟失。再次調用這個函數時,一切都將從頭創建。

      實現一個用戶態線程有兩個必須要處理的問題:一是碰著阻塞式 I/O 會導致整個進程被掛起;二是由于缺乏時鐘阻塞,進程需要自己擁有調度線程的能力。如果一種實現使得每個線程需要自己通過調用某個方法,主動交出控制權。那么我們就稱這種用戶態線程是協作式的,即是協程。

      本質上協程就是用戶空間下的線程。

      在 Python 中,所謂協程(Coroutine)就是在同一進程/線程中,利用生成器(generator)來”同時”執行多個函數(routine)。

      Python的中yield關鍵字與Coroutine說的是一件事情,先看看yield的基本用法。

      任何包含yield關鍵字的函數都會自動成為生成器(generator)對象,里面的代碼一般是一個有限或無限循環結構,每當第一次調用該函數時,會執行到yield代碼為止并返回本次迭代結果,yield指令起到的是return關鍵字的作用。然后函數的堆棧會自動凍結(freeze)在這一行。當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。通過這種方式,迭代器可以實現無限序列和惰性求值。

      看一個用生成器來計算100以內斐波那契數列的例子。我們先用普通遞歸方式來進行計算。

      a = b = 1
      while a < 100:
          a, b = b, a + b
          print a,

      再來用yield和生成器來計算斐波那契數列,該函數形成一個無限循環的生成器,由函數調用者顯式地控制迭代次數。

      #!/usr/bin/env python

       coding=utf-8

       測試utf-8編碼

      import sys

      reload(sys) sys.setdefaultencoding('utf-8')

      def fibonacci():     a = b = 1     # yield則像是generator函數的返回結果     yield a     yield b     while True:         a, b = b, a+b         # yield唯一所做的另一件事就是保存一個generator函數的狀態,         # generator就是一個特殊類型的迭代器(iterator)         yield b

      num = 0 fib = fibonacci() while num < 100:     # 和迭代器相似,我們可以通過使用next()來從generator中獲取下一個值,也可以通過隱式地調用next()來忽略一些值     num = next(fib)     print num,     # 1 1 2 3 5 8 13 21 34 55 89 144</pre>

      總而言之,生成器(以及yield語句)最初的引入是為了讓程序員可以更簡單的編寫用來產生值的序列的代碼。 以前,要實現類似隨機數生成器的東西,需要實現一個類或者一個模塊,在生成數據的同時保持對每次調用之間狀態的跟蹤。引入生成器之后,這變得非常簡單。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!