Celery 實現分布式任務隊列

g2md 9年前發布 | 56K 次閱讀 Celery 分布式/云計算/大數據

Celery 簡介

Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。

在 Python 中定義 Celery 的時候,我們要引入 Broker,中文中有中間人的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等著取出一個個任務準備著手做。

這種模式注定了整個系統會是個開環系統,工頭對于碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。

Show me the code

# 以下為 dispatcher.py
from worker import divide

1

divide.delay(1,2)

2

divide.apply_async((1, 2))

以下為 worker.py

from celery import Celery app = Celery('tasks', backend='amqp://guest@localhost//', broker='redis://')

@app.task def divide(x, y): print x / y</pre>

worker.py 中新建了一個 Celery 實例,以 amqp 作為 broker,以 redis 作為 backend 儲存所有 task 執行的歷史記錄。我們在此例中使用 RabbitMQ 作為我們的消息隊列服務器。

我們一方面通過命令行中執行以下語句來啟動 celery 服務。

celery -A worker worker --loglevel=info 

另外一方面,我們運行 dispatch.py,代碼中將 worker 中的 divide 函數導入,再接著以兩種方式將 task 啟動。第一種方法中的 delay 方法接收了兩個參數,實際為第二種方法的便捷調用,第二種方法在使用時,要將我們要傳給 divide 的參數作為 tuple 放在第一個參數位置。

apply_async 的其他參數

apply_async 還支持其他參數,比如設置回調。

設置 task 實例的回調可以采用 link:

divide.apply_async((16, 2), link=divide.s(8))

首先計算 16 / 2,然后把結果 8 / 8,最后執行的結果等于 1. 所以這里的 link 是指向一個后繼的調用函數,即完成當前 divide 以后再進行下一個 divide 操作。除了 link 之外,還有 link_error,只會在該任務執行失敗時調用。在本例中,我們可以在 divide 執行失敗時,執行 link_error 所指的函數,這個函數就是錯誤消息的處理句柄,它會接收到一個 task 的 UUID,我們可以通過 UUID 來訪問出錯的任務的異常狀態。

# dispatcher
divide.apply_async((1, 0), link_error=error_handler.s())

這里我們把 1 和 0 放到了 divide 函數中執行,引發了除零異常,繼而執行 link_error 對應的 error_handler,error_handler 接收到 uuid 參數,通過 AsyncResult 生產一個結果實例,我們可以用 result.state 打印出該任務的執行情況,用 result.info 來獲取異常的具體信息。

worker

@app.task def divide(x, y): print x/y

@app.task def error_handler(uuid): result = AsyncResult(uuid) print 'task error {0}'.format(uuid)

[2015-09-01 13:43:26,569: WARNING/Worker-2] task error 8e516377-a6c0-4a40-934f-dd1b0692c5fa

print result.state

[2015-09-01 13:43:26,572: WARNING/Worker-2] FAILURE

print result.info

[2015-09-01 13:43:26,572: WARNING/Worker-2] integer division or modulo by zero</pre>

跟蹤異常的成因

異常的成因我們可以如上述代碼所示將 result.info 打印出來而得知。然而我們并不能滿足于此,僅僅知曉出錯的 task 的 UUID 和其狀態是不夠的,我們想要知道發生錯誤時,task 的傳入參數是什么。我一開始沒有嘗試出通過 UUID 來獲取到原來的 1 和 0 這兩個參數,后來我追蹤了 apply_async 這個函數,位于 task.py 中,再跟蹤到 trace.py 中的 build_tracer 函數,果然在 link_error 的調用時只傳遞了 UUID 一個參數,代碼如下:

   def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True):
        if propagate:
            raise
        I = Info(state, exc)
        R = I.handle_error_state(task, eager=eager)
        if call_errbacks:
            group(
                [signature(errback, app=app)
                 for errback in request.errbacks or []], app=app,
            ).apply_async((uuid, ))

        # ).apply_async((uuid, request.args))
        # 可以改成上一行注釋中的代碼,這樣就可以在 error_handler 中得到原來調用的任務的輸入參數了。
    return I, R, I.state, I.retval</pre><br />

此處通過修改 celery 源代碼來獲取出錯時 task 的傳入參數,但是方法并不好。于是我想能不能通過 UUID 直接獲取到原來的 task,然后查看 task 的 args,但是這篇文檔有些晦澀難懂,我就先放棄了,便發現了以下方法。

class DebugTask(Task):
    abstract = True
    def on_failure(self, *args, **kwargs):
        print self.request.args

@app.task(base=DebugTask) def divide(x, y): print x / y</pre>

這段代碼將原來應該繼承的 Task 類中的 on_failure 函數重寫,當 divide 函數發生異常時,該 task 的 state 自動變成 failure,Task 會自動調用 on_failure 函數,從而打印出傳入的 args。

任務的遠程調用

關于 task 的調用,celery 還提供了另外一種 send_task 方法。

Celery 作為分布式系統,自然就支持遠程 worker,這個時候我們可以利用 send_task 這個函數,以函數名的方式調用 task。代碼如下:

from celery import Celery

app = Celery() app.config_from_object('celeryconfig') app.send_task('worker.divide', args=[1, 0])

send_task 也支持 link_error,這個官方文檔上沒寫詳細,這里需要調用 signature 函數來生產函數的 signature,這時 divide 的 UUID 和我們通過修改源代碼得到的 args。

app.send_task('worker.divide', args=[1, 0], link_error=app.signature('worker.error_handler'))</pre>

這里我們沒有通過 module 的方式把 divide 函數給 import 到程序中來,也就意味著我們可以不將 worker 放在與 dispatcher 同一目錄下。我們的想法是,將 worker 放在另外一臺服務器上,通過 celery 調用它,本地 django 項目調用這個 dispatcher 后,將 task 發送到遠程服務器的隊列中,然后由遠程服務器中的 worker 處理。

配置文件

此時需要注意的是,這里的 dispatcher 是通過文件的方式配置的,其配置文件應與 worker 端配置文件吻合,如下:

# celeryconfig.py

coding=utf-8

Broker 設置 RabbitMQ

BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://'

Tasks 位于 worker.py 中

CELERY_IMPORTS = ('worker', )

默認為1次/秒的任務

CELERY_ANNOTATIONS = {'worker.divide': {'rate_limit': '1/s'}}

CELERY_ROUTES = {'worker.divide': {'queue': 'divide'}, 'worker.error_handler': {'queue': 'error'}}

默認所有格式為 json

CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json']</pre>
使用了配置文件以后,我們在 worker 中也可以采用相同的方式定義 app,如下:

# coding=utf-8
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')

@app.task def divide(x, y): print x / y</pre>

我們在配置文件中為 worker.divide 這個 task 指定了 divide 這個隊列,為 error_handler 定義了 error 這個隊列用于錯誤處理。在啟動 celery 的時候可以通過 -Q 參數指定隊列。在終端中執行了以下命令后,celery 服務器就啟動了,當前 celery 會監視 divide 隊列,取出參數執行任務。而如果我們不啟動另外一個 celery 來監視 error 隊列,error_handler 就不會前往隊列去拿參數執行。

celery -A worker worker --loglevel=info -Q divide

關于 Celery,網上英文教程都不多,更別說中文的了。

網上有些關于 Celery 性能的討論,我暫且沒有做分析,如果有更好的解決方案能夠替代它,請留言告知。

如果發現本文有錯誤,請指正。



來自:http://my.oschina.net/shinedev/blog/500120

 本文由用戶 g2md 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!