celery集群管理實現

jopen 9年前發布 | 25K 次閱讀 Celery

celery集群管理實現


本來這個方案打算用在我的Sora上,但是因為某些問題打算棄用celery。但既然有人想問怎樣實現多機器的管理,那就寫出來了:

架構:

這里作為例子的celery app為myapp:

root@workgroup0:~/celeryapp# ls myapp
agent.py   celery.py   config.py   __init__.py
root@workgroup0:~/celeryapp#


公用代碼部分:

celery.py:(備注:172.16.77.175是任務發布節點的ip地址)

from future import absolute_import
from celery import Celery
app = Celery('myapp',
             broker='amqp://guest@172.16.77.175//',
             backend='amqp://guest@172.16.77.175//',
             include=['myapp.agent'])

app.config_from_object('myapp.config')

if name == 'main':   app.start()</pre>


config.py:

from future import absolute_import
from kombu import Queue,Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] CELERY_RESULT_SERIALIZER='json'

CELERY_DEFAULT_EXCHANGE = 'agent' CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

CELERT_QUEUES =  (   Queue('machine1',exchange='agent',routing_key='machine1'),   Queue('machine2',exchange='agent',routing_key='machine2'), )</pre>


__init__.py:(空白)


任務發布節點的agent.py:

from future import absolute_import
from myapp.celery import app

@app.task def add(x,y):     return {'the value is ':str(x+y)}

@app.task def writefile():     out=open('/tmp/data.txt','w')     out.write('hello'+'\n')     out.close()

@app.task def mul(x,y):     return x*y

@app.task def xsum(numbers):     return sum(numbers)

@app.task def getl(stri):     return getlength(stri)

def getlength(stri):     return len(stri)</pre>


docker1上的agent.py:

from future import absolute_import
from myapp.celery import app

@app.task def add(x,y):     return {'value':str(x+y),'node_name':'docker1'}                   #增加了node_name用來識別節點

@app.task def writefile():     out=open('/tmp/data.txt','w')     out.write('hello'+'\n')     out.close()

@app.task def mul(x,y):     return x*y

@app.task def xsum(numbers):     return sum(numbers)

@app.task def getl(stri):     return getlength(stri)

def getlength(stri):     return len(stri)</pre>


docker2上的:

from future import absolute_import
from myapp.celery import app

@app.task def add(x,y):     return {'value':str(x+y),'node_name':'docker2'}

@app.task def writefile():     out=open('/tmp/data.txt','w')     out.write('hello'+'\n')     out.close()

@app.task def mul(x,y):     return x*y

@app.task def xsum(numbers):     return sum(numbers)

@app.task def getl(stri):     return getlength(stri)

def getlength(stri):     return len(stri)</pre>


在這個例子中我只測試add()函數:

在docker1節點上啟動worker:(用-Q指定監聽的queue)

root@workgroup1:~/celeryapp# celery -A myapp worker -l info -Q machine1
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,    -------------- celery@workgroup1.hzg.com v3.1.17 (Cipater) ---- * -----  ---  *   -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty --  - ** ---  -  ---------- [config] -  ---------- .> app:         myapp:0x7f472d73f190 -  ---------- .> transport:   amqp://guest:@172.16.77.175:5672// -  ---------- .> results:     amqp://guest@172.16.77.175// -  ---  --- .> concurrency: 1 (prefork) -- * ----  --- * ----- [queues]  -------------- .> machine1         exchange=machine1(direct) key=machine1                 

[tasks]   . myapp.agent.add   . myapp.agent.getl   . myapp.agent.mul   . myapp.agent.writefile   . myapp.agent.xsum

[2015-10-18 15:07:51,313: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672// [2015-10-18 15:07:51,340: INFO/MainProcess] mingle: searching for neighbors [2015-10-18 15:07:52,372: INFO/MainProcess] mingle: sync with 1 nodes [2015-10-18 15:07:52,374: INFO/MainProcess] mingle: sync complete [2015-10-18 15:07:52,423: WARNING/MainProcess] celery@workgroup1.hzg.com ready.</pre>


啟動docker2上的worker:

root@workgroup2:~/celeryapp# celery -A myapp worker -l info -Q machine2
/usr/local/lib/python2.7/dist-packages/celery/platforms.py:766: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,    -------------- celery@workgroup2.hzg.com v3.1.18 (Cipater) ---- * -----  ---  *   -- Linux-3.13.0-24-generic-x86_64-with-Ubuntu-14.04-trusty --  - ** ---  -  ---------- [config] -  ---------- .> app:         myapp:0x7f708cb8ec10 -  ---------- .> transport:   amqp://guest:@172.16.77.175:5672// -  ---------- .> results:     amqp://guest@172.16.77.175// -  ---  --- .> concurrency: 1 (prefork) -- * ----  --- * ----- [queues]  -------------- .> machine2         exchange=machine2(direct) key=machine2                 

[tasks]   . myapp.agent.add   . myapp.agent.getl   . myapp.agent.mul   . myapp.agent.writefile   . myapp.agent.xsum

[2015-10-18 15:08:52,114: INFO/MainProcess] Connected to amqp://guest:**@172.16.77.175:5672// [2015-10-18 15:08:52,144: INFO/MainProcess] mingle: searching for neighbors [2015-10-18 15:08:53,174: INFO/MainProcess] mingle: sync with 1 nodes [2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete [2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.</pre>


在任務發布節點發布一個計算任務給docker1:

root@workgroup0:~/celeryapp# ls
default.etcd  hots.sh  hotswap.py  myapp  myapp1tmp  people.db  resp  sora  test.py
root@workgroup0:~/celeryapp# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from myapp.agent import add
>>> res = add.apply_async(args=[122,34],queue='machine1',routing_key='machine1')
>>> res.get()
{u'value': u'156', u'node_name': u'docker1'}

用get()可以看到來自docker1的返回,再看看docker1的顯示:

[2015-10-18 15:11:51,217: INFO/MainProcess] Task myapp.agent.add[c487a9a2-e5cc-462b-a131-784b363a1952] succeeded in 0.03602907s: {'value': '156', 'node_name': 'docker1'}

至于docker2,一點沒動:

[2015-10-18 15:08:53,176: INFO/MainProcess] mingle: sync complete
[2015-10-18 15:08:53,227: WARNING/MainProcess] celery@workgroup2.hzg.com ready.


發布一個任務給docker2:

>>> res = add.apply_async(args=[1440,900],queue='machine2',routing_key='machine2')
>>> res.get()
{u'value': u'2340', u'node_name': u'docker2'}
>>>


因為在配置文件中已經定義好了默認的exchange,因此只需指定queue和routing key即可把任務發到指定的節點上。但是這樣的架構不容易增刪節點(我的項目就是如此),我還是研究了一個使用actor模型+etcd任務持久化的架構開發Sora。

總結:這樣一來,就可以實現集群的管理。但是任務發布節點必須維護一個queue與routing key的記錄,以便指定集群中的節點執行任務。建議根據情況改變exchange的設置,節點多的時候不應該只用一個default exchange。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!