前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Celery-分布式任务队列学习笔记

Celery-分布式任务队列学习笔记

作者头像
earthchen
发布于 2020-09-24 07:23:33
发布于 2020-09-24 07:23:33
89200
代码可运行
举报
文章被收录于专栏:earthchen的专栏earthchen的专栏
运行总次数:0
代码可运行

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。 它是一个专注于实时处理的任务队列,同时也支持任务调度。 以上是celery自己官网的介绍

celery的应用场景很广泛

  • 处理异步任务
  • 任务调度
  • 处理定时任务
  • 分布式调度

好处也很多,尤其在使用python构建的应用系统中,无缝衔接,使用相当方便。

Celery

安装

安装Celery

推荐使用pip安装,如果你使用的是虚拟环境,请在虚拟环境里安装

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ pip install celery

安装消息中间件

Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件

你希望用什么中间件和后端就请自行安装,一般都使用redis或者RabbitMQ

安装Redis

Ubuntu系统下使用apt-get命令就可以

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ sudo apt-get install redis-server

如果你使用redis作为中间件,还需要安装redis支持包,同样使用pip安装即可

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ pip install redis

能出现以下结果即为成功

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
redis 127.0.0.1:6379>

其他的redis知识这里不左介绍,如果有兴趣,可以自行了解

如果你使用RabbitMQ,也请安装RabbitMQ

安装RabbitMQ
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ sudo apt-get install rabbitmq-server

使用Celery

简单直接使用

可以在需要的地方直接引入Celery,直接使用即可。最简单的方式只需要配置一个任务和中间人即可

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

我这里使用了redis作为中间件,这是可以按自己的习惯替换的

由于默认的配置不是最切合我们的项目实际需要,一般来说我们都需要按我们自己的要求配置一些, 但是由于需要将项目解耦,也好维护,我们最好使用单独的一个文件编写配置。

单独配置配置文件

比上面的稍微复杂一点,我们需要创建两个文件,一个为config.py的celery配置文件,在其中填写适合我们项目的配置,在创建一个tasks.py文件来编写我们的任务。文件的名字可以按你的喜好自己命名。

config.py内容为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

配置好以后可以用以下命令检查配置文件是否正确(config为配置文件名)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ python -m config

tasks.py内容为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# coding=utf-8
from celery import Celery

app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

还有一种同一设置配置的方式,不是很推荐

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

在app使用前先需要用以上方法批量更新配置文件。

在应用上使用

工程目录结构为

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
proj/
    __init__.py
    # 存放配置和启动celery代码
    celery.py
    # 存放任务
    tasks.py

celery.py为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启动celery只需要在proj同级目录下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ celery -A proj worker -l info

在django中使用celery

我们的django的项目的目录结构一般如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
proj/
    manage.py
    myapp/
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

想要在django项目中使用celery,我们首先需要在django中配置celery

我们需要在与工程名同名的子文件夹中添加celery.py文件 在本例中也就是proj/proj/celery.py

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# 括号里的参数为工程名
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

然后我们需要在同级目录下的init.py文件中配置如下内容 proj/proj/init.py

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

然后我们就可以把需要的任务放到需要的app下的tasks.py中,现在项目目录结构如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

可能的一个tasks.py文件内容如下: myapp1/tasks.py为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
    time.sleep(5)
    print(x+y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task修饰器可以让你创建task不需要app实体

在需要的地方调用相关任务即可,例如在myapp1/views.py中调用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from django.shortcuts import render
from .tasks import add


def index(request):
    # 测试celery任务
    add.delay(4,5)
    return render(request,'index.html')

然后就可以启动项目,celery需要单独启动,所以需要开两个终端,分别

启动web应用服务器

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ python manage.py runserver

启动celery

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ celery -A proj worker -l info

然后访问浏览器就可以在启动celery的终端中看到输出

扩展
  • 如果你的项目需要在admin中管理调度,请使用django-celery-beat
  1. 使用pip安装django-celery-beat
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ pip install django-celery-beat 

不要在使用django-celery,这个项目已经停止更新好好多年。。。。

  1. 在settings.py中添加这个app
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSTALLED_APPS = (     ...,     'django_celery_beat', ) 
  1. 同步一下数据库
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ python manage.py migrate 
  1. 设置celery beat服务使用django_celery_beat.schedulers:DatabaseScheduler scheduler
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler 

然后在就可以admin界面看到了。

  • 如果你想使用Django-ORM或者Django Cache作为后端,需要安装django-celery-results扩展(笔者不建议)
  1. 使用pip安装django-celery-results
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ pip install django-celery-results 

不要在使用django-celery,这个项目已经停止更新好好多年。。。。

  1. 在settings.py中添加这个app
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INSTALLED_APPS = (     ...,     'django_celery_results', ) 
  1. 同步一下数据库
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$ python manage.py migrate django_celery_results 
  1. 配置后端,在settings.py中配置
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 使用数据库作为结果后端 CELERY_RESULT_BACKEND = 'django-db'  # 使用缓存作为结果后端 CELERY_RESULT_BACKEND = 'django-cache' 

基本使用大概就是上述这些,其他具体配置和使用还需自己研读官方文档

注:

  • 上述环境在ubuntu16.04 lts django1.9中搭建测试成功
  • 上述文字皆为个人看法,如有错误或建议请及时联系我
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-08-20,,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
​反转?美国国家安全局雇员向外国特工泄密,对方实为FBI卧底
当地时间9月29日,据The Record报道,一名30岁的前国家安全局(NSA)雇员在联邦法院被指控,试图向外国政府代表出售与国家网络行动有关的敏感信息。
FB客服
2022/11/14
3590
​反转?美国国家安全局雇员向外国特工泄密,对方实为FBI卧底
FreeBuf 周报 | 马斯克血洗推特安全部门;新形式钓鱼软件针对 Python开发人员
各位 Buffer 周末好,以下是本周「FreeBuf周报」,我们总结推荐了本周的热点资讯、安全事件、一周好文和省心工具,保证大家不错过本周的每一个重点! 热点资讯 1、马斯克血洗Twitter,网络安全部门集体被裁 根据Twitter新任首席信息安全官Lea Kissner发布的消息,原有的网络安全部门集体被裁撤。 2、头铁!医疗保险巨头拒绝向黑客支付赎金 澳大利亚最大的医疗保险公司 Medibank 明确表示,不会向对其发动网络攻击并窃取内部数据的犯罪分子支付赎金。 3、将机密藏在三明治
FB客服
2023/03/29
5500
FreeBuf 周报 | 马斯克血洗推特安全部门;新形式钓鱼软件针对 Python开发人员
全解美军“水下黑客平台”安纳波利斯号核潜艇
7月底,据《华盛顿邮报》报道,两位美国海军官员在华盛顿国际战略研究会议上声称,潜艇是美国网络战略的重要组成部分,它们保护国家免受网络攻击,其在执行任务过程中发挥着重要作用。 美国潜艇项目执行官、海军少将迈克尔贾巴利说:“它们具有一种令我们高度重视的攻击能力,在这方面,我不能说太多,只能说这类潜艇活跃于前沿阵地,在最高技术层面上积极地做着它们应该做的事情。” 另外,两位海军官员还谈到水下无人航行器配合潜艇对敌方海域进行干扰或黑客行动,以扩大潜艇网络作战系统的能力。而“安纳波利斯”号核潜艇(USS Anna
FB客服
2018/02/08
1.4K0
全解美军“水下黑客平台”安纳波利斯号核潜艇
技术天才、无人驾驶元老承认窃取谷歌机密给Uber!面临2年监禁
上周,硅谷顶尖自动驾驶技术工程师、前谷歌高管Anthony Levandowski承认了他此前从谷歌窃取了关于自动驾驶技术的商业机密。
大数据文摘
2020/04/01
3490
最小年龄仅5岁!盘点全球最“天才”少年黑客 TOP 10
你还能想起自己8岁的时候,每天都在玩什么吗?可能是在楼下和小朋友一起捉迷藏?在家追一本连载的漫画书?又或者在电脑上玩种菜偷菜的小游戏?
FB客服
2023/08/08
1.1K0
最小年龄仅5岁!盘点全球最“天才”少年黑客 TOP 10
苹果汽车项目前员工认罪!跳槽小鹏汽车前,窃取了无人驾驶机密
8月23日消息,根据美国加州北区地区法院于当地时间8月22日公布的一份文件显示,针对苹果2018年7月12日起诉华裔前员工张晓浪(Xiaolang Zhang)窃取其无人驾驶机密案的指控,张晓浪已经承认了其中的一项指控,并签署了认罪协议。法院将于2022年11月14日下午1:30进行重新判刑。
芯智讯
2022/08/25
4060
苹果汽车项目前员工认罪!跳槽小鹏汽车前,窃取了无人驾驶机密
卧底揭秘:英国竟是印度黑客雇佣行业的大金主
非法黑客,也被称为“雇佣黑客”,活跃在世界各地。印度雇佣黑客在国际上十分活跃,收取高额费用入侵重要国家和人物的电子邮件和手机。 11月6日,《星期日泰晤士报》(The Sunday Times)和调查新闻局(Bureau of Investigative Journalism)的一项卧底调查,通过一个印度“雇佣黑客”团伙内部泄露的数据库,揭露了一些印度黑客替伦敦的企业情报公司入侵英国企业、记者和政界人士的电子邮件账户等攻击行为。 这些卧底爆料在西方企业情报圈引起了恐慌,因为许多企业都曾使用过印度雇佣黑客,尤
FB客服
2023/03/29
5030
卧底揭秘:英国竟是印度黑客雇佣行业的大金主
那个靠大数据抓到拉登的公司
据说,一家名叫帕兰提尔(Palantir)的初创公司帮助美军捕杀了奥萨马·本·拉登(Osama bin Laden)。自从这个传闻开始传播以来,阿历克斯·卡普(Alex Karp)就没有了太多独处的时间。 这位帕兰提尔公司的CEO现年45岁,身材消瘦,乱蓬蓬的卷发给人以头重脚轻之感。 7月份一个艳阳高照的早晨,他在斯坦福大学青草葱葱的山坡上漫步,不远处耸立着外号叫“大盘子”的巨型卫星天线。这是他特别喜欢的沉思时刻。 但他的独处在某种程度上被“迈克”(Mike)打破了。这位沉默寡言的海军陆战队退役士兵身高
CSDN技术头条
2018/02/09
8190
那个靠大数据抓到拉登的公司
国际知名“青少年黑客”盘点
全美大学生网络防御大赛(NCCDC)是一项年度盛会,旨在让大学生参与到网络安全活动中来。像往常一样,今年,这些孩子正投身于这场激烈的网络防御战中,但是有变化的是,其中许多竞争者过去都有过黑帽攻击经历,他们入侵的系统包括胰岛素泵、航空电子系统以及生啤保鲜机系统等。 当然,年轻人参与黑客攻击的历史已经相当长远,提到“少年天才”不由得脑海里就会浮现很多面容、名字以及他们不朽的事迹。这是一篇拜会“黑客祖师爷”的文章。让我们回到现代互联网还没有成型的那个年代,看看那些注定不凡的天才少年们: 1. James Ko
FB客服
2018/02/26
1.3K0
国际知名“青少年黑客”盘点
希拉里PK川普激战正酣,看看黑客如何“干预”美国总统大选
昨天的美国大选第二轮辩论,相信大家都有所耳闻,撕得那是一个风生水起,好生热闹!不过今年除了两位总统候选人备受关注之外,黑客攻击干扰大选的新闻也是屡屡见诸报端,引发了激烈的讨论。 美国大选流程 在谈黑客
FB客服
2018/02/09
9480
希拉里PK川普激战正酣,看看黑客如何“干预”美国总统大选
中东泥潭里的伊朗,网络战能力是否被严重低估?
最近的网络空间,并不安宁。美国总统特朗普在接受媒体采访时证实,他于2018年批准了对俄罗斯互联网研究院的网络攻击,发动攻击的正是由特朗普在2017年下令组建的美军网络司令部。
FB客服
2020/07/28
6850
【史上最全】计算机的编年史
前几天我写算力简史的时候,顺便整理了一份计算机技术的编年史(将近一万字)。今天发给大家,以供参考。
鲜枣课堂
2023/08/21
1.1K0
【史上最全】计算机的编年史
计算机的发展史,让你想到了什么?
对世界保持一份好奇心很重要,为什么这么说呢?有天使投资人问比尔盖茨说:你最害怕什么?比尔盖茨回答很妙,他所:我最怕那些躲在车库里头捣鼓新玩意儿的年轻人。这显然是一个很幽默的回答。但是你觉得他说的有道理吗?
杨永贞
2022/10/31
1K0
计算机的发展史,让你想到了什么?
“超级天才”冯·诺依曼与原子弹的诞生
20世纪40年代的约翰·冯·诺伊曼(John von Neumann,1903-1957)。图源:维基百科
用户9861443
2023/12/19
4190
“超级天才”冯·诺依曼与原子弹的诞生
世界算力简史(上)
1946年2月14日,在美国宾夕法尼亚州东南部的费城,人们正在像以往一样正常工作和生活。
鲜枣课堂
2023/08/21
4010
世界算力简史(上)
曾凭借大力投资科学引领全球,美国的创新引擎是如何失速的?
1940年6月,世界的未来陷于僵持之中。德国一个月前攻陷了荷兰、比利时和法国,纳粹的胜利让人触目惊心。凭借军事的新技术和新战略,德国演示了一种新的战争形式:闪电战、重武器和空中优势。从理论上来说,根据传统思维,英、法联军应该能够阻挡德军的进攻。然而,仅仅六周时间,英军被打散,连滚带爬地实行敦刻尔克大撤退。巴黎失陷。
大数据文摘
2021/07/06
2900
2022 年全球数据泄露事件 TOP 100 | FreeBuf 年度盘点
数字化时代,数据已然是一种战略资源,是企业发展经营的“催化剂”,企业拥有的数据规模以及数据处理能力,决定其是否具备核心竞争力,因此数据成为了网络犯罪分子眼中的“摇钱树”。
FB客服
2023/02/10
2.2K0
2022 年全球数据泄露事件 TOP 100 | FreeBuf 年度盘点
算力简史(完整版)
引言:今天这篇文章,我将给大家详细介绍一下人类算力的演进过程。这是一段波澜壮阔的历史,值得我们驻足与回忆。
鲜枣课堂
2023/11/17
6020
算力简史(完整版)
重温计算机简史:从石头计数到计算机
计算机始祖 谁都知道,电脑的学名叫做电子计算机。以人类发明这种机器的初衷,它的始祖应该是计算工具。英语里“Calculus”(计算)一词来源于拉丁语,既有“算法”的含义,也有肾脏或胆囊里的“结石”的意思。远古的人们用石头来计算捕获的猎物,石头就是他们的计算工具。著名科普作家阿西莫夫说,人类最早的计算工具是手指,英语单词“Dight”既表示“手指”又表示“整数数字”;而中国古人常用“结绳”来帮助记事,“结绳”当然也可以充当计算工具。石头、手指、绳子……,这些都是古人用过的“计算机”。 不知何时,许多国家的人
大数据文摘
2018/05/24
1.4K0
社会工程:攻击系统、国家和社会(一)
社会工程是一种极其有效的攻击过程,超过 80% 的网络攻击,其中超过 70% 是来自国家级别的,都是通过利用人类而不是计算机或网络安全漏洞发起和执行的。因此,要构建安全的网络系统,不仅需要保护构成这些系统的计算机和网络,还需要对其人类用户进行安全程序的教育和培训。
ApacheCN_飞龙
2024/05/24
2880
推荐阅读
相关推荐
​反转?美国国家安全局雇员向外国特工泄密,对方实为FBI卧底
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档