在开发Web应用时,常遇到这样的需求:每天凌晨3点自动备份数据库、每10分钟抓取一次API数据、每周一9点发送周报邮件。这些看似简单的定时任务,若用time.sleep()
循环实现,会面临进程崩溃后任务中断、修改时间需重启程序、多任务互相阻塞等问题。而APScheduler(Advanced Python Scheduler)的出现,彻底解决了这些痛点。
APScheduler的设计理念类似于乐高积木,通过组合四大核心组件实现灵活调度:
run_date="2025-10-10 08:00:00"
minutes=5
表示每5分钟一次hour=8, minute=30
表示每天8:30执行from apscheduler.triggers.cron import CronTrigger
# 每月1号凌晨2点执行
trigger = CronTrigger(day=1, hour=2)
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
'default': ThreadPoolExecutor(20), # 线程池最大20线程
'processpool': ProcessPoolExecutor(5) # 进程池最大5进程
}
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
}
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
timezone='Asia/Shanghai'
)
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def print_time():
print(f"当前时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
scheduler = BlockingScheduler()
scheduler.add_job(print_time, 'interval', seconds=5)
scheduler.start()
运行效果:
当前时间: 2025-10-09 14:00:00
当前时间: 2025-10-09 14:00:05
当前时间: 2025-10-09 14:00:10
...
from apscheduler.schedulers.blocking import BlockingScheduler
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
def send_email():
msg = MIMEText("这是定时发送的测试邮件", 'plain', 'utf-8')
msg['From'] = "your_email@qq.com"
msg['To'] = "recipient@example.com"
msg['Subject'] = "APScheduler测试邮件"
with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
server.login("your_email@qq.com", "your_auth_code")
server.sendmail("your_email@qq.com", ["recipient@example.com"], msg.as_string())
print("邮件发送成功")
scheduler = BlockingScheduler()
# 设置2025年10月10日15点执行
scheduler.add_job(send_email, 'date', run_date=datetime(2025, 10, 10, 15, 0))
scheduler.start()
关键点:
run_date
支持datetime对象或字符串格式from apscheduler.schedulers.background import BackgroundScheduler
import requests
def fetch_weather():
try:
response = requests.get("https://api.example.com/weather")
print(f"天气数据: {response.json()}")
except Exception as e:
print(f"抓取失败: {str(e)}")
scheduler = BackgroundScheduler()
# 每天8:30执行
scheduler.add_job(fetch_weather, 'cron', hour=8, minute=30)
scheduler.start()
# 保持程序运行(Web应用中通常不需要)
import time
while True:
time.sleep(1)
Cron表达式详解:
字段 | 允许值 | 特殊字符 |
---|---|---|
年 | 1970-2099 | , - * / |
月 | 1-12 | , - * / |
日 | 1-31 | , - * ? / L W |
周 | 0-6 (0是周日) | , - * ? / L # |
时 | 0-23 | , - * / |
分 | 0-59 | , - * / |
秒 | 0-59 | , - * / |
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'default': SQLAlchemyJobStore(url='mysql://user:pass@localhost/apscheduler')
}
scheduler = BackgroundScheduler(jobstores=jobstores)
# 即使程序重启,任务也会从数据库恢复
# 添加任务
def dynamic_task():
print("动态添加的任务执行了")
job = scheduler.add_job(dynamic_task, 'interval', minutes=1, id='dynamic_job')
# 暂停任务
scheduler.pause_job('dynamic_job')
# 恢复任务
scheduler.resume_job('dynamic_job')
# 删除任务
scheduler.remove_job('dynamic_job')
# 获取所有任务
all_jobs = scheduler.get_jobs()
for job in all_jobs:
print(f"任务ID: {job.id}, 下次执行时间: {job.next_run_time}")
import logging
logging.basicConfig(
filename='scheduler.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def safe_task():
try:
# 可能出错的代码
1 / 0
except Exception as e:
logging.error(f"任务执行失败: {str(e)}")
raise # 重新抛出异常让APScheduler记录
scheduler.add_job(safe_task, 'interval', seconds=5)
# 使用Redis作为任务存储和锁机制
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.base import ConflictingIdError
jobstores = {
'default': RedisJobStore(host='localhost', port=6379, db=0)
}
# 配合分布式锁使用(需额外实现)
def distributed_task():
try:
# 获取锁
if acquire_lock("task_lock"):
# 执行任务
print("执行分布式任务")
# 释放锁
release_lock("task_lock")
except ConflictingIdError:
print("其他实例正在执行该任务")
# 明确设置时区
from pytz import timezone
scheduler = BackgroundScheduler(timezone=timezone('Asia/Shanghai'))
# 或者在CronTrigger中指定
scheduler.add_job(
my_job,
'cron',
hour=8,
minute=30,
timezone='Asia/Shanghai'
)
# 限制同一任务的并发实例数
scheduler.add_job(
my_job,
'interval',
minutes=1,
max_instances=3 # 最多同时运行3个实例
)
# 对于耗时任务,考虑使用进程池
executors = {
'default': ProcessPoolExecutor(5) # 最多5个进程
}
Flask示例:
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
app = Flask(__name__)
scheduler = BackgroundScheduler()
def cron_job():
print("Flask应用中的定时任务执行了")
@app.route('/')
def index():
return "APScheduler与Flask集成成功"
if __name__ == '__main__':
scheduler.add_job(cron_job, 'cron', minute='*/1') # 每分钟执行
scheduler.start()
app.run()
Django示例:
# 在apps.py中初始化
from django.apps import AppConfig
from apscheduler.schedulers.background import BackgroundScheduler
class MyAppConfig(AppConfig):
name = 'myapp'
def ready(self):
scheduler = BackgroundScheduler()
scheduler.add_job(my_django_task, 'interval', hours=1)
scheduler.start()
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
APScheduler | 复杂定时任务,需要持久化 | 功能全面,支持多种触发器 | 需要手动管理 |
Celery Beat | 分布式任务队列 | 与Celery无缝集成 | 依赖消息队列,配置复杂 |
schedule | 简单定时任务 | 纯Python实现,无需依赖 | 功能有限,不支持持久化 |
Airflow | 工作流管理 | 强大的DAG支持 | 重量级,适合大数据场景 |
max_instances
print_jobs()
方法调试任务APScheduler就像一个智能的闹钟系统,它不仅能准时提醒,还能根据复杂规则灵活调整。通过合理配置四大组件,你可以轻松实现从简单的每分钟执行到复杂的每月第一个周一这样的定时任务需求。在实际项目中,建议从内存存储+线程池的简单配置开始,随着需求增长逐步引入数据库持久化和进程池执行器,最终打造出稳定可靠的企业级定时任务系统。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。