pip install celery
tasks.py
的新文件,并在其中编写你的Celery任务。例如,你可以编写一个每月执行一次的任务:from celery import Celery
app = Celery('tasks', broker='amqp://localhost//')
@app.task
def monthly_task():
# 这里是你的任务逻辑
print("执行每月任务")
app.py
或者__init__.py
文件中:from celery.schedules import crontab
from tasks import app as celery_app
# 配置定时任务调度
celery_app.conf.beat_schedule = {
'monthly_task': {
'task': 'tasks.monthly_task',
'schedule': crontab(day_of_month='1')
},
}
celery -A tasks beat
celery -A tasks worker --loglevel=info
现在,每当月份变为1时,就会执行monthly_task
任务。
在设置Celery时,你还需要配置一个消息代理(例如RabbitMQ或Redis)以及结果存储(例如Redis或数据库)等。在此示例中,我们使用的是AMQP(RabbitMQ)作为消息代理。你可以根据你的需求进行配置和更改。
schedule
模块。以下是一个例子:import schedule
import time
def job():
print("执行定时任务")
# 定义每天的定时任务时间段
start_time = "08:00"
end_time = "17:00"
# 将时间段字符串转换为时间对象
start_hour, start_minute = map(int, start_time.split(":"))
end_hour, end_minute = map(int, end_time.split(":"))
# 设置定时任务
schedule.every().day.at(start_time).do(job)
# 持续运行定时任务直到结束时间
while True:
current_time = time.localtime(time.time())
if (current_time.tm_hour, current_time.tm_min) >= (start_hour, start_minute) and \
(current_time.tm_hour, current_time.tm_min) <= (end_hour, end_minute):
schedule.run_pending()
time.sleep(60) # 每隔60秒检查一次定时任务
在上面的例子中,我们首先定义了一个名为job
的函数,该函数代表我们要执行的定时任务。然后,我们将指定的开始时间和结束时间转换为小时和分钟的时间对象。接下来,我们使用schedule.every().day.at(start_time).do(job)
将job
函数定义为每天指定的时间执行的定时任务。
最后,我们使用一个无限循环来检查当前时间是否在指定的时间段内,并运行schedule.run_pending()
来执行任务。我们还使用time.sleep(60)
来每隔60秒检查一次定时任务。
请注意,此示例中的时间判断是基于当前系统时间。如果要在不同的时区或使用其他时间参考,请相应地调整代码。