異步任務神器 Celery 簡明筆記
異步任務
異步任務是web開發中一個很常見的方法。對于一些耗時耗資源的操作,往往從主應用中隔離,通過異步的方式執行。簡而言之,做一個注冊的功能,在用戶使用郵箱注冊成功之后,需要給該郵箱發送一封激活郵件。如果直接放在應用中,則調用發郵件的過程會遇到網絡IO的阻塞,比好優雅的方式則是使用異步任務,應用在業務邏輯中觸發一個異步任務。
實現異步任務的工具有很多,其原理都是使用一個任務隊列,比如使用redis生產消費模型或者發布訂閱模式實現一個簡單的消息隊列。
除了redis,還可以使用另外一個神器---Celery。Celery是一個異步任務的調度工具。它是Python寫的庫,但是它實現的通訊協議也可以使用ruby,php,javascript等調用。異步任務除了消息隊列的后臺執行的方式,還是一種則是跟進時間的計劃任務。下面將會介紹如何使用celery實現這兩種需求。
Celry broker 和 backend
最早學習celery的時候,冒出了一個rabbitmq,又冒出一個redis。當時一頭霧水。實際上這正是celery的設計奧妙。簡單來說,rabbitmq是一個采用Erlang寫的強大的消息隊列工具。在celery中可以扮演broker的角色。那么什么是broker?
broker是一個消息傳輸的中間件,可以理解為一個郵箱。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,而后celery的worker將會取到消息,進行對于的程序執行。好吧,這個郵箱可以看成是一個消息隊列。那么什么又是backend,通常程序發送的消息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用于存儲這些消息以及celery執行的一些消息和結果。對于 brokers,官方推薦是rabbitmq和redis,至于backend,就是數據庫啦。為了簡單起見,我們都用redis。
Getting Starting
使用celery包含三個方面,其一是定義任務函數,其二是運行celery服務,最后是客戶應用程序的調用。
創建一個文件 tasks.py
輸入下列代碼:
from celery import Celery brokers = 'redis://127.0.0.1:6379/5' backend = 'redis://127.0.0.1:6379/6' app = Celery('tasks', broker=broker, backend=backend) @app.task def add(x, y): return x + y
上述代碼導入了celery,然后創建了celery實例app,實力話的過程中,指定了任務名tasks
(和文件名一致),傳入了broker和backend。然后創建了一個任務函數add
。
下面就啟動celery服務
在當前命令行終端運行:
celery -A tasks worker --loglevel=info
此時會看見一對輸出。包括注冊的任務啦。
下面客戶端程序如何調用呢?打開一個命令行,進入Python環境
In [0]:from tasks import add In [1]: r = add.delay(2, 2) In [2]: add.delay(2, 2) Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d> In [3]: r = add.delay(3, 3) In [4]: r.re r.ready r.result r.revoke In [4]: r.ready() Out[4]: True In [6]: r.result Out[6]: 6 In [7]: r.get() Out[7]: 6
在celery命令行可以看見celery執行的日志:
[2015-09-20 21:37:06,086: INFO/MainProcess] Task proj.tasks.add[76beb980-0f55-4629-a4fb-4a1776428ea8] succeeded in 0.00089102005586s: 6
打開 backend的redis,也可以看見celery執行的信息。
現在時在python環境中調用的add函數,實際上通常在應用程序中調用這個方法。需要注意,如果把返回值賦值給一個變量,那么原來的應用程序也會被阻塞,需要等待異步任務返回的結果。因此,實際使用中,不需要把結果賦值。
計劃任務
上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先創建一個python包,celery服務,姑且命名為proj。目錄文件如下:
? proj tree . ├── __init__.py ├── celery.py # 創建 celery 實例 ├── config.py # 配置文件 └── tasks.py # 任務函數
首先是 celery.py
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import from celery import Celery app = Celery('proj', include=['proj.tasks']) app.config_from_object('proj.config') if __name__ == '__main__': app.start()
這一次創建 app,并沒有直接指定 broker 和 backend。而是在配置文件中。
config.py
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6'
剩下的就是tasks.py
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import from proj.celery import app @app.task def add(x, y): return x + y
使用方法也很簡單,在proj的同一級目錄執行celery:
celery -A proj worker -l info
現在使用任務也很簡單,直接在客戶端代碼調用 proj.tasks 里的函數即可。
Scheduler
一種常見的需求是每隔一段時間執行一個任務。配置如下
config.py
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6' CELERY_TIMEZONE = 'Asia/Shanghai' from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'proj.tasks.add', 'schedule': timedelta(seconds=30), 'args': (16, 16) }, }
注意配置文件需要指定時區。這段代碼表示每隔30秒執行 add 函數。
一旦使用了 scheduler, 啟動 celery需要加上-B 參數
celery -A proj worker -B -l info
crontab
計劃任務當然也可以用crontab實現,celery也有crontab模式。修改 config.py
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6' CELERY_TIMEZONE = 'Asia/Shanghai' from celery.schedules import crontab CELERYBEAT_SCHEDULE = { # Executes every Monday morning at 7:30 A.M 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }
總而言之,scheduler的切分度更細,可以精確到秒。crontab模式就不用說了。當然celery還有更高級的用法,比如多個機器使用,啟用多個worker并發處理等。