Celery 是一个强大的分布式任务队列系统,它允许你将耗时的任务从 Web 请求中分离出来,从而提高应用程序的响应速度。以下是如何使用 Celery 将 Web 请求添加到队列的基本步骤:
任务队列:一种软件架构模式,用于将任务放入队列中,由后台工作者异步处理。
Celery:一个基于分布式消息传递的异步任务队列/作业队列。
Broker:消息中间件,用于任务的传递。常见的有 RabbitMQ、Redis 等。
Backend:任务执行结果的存储后端。
首先,你需要安装 Celery 和你选择的消息中间件(例如 Redis):
pip install celery redis
创建一个 celery.py
文件来配置 Celery:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
在 tasks.py
文件中定义你的任务:
from celery import Celery
app = Celery('tasks')
@app.task
def add(x, y):
return x + y
在你的 Web 应用中,当接收到请求时,你可以将任务添加到 Celery 队列:
from flask import Flask
from tasks import add
app = Flask(__name__)
@app.route('/add')
def add_numbers():
result = add.delay(4, 6)
return f"Task ID: {result.id}"
if __name__ == '__main__':
app.run(debug=True)
原因:可能是 Broker 或 Backend 服务未启动,或者配置错误。
解决方法:确保 Redis 服务正在运行,并检查 celery.py
中的配置是否正确。
原因:任务执行时间过长,超过了默认的超时设置。
解决方法:在任务定义中增加 time_limit
和 soft_time_limit
参数:
@app.task(time_limit=300, soft_time_limit=240)
def long_running_task():
# 长时间运行的代码
原因:Backend 存储出现问题,或者结果过期。
解决方法:检查 Backend 的状态,确保其正常运行。调整 result_expires
参数以延长结果存储时间。
以下是一个完整的示例,包括 Flask Web 应用和 Celery 任务的集成:
# app.py
from flask import Flask
from tasks import add
app = Flask(__name__)
@app.route('/add')
def add_numbers():
result = add.delay(4, 6)
return f"Task ID: {result.id}"
if __name__ == '__main__':
app.run(debug=True)
# tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
确保 Redis 服务正在运行,然后启动 Celery worker:
celery -A tasks worker --loglevel=info
现在,当你访问 /add
路由时,加法任务将被添加到 Celery 队列中异步执行。
通过这种方式,你可以有效地利用 Celery 来处理 Web 应用中的后台任务,提升用户体验和应用性能。
云+社区技术沙龙[第14期]
云原生正发声
云+社区技术沙龙[第4期]
云+社区技术沙龙[第17期]
云+社区技术沙龙[第1期]
云+社区技术沙龙[第28期]
云+社区技术沙龙[第8期]
云+社区技术沙龙[第6期]
Elastic 中国开发者大会
领取专属 10元无门槛券
手把手带您无忧上云