对于任务调度的实现方案,其实开源的项目有很多。
我先说说对于任务调度的认识,如果从数据库层面来说,任务调度就是scheduler,这一点在Oracle中体现的更为细致。
Oracle中创建scheduler,在后台运行JOB完成数据的处理,基本上会把一个任务拆分成不同的几个维度属性。
而如果任务很多,有大批量的任务需要处理,而且任务位于不同的服务器环境中,那么这个复杂度就会大大增加,所以引入消息队列的方式就是一个很自然的方式。
消息队列目前有很多种可选方案,比如Redis,RabbitMQ等,根据自己的需求满足要求即可。
首先我们需要确认celery已正常安装。
>pip list|grep celery
celery (3.1.20)
celery-with-redis (3.0)
django-celery (3.2.2)
如果是在Django中在较新的版本中,也是自带的,我们来快速体验一下Django Celery的功能。
创建一个项目
django-admin startproject django_celery
初始化一个应用
cd django_celery
django-admin startapp celery_app
我们修改settings.py的配置。
在这里需要说明的是,如果我们不用Redis,RabbitMQ的话,测试使用自带的broker服务也是可以的。
如果启用自带的配置,settings.py的配置如下:
INSTALLED_APPS = (
'celery_app',
'djcelery',
)
BROKER_URL = 'django://localhost:8000//'
如果是用RabbitMQ,我们需要单独部署安装这个消息队列,可用
yum install rabbitmq-server即可,本身这个项目是用erlang开发的,所以会安装大量的erlang相关的包。
settings.py的配置如下:
import djcelery
djcelery.setup_loader()
BROKER_URL= 'amqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'
# Application definition
INSTALLED_APPS = (
'celery_app',
'djcelery',
)
然后我们配置任务的信息,在django-celery项目目录下,创建文件celery.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
app = Celery('django_celery')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: '.format(self.request))
在应用celery_app的目录下,创建任务tasks.py,我们定义了几个方法供调用。
from __future__ import absolute_import
from celery import shared_task
import time
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
time.sleep(10)
return x * y
@shared_task
def xsum(numbers):
time.sleep(10)
return sum(numbers)
基础配置完成后,我们来试用一下。
然后配置DB的信息,使用命令
python manage.py syncdb
这个过程会提示你创建一个超级用户,照做就可以了。
启动服务
python manage.py runserver
然后打开另外一个窗口,启动celery的服务
python manage.py celery worker -l info
这个过程很可能会跑出警告:
root@localhost django_celery]# python manage.py celery worker -l info
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
User information: uid=0 euid=0 gid=0 egid=0
其实这个意思很明确,如果确认需要,要设置变量C_FORCE_ROOT,风格和sandbox很类似。
export C_FORCE_ROOT=test
>python manage.py celery worker -l info
可以从启动日志看到task的信息:
[tasks]
. celery_app.tasks.add
. celery_app.tasks.mul
. celery_app.tasks.xsum
再次开启一个新的会话,这算是会话3,我们开启shell交互窗口。
>>> from celery_app.tasks import *
['__builtins__', 'absolute_import', 'add', 'mul', 'shared_task', 'xsum']
>>> mul(5,2)
10
这个时候如果使用delay,add的方式,就会进入消息队列。
>>> mul.delay(5,2)
>>> add.delay(2,3)
查看worker的日志信息如下:
[2018-01-08 14:34:47,505: INFO/MainProcess] Received task: celery_app.tasks.add[bac53d49-24cf-4d07-8515-8eff8083cab9]
[2018-01-08 14:34:47,507: INFO/MainProcess] Task celery_app.tasks.add[bac53d49-24cf-4d07-8515-8eff8083cab9] succeeded in 0.0008037839998s:6
使用RabbitMQ的日志是类似的。
如果要启用flower界面,也是分分钟搞定。
安装flower:
pip install flower
启动服务
python manage.py celery flower
访问端口:
http://127.0.0.1:5555/
领取专属 10元无门槛券
私享最新 技术干货