前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python测试开发django-159.Celery 异步与 RabbitMQ 环境搭建

python测试开发django-159.Celery 异步与 RabbitMQ 环境搭建

作者头像
上海-悠悠
发布2021-10-22 16:23:59
1.1K0
发布2021-10-22 16:23:59
举报
文章被收录于专栏:从零开始学自动化测试

前言

Celery是一个Python任务队列系统,用于处理跨线程或网络节点的工作任务分配。它使异步任务管理变得容易。 您的应用程序只需要将消息推送到像RabbitMQ这样的代理,Celery worker会弹出它们并安排任务执行。

Celery

celery 的5个角色

  • Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
  • Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。 任务的消费者是Worker。 Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
  • Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • Beat 定时任务调度器,根据配置定时将任务发送给Broker。
  • Backend 用于存储任务的执行结果。

环境准备

1.django环境v2.1.2 2.安装celery版本

代码语言:javascript
复制
pip install celery==3.1.26.post2

3.安装django-celery包

代码语言:javascript
复制
pip install django-celery==3.3.1

RabbitMQ 环境

Broker(RabbitMQ) 负责创建任务队列,根据一些路由规则将任务分派到任务队列,然后将任务从任务队列交付给 worker 先使用docker 搭建RabbitMQ 环境,rabbitMQ 镜像仓库地址 https://hub.docker.com/_/rabbitmq找带有 mangement的版本,会带web后台管理界面

下载 3.8.0-management 镜像

代码语言:javascript
复制
docker pull rabbitmq:3.8.0-management

启动容器,设置账号 admin 和密码 123456

代码语言:javascript
复制
docker run -d --name rabbitmq3.8 -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.0-management

宿主机需开放 5672 和 15672 这 2 个端口,5672 是后端接口访问的端口,15672 是前端 web 管理后台页面地址,输入http://ip:15672可以访问 web 网站

输入前面设置的账号 admin 和密码 123456 可以直接登录

Django 中使用 Celery

要在 Django 项目中使用 Celery,您必须首先定义 Celery 库的一个实例(称为“应用程序”)

如果你有一个现代的 Django 项目布局,比如:

代码语言:javascript
复制
- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义 Celery 实例:

代码语言:javascript
复制
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

其中debug_task是测试的任务,可以注掉

代码语言:javascript
复制
# @app.task(bind=True)
# def debug_task(self):
#     print('Request: {0!r}'.format(self.request))

上面一段只需改这句,’proj’是自己django项目的app名称

代码语言:javascript
复制
app = Celery('proj')

然后你需要在你的proj/proj/__init__.py 模块中导入这个应用程序。这确保在 Django 启动时加载应用程序,以便@shared_task装饰器(稍后提到)将使用它:

代码语言:javascript
复制
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

上面这段固定的,不用改

tasks任务

在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

代码语言:javascript
复制
@shared_task
def add(x, y):
    print("task----------1111111111111111111111")
    return x + y

@shared_task
def mul(x, y):
    return x * y

tasks.py可以写任务函数add、mul,让它生效的最直接的方法就是添加app.task 或shared_task 这个装饰器

添加setting配置

setting.py添加配置

  • broker参数表示用来连接broker的URL,rabbitmq采用的是一种称为’amqp’的协议,如果rabbitmq运行在默认设置下,celery不需要其他信息,只要amqp://即可。
  • backend参数是可选的,如果想要查询任务状态或者任务执行结果时必填, Celery中的后端用于存储任务结果。 rpc意味着将结果作为AMQP消息发送回去。
代码语言:javascript
复制
#   RabbitMQ配置BROKER_URL 和backend
BROKER_URL = 'amqp://admin:123456@192.168.1.11:5672//'
CELERY_RESULT_BACKEND = 'rpc://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

创建视图

views.py创建视图

代码语言:javascript
复制
from .tasks import add, mul

def task_demo(request):
    res = add.delay(10, 20)
    print(res.task_id)  # 返回task_id
    return JsonResponse({"code": 0, "res": res.task_id})

启动worker

前面pip已经安装过celery应用了,celery是一个独立的应用,可以启动worker

代码语言:javascript
复制
celery -A MyDjango worker -l info

其中MyDjango是你自己的django项目名称

运行日志

代码语言:javascript
复制
 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         yoyo:0x1ea1a96e9b0
- ** ---------- .> transport:   amqp://admin:**@192.168.1.11:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . yoyo.tasks.add
  . yoyo.tasks.mul

[2021-10-18 22:45:03,155: INFO/MainProcess] Connected to amqp://admin:**@192.168.1.11:5672//
[2021-10-18 22:45:03,347: INFO/MainProcess] mingle: searching for neighbors
[2021-10-18 22:45:04,897: INFO/MainProcess] mingle: all alone
[2021-10-18 22:45:05,406: WARNING/MainProcess] e:\python36\lib\site-packages\celery\fixups\django.py:265: 
UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2021-10-18 22:45:05,407: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.

运行的时候,当我们看到” Connected to amqp”说明已经连接成功了!

shell交互环境

在django shell交互环境调试运行任务

代码语言:javascript
复制
D:\202107django\MyDjango>python manage.py shell
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from yoyo.tasks import add,mul
>>> from celery.result import AsyncResult
>>>
>>> res = add.delay(11, 12)
>>> res
<AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
>>> res.status
'SUCCESS'
>>>
>>> res.backend
<celery.backends.redis.RedisBackend object at 0x0000015E011C3128>
>>>
>>> res.task_id
'c5ff83a4-4840-4b36-8869-5ce6081904f1'
>>>
>>>
>>> get_task = AsyncResult(id=res.task_id)
>>> get_task
<AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
>>> get_task.result
23
>>>

res.status是查看任务状态 res.task_id 是获取任务的id res.result 获取任务结果 根据任务的id查询任务的执行结果AsyncResult(id=res.task_id).result获取

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 从零开始学自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Celery
  • 环境准备
  • RabbitMQ 环境
  • Django 中使用 Celery
  • tasks任务
  • 添加setting配置
  • 创建视图
  • 启动worker
  • shell交互环境
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档