用 Celery 實現郵件推送系統
郵件的發送存在耗時長,成功率不穩定等問題,需要進行異步處理,并且異步處理時還應該根據發送狀態考慮重發。
系統需求
本文以Celery 實現分布式任務隊列為基礎,簡述了一個郵件推送系統的模型。
Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等著被分配工作的碼農。
需求:
1.在郵件推送系統中,我們需要對成千上萬的用戶發送郵件,發送郵件具有時效性,即不能說今天開始發郵件,要等到明天才能發送完畢。
2.發送郵件過程中,可能會遇到過于頻繁,郵件服務器上信件堆積無法及時接受新信件而產生的拒信,或者郵件服務器將我們的郵件判決為垃圾郵件。
3.郵件發送的 I/O 時間較長,不能讓程序在等待郵件服務器返回消息上浪費時間。
所以我們的推送系統要有以下特性:1.分布式處理作業;2.閉環監控;3.異步式分發作業
系統框圖
前端通過 ajax 調用 views 中的 callpush 接口,該接口將被推送用戶的篩選條件傳入 service,然后 service 請求數據庫,將返回數據作為參數調用 celery 接口中 addtask 函數。celery 接口中 addtask 根據 action 參數來判斷所要添加的任務類型,根據不同的類型分別進行處理,放入隊列。
系統的另外一頭,worker 從隊列中取出任務,用 mail 函數推送郵件,如果發送失敗就調用 error_handler 進行異常處理,此處我們將所有 task 的執行情況放入 redis 中,給每個任務進行標記,如果成功則標記為 1,失敗則 0.
前端可以通過 ajax 調用 pushstatus 來向 redis 中讀取任務執行情況,此處我們返回了成功和失敗任務的個數。
偽代碼實現
# Controller from redis import StrictRedis red = StrictRedis(host='localhost', port=6379, db=0)def callpush(request): area = request.POST.get('area') return HttpResponse(str(mailpush(area)))
def pushstatus(request): failure = red.scard('status:0:task') success = red.scard('status:1:task') return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success))
Service
def mailpush(**kargs): targets = MtUser.objects.filter(kargs).values('username', 'address') addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings') return len(targets)
Celery Interface (Dispatcher)
from celery import Celery
app = Celery() app.config_from_object('celeryconfig')
def addtask(action, data, **kargs): if action == 'mailpush': for (address, username) in data: app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler')) elif action == 'messagepush': pass else: pass
Celery Backend (Worker)
from celery import Celery from celery import Task from redis import StrictRedis
app = Celery() app.config_from_object('celeryconfig') red = StrictRedis(host='localhost', port=6379, db=0)
@app.task(bind=True) def mail(self, subject, content, address): from django.core.mail import EmailMessage msg = EmailMessage(subject, content, 'admin@admin.com', address) msg.content_subtype = 'html' msg.send() red.sadd('status:1:task', self.request.id)
Overwrite the on_failure function in trace.py
@app.task def error_handler(uuid, args): print uuid print args red.set(uuid, args) red.sadd('status:0:task', uuid) red.srem('status:1:task', uuid)</pre>
來自:http://my.oschina.net/shinedev/blog/500554