異步任務神器 Celery 簡明筆記

jopen 8年前發布 | 20K 次閱讀 Web框架

異步任務

異步任務是web開發中一個很常見的方法。對于一些耗時耗資源的操作,往往從主應用中隔離,通過異步的方式執行。簡而言之,做一個注冊的功能,在用戶使用郵箱注冊成功之后,需要給該郵箱發送一封激活郵件。如果直接放在應用中,則調用發郵件的過程會遇到網絡IO的阻塞,比好優雅的方式則是使用異步任務,應用在業務邏輯中觸發一個異步任務。

實現異步任務的工具有很多,其原理都是使用一個任務隊列,比如使用redis生產消費模型或者發布訂閱模式實現一個簡單的消息隊列

除了redis,還可以使用另外一個神器---Celery。Celery是一個異步任務的調度工具。它是Python寫的庫,但是它實現的通訊協議也可以使用ruby,php,javascript等調用。異步任務除了消息隊列的后臺執行的方式,還是一種則是跟進時間的計劃任務。下面將會介紹如何使用celery實現這兩種需求。

Celry broker 和 backend

最早學習celery的時候,冒出了一個rabbitmq,又冒出一個redis。當時一頭霧水。實際上這正是celery的設計奧妙。簡單來說,rabbitmq是一個采用Erlang寫的強大的消息隊列工具。在celery中可以扮演broker的角色。那么什么是broker?

broker是一個消息傳輸的中間件,可以理解為一個郵箱。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,而后celery的worker將會取到消息,進行對于的程序執行。好吧,這個郵箱可以看成是一個消息隊列。那么什么又是backend,通常程序發送的消息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用于存儲這些消息以及celery執行的一些消息和結果。對于 brokers,官方推薦是rabbitmq和redis,至于backend,就是數據庫啦。為了簡單起見,我們都用redis。

Getting Starting

使用celery包含三個方面,其一是定義任務函數,其二是運行celery服務,最后是客戶應用程序的調用。

創建一個文件 tasks.py

輸入下列代碼:

from celery import Celery

brokers = 'redis://127.0.0.1:6379/5'
backend = 'redis://127.0.0.1:6379/6'


app = Celery('tasks', broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y

上述代碼導入了celery,然后創建了celery實例app,實力話的過程中,指定了任務名tasks(和文件名一致),傳入了broker和backend。然后創建了一個任務函數add

下面就啟動celery服務

在當前命令行終端運行:

celery -A tasks worker  --loglevel=info

此時會看見一對輸出。包括注冊的任務啦。

下面客戶端程序如何調用呢?打開一個命令行,進入Python環境

In [0]:from tasks import add
In [1]: r = add.delay(2, 2)
In [2]: add.delay(2, 2)
Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>

In [3]: r = add.delay(3, 3)

In [4]: r.re
r.ready   r.result  r.revoke

In [4]: r.ready()
Out[4]: True

In [6]: r.result
Out[6]: 6

In [7]: r.get()
Out[7]: 6

在celery命令行可以看見celery執行的日志:

[2015-09-20 21:37:06,086: INFO/MainProcess] Task proj.tasks.add[76beb980-0f55-4629-a4fb-4a1776428ea8] succeeded in 0.00089102005586s: 6

打開 backend的redis,也可以看見celery執行的信息。

現在時在python環境中調用的add函數,實際上通常在應用程序中調用這個方法。需要注意,如果把返回值賦值給一個變量,那么原來的應用程序也會被阻塞,需要等待異步任務返回的結果。因此,實際使用中,不需要把結果賦值。

計劃任務

上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先創建一個python包,celery服務,姑且命名為proj。目錄文件如下:

?  proj  tree
.
├── __init__.py
├── celery.py             # 創建 celery 實例
├── config.py                # 配置文件
└── tasks.py                # 任務函數

首先是 celery.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from __future__ import absolute_import
from celery import Celery

app = Celery('proj', include=['proj.tasks'])

app.config_from_object('proj.config')

if __name__ == '__main__':
    app.start()

這一次創建 app,并沒有直接指定 broker 和 backend。而是在配置文件中。

config.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from __future__ import absolute_import

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'

剩下的就是tasks.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from __future__ import absolute_import
from proj.celery import app

@app.task
def add(x, y):
    return x + y

使用方法也很簡單,在proj的同一級目錄執行celery:

celery -A proj worker -l info

現在使用任務也很簡單,直接在客戶端代碼調用 proj.tasks 里的函數即可。

Scheduler

一種常見的需求是每隔一段時間執行一個任務。配置如下

config.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from __future__ import absolute_import

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'

CELERY_TIMEZONE = 'Asia/Shanghai'

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
         'task': 'proj.tasks.add',
         'schedule': timedelta(seconds=30),
         'args': (16, 16)
    },
}

注意配置文件需要指定時區。這段代碼表示每隔30秒執行 add 函數。

一旦使用了 scheduler, 啟動 celery需要加上-B 參數

celery -A proj worker -B -l info

crontab

計劃任務當然也可以用crontab實現,celery也有crontab模式。修改 config.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from __future__ import absolute_import

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'

CELERY_TIMEZONE = 'Asia/Shanghai'

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

總而言之,scheduler的切分度更細,可以精確到秒。crontab模式就不用說了。當然celery還有更高級的用法,比如多個機器使用,啟用多個worker并發處理等。

來自: http://www.jianshu.com/p/1840035cb510

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!