用 Celery 實現郵件推送系統

jopen 9年前發布 | 23K 次閱讀 Celery

郵件的發送存在耗時長,成功率不穩定等問題,需要進行異步處理,并且異步處理時還應該根據發送狀態考慮重發。

系統需求

本文以Celery 實現分布式任務隊列為基礎,簡述了一個郵件推送系統的模型。

Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。

需求:

1.在郵件推送系統中,我們需要對成千上萬的用戶發送郵件,發送郵件具有時效性,即不能說今天開始發郵件,要等到明天才能發送完畢。

2.發送郵件過程中,可能會遇到過于頻繁,郵件服務器上信件堆積無法及時接受新信件而產生的拒信,或者郵件服務器將我們的郵件判決為垃圾郵件。

3.郵件發送的 I/O 時間較長,不能讓程序在等待郵件服務器返回消息上浪費時間。

所以我們的推送系統要有以下特性:1.分布式處理作業;2.閉環監控;3.異步式分發作業

系統框圖

用 Celery 實現郵件推送系統

前端通過 ajax 調用 views 中的 callpush 接口,該接口將被推送用戶的篩選條件傳入 service,然后 service 請求數據庫,將返回數據作為參數調用 celery 接口中 addtask 函數。celery 接口中 addtask 根據 action 參數來判斷所要添加的任務類型,根據不同的類型分別進行處理,放入隊列。

系統的另外一頭,worker 從隊列中取出任務,用 mail 函數推送郵件,如果發送失敗就調用 error_handler 進行異常處理,此處我們將所有 task 的執行情況放入 redis 中,給每個任務進行標記,如果成功則標記為 1,失敗則 0.

前端可以通過 ajax 調用 pushstatus 來向 redis 中讀取任務執行情況,此處我們返回了成功和失敗任務的個數。

偽代碼實現

# Controller
from redis import StrictRedis
red = StrictRedis(host='localhost', port=6379, db=0)

def callpush(request): area = request.POST.get('area') return HttpResponse(str(mailpush(area)))

def pushstatus(request): failure = red.scard('status:0:task') success = red.scard('status:1:task') return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success))

Service

def mailpush(**kargs): targets = MtUser.objects.filter(kargs).values('username', 'address') addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings') return len(targets)

Celery Interface (Dispatcher)

from celery import Celery

app = Celery() app.config_from_object('celeryconfig')

def addtask(action, data, **kargs): if action == 'mailpush': for (address, username) in data: app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler')) elif action == 'messagepush': pass else: pass

Celery Backend (Worker)

from celery import Celery from celery import Task from redis import StrictRedis

app = Celery() app.config_from_object('celeryconfig') red = StrictRedis(host='localhost', port=6379, db=0)

@app.task(bind=True) def mail(self, subject, content, address): from django.core.mail import EmailMessage msg = EmailMessage(subject, content, 'admin@admin.com', address) msg.content_subtype = 'html' msg.send() red.sadd('status:1:task', self.request.id)

Overwrite the on_failure function in trace.py

@app.task def error_handler(uuid, args): print uuid print args red.set(uuid, args) red.sadd('status:0:task', uuid) red.srem('status:1:task', uuid)</pre>
來自:http://my.oschina.net/shinedev/blog/500554

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!