import time

from celery import Celery
from celery.utils.log import get_task_logger
# Redis database 1 is used as the message broker and database 2 as the
# result backend (both on the local Redis instance).
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'

# Celery application named 'my_tasks'; the @app.task decorators below
# register their functions against this instance.
app = Celery(
    'my_tasks',
    broker=broker,
    backend=backend,
)
@app.task
def add(x, y):
    """Return ``x + y`` after a three-second pause.

    Prints 'enter task' when the body starts running; the sleep stands in
    for slow work so the asynchronous dispatch is observable.
    """
    print('enter task')
    time.sleep(3)
    return x + y


# The guard must compare the dunder names: plain ``name == 'main'`` raises
# NameError and would never identify direct script execution.
if __name__ == '__main__':
    print('start task')
    # delay() only enqueues the task and returns immediately, so
    # 'end task' is printed without waiting for the three-second sleep.
    result = add.delay(3, 18)
    print('end task')
    # Prints the async result handle (its task id), not the computed sum.
    print(result)
# Logger named after this module (``__name__``, not an undefined ``name``).
logger = get_task_logger(__name__)


# Registered through ``app.task`` like the other tasks in this file; a bare
# ``task`` decorator is never imported here.
@app.task(bind=True)
def add(self, x, y):
    """Log the id of the task request currently being executed.

    ``bind=True`` makes Celery pass the task instance as ``self``, which is
    what exposes ``self.request``. ``x`` and ``y`` are accepted but unused
    in this snippet, and nothing is returned.
    """
    logger.info(self.request.id)
# Enqueue add(1, 2), asking the worker to wait 10 seconds before running it.
add.apply_async(args=(1, 2), countdown=10)
import celery
class MyTask(celery.Task):
    """Task base class that reports failures on standard output."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Print the id of the failed task together with its exception.

        ``args``, ``kwargs`` and ``einfo`` are accepted but not used.
        """
        message = '{0!r} failed: {1!r}'.format(task_id, exc)
        print(message)
# Registered through ``app.task`` like the other tasks in this file; a bare
# ``task`` decorator is never imported here.
@app.task(base=MyTask)
def add(x, y):
    """Always raise ``KeyError`` so that ``MyTask.on_failure`` is exercised.

    ``x`` and ``y`` are accepted but unused.

    Raises:
        KeyError: unconditionally.
    """
    raise KeyError()
from celery import chain

# Each task's return value becomes the first argument of the next signature,
# so this computes ((1 + 2) + 3) + 4.
result = chain(add.s(1, 2), add.s(3), add.s(4))

# Calling the chain dispatches it; get() blocks until the final value arrives.
result().get()  # expected: 10
from celery import group

# Dispatch three independent add tasks together and collect their results
# in the same order as the signatures.
group(add.s(1, 2), add.s(3, 4), add.s(5, 6))().get()  # expected: [3, 7, 11]
@app.task
def xsum(values):
    """Return the sum of every item in ``values``."""
    total = sum(values)
    return total
from celery import chord

# Header: ten add tasks, each computing i + i. Body: once all of them have
# finished, xsum receives their results [0, 2, 4, ..., 18].
# ``range`` replaces ``xrange``, which does not exist on Python 3.
chord((add.s(i, i) for i in range(10)), xsum.s())().get()  # expected: 90
# Split the 100 argument pairs (0, 0) ... (99, 99) into chunks of 10 so that
# each dispatched task handles ten add calls, then wait for the results.
add.chunks(
    zip(range(100), range(100)),
    10,
)().get()