Celery 實現分布式任務隊列
Celery 簡介
Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。
在 Python 中定義 Celery 的時候,我們要引入 Broker,中文中有中間人的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等著取出一個個任務準備著手做。
這種模式注定了整個系統會是個開環系統,工頭對于碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
Show me the code
# 以下為 dispatcher.py from worker import divide1
divide.delay(1,2)
2
divide.apply_async((1, 2))
以下為 worker.py
from celery import Celery app = Celery('tasks', backend='amqp://guest@localhost//', broker='redis://')
@app.task def divide(x, y): print x / y</pre>
worker.py 中新建了一個 Celery 實例,以 amqp 作為 broker,以 redis 作為 backend 儲存所有 task 執行的歷史記錄。我們在此例中使用 RabbitMQ 作為我們的消息隊列服務器。
我們一方面通過命令行中執行以下語句來啟動 celery 服務。
celery -A worker worker --loglevel=info另外一方面,我們運行 dispatch.py,代碼中將 worker 中的 divide 函數導入,再接著以兩種方式將 task 啟動。第一種方法中的 delay 方法接收了兩個參數,實際為第二種方法的便捷調用,第二種方法在使用時,要將我們要傳給 divide 的參數作為 tuple 放在第一個參數位置。
apply_async 的其他參數
apply_async 還支持其他參數,比如設置回調。
設置 task 實例的回調可以采用 link:
divide.apply_async((16, 2), link=divide.s(8))首先計算 16 / 2,然后把結果 8 / 8,最后執行的結果等于 1. 所以這里的 link 是指向一個后繼的調用函數,即完成當前 divide 以后再進行下一個 divide 操作。除了 link 之外,還有 link_error,只會在該任務執行失敗時調用。在本例中,我們可以在 divide 執行失敗時,執行 link_error 所指的函數,這個函數就是錯誤消息的處理句柄,它會接收到一個 task 的 UUID,我們可以通過 UUID 來訪問出錯的任務的異常狀態。
# dispatcher divide.apply_async((1, 0), link_error=error_handler.s())這里我們把 1 和 0 放到了 divide 函數中執行,引發了除零異常,繼而執行 link_error 對應的 error_handler,error_handler 接收到 uuid 參數,通過 AsyncResult 生產一個結果實例,我們可以用 result.state 打印出該任務的執行情況,用 result.info 來獲取異常的具體信息。
worker
@app.task def divide(x, y): print x/y
@app.task def error_handler(uuid): result = AsyncResult(uuid) print 'task error {0}'.format(uuid)
[2015-09-01 13:43:26,569: WARNING/Worker-2] task error 8e516377-a6c0-4a40-934f-dd1b0692c5fa
print result.state
[2015-09-01 13:43:26,572: WARNING/Worker-2] FAILURE
print result.info
[2015-09-01 13:43:26,572: WARNING/Worker-2] integer division or modulo by zero</pre>
跟蹤異常的成因
異常的成因我們可以如上述代碼所示將 result.info 打印出來而得知。然而我們并不能滿足于此,僅僅知曉出錯的 task 的 UUID 和其狀態是不夠的,我們想要知道發生錯誤時,task 的傳入參數是什么。我一開始沒有嘗試出通過 UUID 來獲取到原來的 1 和 0 這兩個參數,后來我追蹤了 apply_async 這個函數,位于 task.py 中,再跟蹤到 trace.py 中的 build_tracer 函數,果然在 link_error 的調用時只傳遞了 UUID 一個參數,代碼如下:
def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True): if propagate: raise I = Info(state, exc) R = I.handle_error_state(task, eager=eager) if call_errbacks: group( [signature(errback, app=app) for errback in request.errbacks or []], app=app, ).apply_async((uuid, ))# ).apply_async((uuid, request.args)) # 可以改成上一行注釋中的代碼,這樣就可以在 error_handler 中得到原來調用的任務的輸入參數了。 return I, R, I.state, I.retval</pre><br />
此處通過修改 celery 源代碼來獲取出錯時 task 的傳入參數,但是方法并不好。于是我想能不能通過 UUID 直接獲取到原來的 task,然后查看 task 的 args,但是這篇文檔有些晦澀難懂,我就先放棄了,便發現了以下方法。
class DebugTask(Task): abstract = True def on_failure(self, *args, **kwargs): print self.request.args@app.task(base=DebugTask) def divide(x, y): print x / y</pre>
這段代碼將原來應該繼承的 Task 類中的 on_failure 函數重寫,當 divide 函數發生異常時,該 task 的 state 自動變成 failure,Task 會自動調用 on_failure 函數,從而打印出傳入的 args。
任務的遠程調用
關于 task 的調用,celery 還提供了另外一種 send_task 方法。
Celery 作為分布式系統,自然就支持遠程 worker,這個時候我們可以利用 send_task 這個函數,以函數名的方式調用 task。代碼如下:
from celery import Celeryapp = Celery() app.config_from_object('celeryconfig') app.send_task('worker.divide', args=[1, 0])
send_task 也支持 link_error,這個官方文檔上沒寫詳細,這里需要調用 signature 函數來生產函數的 signature,這時 divide 的 UUID 和我們通過修改源代碼得到的 args。
app.send_task('worker.divide', args=[1, 0], link_error=app.signature('worker.error_handler'))</pre>
這里我們沒有通過 module 的方式把 divide 函數給 import 到程序中來,也就意味著我們可以不將 worker 放在與 dispatcher 同一目錄下。我們的想法是,將 worker 放在另外一臺服務器上,通過 celery 調用它,本地 django 項目調用這個 dispatcher 后,將 task 發送到遠程服務器的隊列中,然后由遠程服務器中的 worker 處理。
配置文件
此時需要注意的是,這里的 dispatcher 是通過文件的方式配置的,其配置文件應與 worker 端配置文件吻合,如下:
# celeryconfig.pycoding=utf-8
Broker 設置 RabbitMQ
BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://'
Tasks 位于 worker.py 中
CELERY_IMPORTS = ('worker', )
默認為1次/秒的任務
CELERY_ANNOTATIONS = {'worker.divide': {'rate_limit': '1/s'}}
CELERY_ROUTES = {'worker.divide': {'queue': 'divide'}, 'worker.error_handler': {'queue': 'error'}}
默認所有格式為 json
CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json']</pre>
使用了配置文件以后,我們在 worker 中也可以采用相同的方式定義 app,如下:# coding=utf-8 from celery import Celery app = Celery() app.config_from_object('celeryconfig')@app.task def divide(x, y): print x / y</pre>
我們在配置文件中為 worker.divide 這個 task 指定了 divide 這個隊列,為 error_handler 定義了 error 這個隊列用于錯誤處理。在啟動 celery 的時候可以通過 -Q 參數指定隊列。在終端中執行了以下命令后,celery 服務器就啟動了,當前 celery 會監視 divide 隊列,取出參數執行任務。而如果我們不啟動另外一個 celery 來監視 error 隊列,error_handler 就不會前往隊列去拿參數執行。
celery -A worker worker --loglevel=info -Q divide
關于 Celery,網上英文教程都不多,更別說中文的了。
網上有些關于 Celery 性能的討論,我暫且沒有做分析,如果有更好的解決方案能夠替代它,請留言告知。
如果發現本文有錯誤,請指正。
來自:http://my.oschina.net/shinedev/blog/500120