首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >学习在Django中集成Celery

学习在Django中集成Celery

原创
作者头像
rxg456
发布2025-02-19 23:16:03
发布2025-02-19 23:16:03
28500
代码可运行
举报
运行总次数:0
代码可运行

一、基础概念理解

  1. 为什么需要 Celery?
    • 解决耗时任务阻塞 HTTP 请求的问题(如邮件发送、文件处理、AI 计算等)。
    • 实现异步任务、定时任务和分布式任务队列。
  2. 核心组件
    • 消息代理(Broker):Redis/RabbitMQ,负责传递任务消息。
    • Worker:执行任务的进程。
    • 结果存储(Backend):存储任务执行结果(可选)。

二、环境准备

安装依赖

代码语言:bash
复制
pip install celery redis  # 使用 Redis 作为 Broker
pip install flower       # 可选,任务监控工具

项目结构

代码语言:bash
复制
your_project/
├── your_project/
│   ├── __init__.py
│   ├── celery.py       # 新增 Celery 初始化文件
│   ├── settings.py
│   └── urls.py
└── your_app/
    └── tasks.py        # 存放异步任务

三、配置 Celery

创建 celery.py

代码语言:python
代码运行次数:0
运行
复制
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()  # 自动发现所有 Django App 中的 tasks.py

修改 __init__.py

代码语言:python
代码运行次数:0
运行
复制
from .celery import app as celery_app
__all__ = ('celery_app',)

配置 settings.py

代码语言:python
代码运行次数:0
运行
复制
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Broker 地址
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'  # 结果存储(可选)
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区设置

四、编写第一个异步任务

在 App 中创建 tasks.py

代码语言:python
代码运行次数:0
运行
复制
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def add(x, y):
    return x + y

调用异步任务

代码语言:python
代码运行次数:0
运行
复制
# 在视图或其他地方调用
from your_app.tasks import add

def register_user(request):
    # ... 用户注册逻辑
    add.delay(98, 98)  # 异步执行
    return HttpResponse("Registered!")

五、运行 Celery

启动 Worker

代码语言:bash
复制
celery -A your_project worker --loglevel=info

启动 Beat(定时任务)

代码语言:bash
复制
celery -A your_project beat --loglevel=info

监控任务(可选)

代码语言:bash
复制
celery -A your_project flower  # 访问 http://localhost:5555

六、定时任务配置

settings.py 中定义周期性任务:

代码语言:python
代码运行次数:0
运行
复制
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-every-night': {
        'task': 'your_app.tasks.add',
        'schedule': crontab(hour=3, minute=30),  # 每天凌晨3:30执行
    },
}

七、进阶技巧

任务重试与错误处理

代码语言:python
代码运行次数:0
运行
复制
@shared_task(bind=True, max_retries=3)
def process_data(self, data):
    try:
        # 处理数据...
    except Exception as e:
        self.retry(exc=e, countdown=60)  # 60秒后重试

任务链与组合

代码语言:python
代码运行次数:0
运行
复制
from celery import chain
chain(task1.s(), task2.s(), task3.s()).apply_async()

结果处理

代码语言:python
代码运行次数:0
运行
复制
result = add.delay(98, 98)
print(result.get(timeout=10))  # 获取任务结果(阻塞)

八、生产环境注意事项

使用 Supervisor 管理进程

代码语言:ini
复制
[program:celery_worker]
command=celery -A your_project worker --loglevel=info
directory=/path/to/your_project
autostart=true
autorestart=true

优化并发

代码语言:bash
复制
celery -A your_project worker --concurrency=4  # 根据 CPU 核心数调整

安全配置

  • 为 Redis 设置密码。
  • 记得保护 Broker 端口。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、基础概念理解
  • 二、环境准备
  • 三、配置 Celery
  • 四、编写第一个异步任务
  • 五、运行 Celery
  • 六、定时任务配置
  • 七、进阶技巧
  • 八、生产环境注意事项
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档