
咱们做自动化脚本的时候,经常会遇到 “定时执行” 的需求 —— 比如每天早上 8 点抓数据、每周一晚上发报表、每隔 10 秒检查一次接口。要是手动跑这些脚本,早晚得累死;用 Linux 的 crontab 吧,跨平台又不方便。
今天就教你一个 Python 神器 ——schedule包,轻量级、API 简单,零基础也能 10 分钟上手,从 “秒级任务” 到 “每周定点任务” 都能搞定,还能解决任务阻塞、多任务管理这些头疼问题。
schedule是第三方包,得先装了才能用。不管你是 Windows、Mac 还是 Linux,打开终端 / 命令提示符,输一行命令就行:
pip install schedulepip install schedule -i https://mirrors.aliyun.com/pypi/simple/sudo,Windows 用 “管理员身份” 打开命令提示符。schedule的核心逻辑特别简单:定义 “多久执行一次”+“执行什么函数”+“循环检查任务”。下面把所有常用的定时方式整理成表格,每个都配可直接复制运行的代码。
定时需求 | schedule 方法 | 代码示例 | 说明 |
|---|---|---|---|
每隔 N 秒执行 | every(N).seconds | every (5).seconds.do (函数) | N 是整数,比如 5 就是每 5 秒 |
每隔 N 分钟执行 | every(N).minutes | every (3).minutes.do (函数) | 比如 3 就是每 3 分钟 |
每隔 N 小时执行 | every(N).hours | every (2).hours.do (函数) | 比如 2 就是每 2 小时 |
每天固定时间执行 | every().day.at("HH:MM") | every ().day.at ("08:30").do (函数) | 时间格式是 “小时:分钟”,比如 08:30 就是早 8 点半 |
每周固定星期 + 时间执行 | every (). 星期.at ("HH:MM") | every ().monday.at ("18:00").do (函数) | 星期用小写:monday/tuesday/.../sunday |
每月固定日期 + 时间执行 | every().month.at("DD HH:MM") | every ().month.at ("05 10:00").do (函数) | 比如 05 10:00 就是每月 5 号 10 点 |
先写个最基础的 “每秒打印一句话”,理解核心流程:
import schedule # 导入schedule包
import time # 导入时间模块,用来控制循环
# 1. 定义要执行的任务(就是一个普通函数)
def print_hello():
print(f"当前时间:{time.strftime('%H:%M:%S')},执行任务啦!")
# 2. 设置定时规则:每1秒执行一次print_hello
schedule.every(1).seconds.do(print_hello)
# 3. 循环检查任务(关键!没有这步任务不会执行)
while True:
schedule.run_pending() # 检查有没有到点要执行的任务
time.sleep(0.1) # 暂停0.1秒,避免占用太多CPU(别省这行!)复制上面的代码,运行后会看到每秒打印一次时间,这就说明定时任务生效了。
再试一个 “每天 15:15 执行” 和 “每周一 18:00 执行” 的组合:
import schedule
import time
def daily_task():
print(f"每日任务:{time.strftime('%Y-%m-%d %H:%M:%S')} - 抓取今日数据")
def weekly_task():
print(f"每周任务:{time.strftime('%Y-%m-%d %H:%M:%S')} - 生成周报表")
# 每天15:15执行每日任务
schedule.every().day.at("15:15").do(daily_task)
# 每周一18:00执行每周任务
schedule.every().monday.at("18:00").do(weekly_task)
# 循环检查
while True:
schedule.run_pending()
time.sleep(60) # 这里可以设60秒,因为任务是按分钟/小时的,不用查那么勤while True循环:schedule不会自己启动,得靠这个循环不断调用schedule.run_pending()检查任务。time.sleep()别省略:如果不暂停,循环会一秒跑几十万次,CPU 直接拉满,电脑会变卡。实际用的时候,任务函数经常需要传参数(比如 “给指定用户发邮件” 需要传用户名),而且重复写schedule.every().do(...)也麻烦。这部分教你两个技巧:传参和装饰器。
比如我们有个任务,需要打印 “给 XX 发送了 XX 消息”,需要传两个参数:username和msg。
do()里传参(简单直观)import schedule
import time
# 带参数的任务函数
def send_msg(username, msg):
print(f"{time.strftime('%H:%M:%S')} - 给{username}发送消息:{msg}")
# 传参:位置参数直接跟在函数后面,关键字参数用key=value
schedule.every(2).seconds.do(
send_msg, # 任务函数
username="张三", # 第一个参数
msg="该打卡了!" # 第二个参数
)
# 循环检查
while True:
schedule.run_pending()
time.sleep(0.1)functools.partial(适合参数多的场景)如果参数特别多,直接传会很乱,用functools.partial把 “函数 + 参数” 打包成一个新函数:
import schedule
import time
from functools import partial # 导入partial
def send_email(to, subject, content):
print(f"{time.strftime('%H:%M:%S')} - 给{to}发邮件:主题[{subject}],内容[{content}]")
# 打包函数和参数
email_task = partial(
send_email,
to="test@example.com",
subject="每日报表",
content="今日数据已更新"
)
# 直接用打包好的函数
schedule.every(3).seconds.do(email_task)
while True:
schedule.run_pending()
time.sleep(0.1)do())schedule提供了@repeat装饰器,可以直接把定时规则写在函数上面,不用再写schedule.every().do(...),代码更短。
import schedule
import time
from schedule import repeat, every # 导入repeat和every装饰器
# 用装饰器指定定时规则:每2秒执行一次
@repeat(every(2).seconds)
def print_time():
print(f"当前时间:{time.strftime('%H:%M:%S')}")
# 循环检查(这步还是不能少)
while True:
schedule.run_pending()
time.sleep(0.1)带参数的装饰器也能写:
from schedule import repeat, every
import schedule
import time
@repeat(every(3).seconds, username="李四") # 参数直接跟在定时规则后面
def greet(username):
print(f"{time.strftime('%H:%M:%S')} - 你好,{username}!")
while True:
schedule.run_pending()
time.sleep(0.1)当你有多个任务的时候,肯定需要 “取消某个任务”“清空所有任务” 或者 “按类型管理任务”(比如把 “抓取数据” 和 “发送邮件” 分开)。schedule的任务管理功能正好能解决这些问题。
schedule.every().do(...)会返回一个 “任务对象”,只要保存这个对象,后面就能用schedule.cancel_job(任务对象)取消它。
import schedule
import time
def task1():
print("执行任务1")
def task2():
print("执行任务2")
# 1. 保存任务对象(关键!不保存就没法取消)
job1 = schedule.every(1).seconds.do(task1)
job2 = schedule.every(2).seconds.do(task2)
# 2. 模拟5秒后取消task1
time.sleep(5)
print("取消任务1")
schedule.cancel_job(job1) # 取消task1
# 3. 继续循环,此时只有task2在执行
while True:
schedule.run_pending()
time.sleep(0.1)运行后会发现:前 5 秒任务 1 和任务 2 都执行,5 秒后只有任务 2 在执行。
如果有很多任务,一个个取消太麻烦,用schedule.clear()直接清空所有任务:
import schedule
import time
def task1():
print("任务1")
def task2():
print("任务2")
# 添加两个任务
schedule.every(1).seconds.do(task1)
schedule.every(2).seconds.do(task2)
# 3秒后清空所有任务
time.sleep(3)
print("清空所有任务")
schedule.clear()
# 循环继续,但没有任务执行了
while True:
schedule.run_pending()
time.sleep(0.1)给任务加 “标签”(比如tag="数据抓取"),后面可以按标签筛选任务,比如 “取消所有数据抓取相关的任务”。
import schedule
import time
def crawl_data():
print("抓取数据")
def send_report():
print("发送报表")
def check_api():
print("检查接口")
# 给任务加标签:tag参数指定标签
schedule.every(1).seconds.do(crawl_data, tag="数据抓取")
schedule.every(2).seconds.do(send_report, tag="报表")
schedule.every(3).seconds.do(check_api, tag="数据抓取") # 这个也归为“数据抓取”
# 2秒后,取消所有“数据抓取”标签的任务
time.sleep(2)
print("取消所有数据抓取任务")
# 1. 先获取所有“数据抓取”标签的任务
crawl_jobs = schedule.get_jobs(tag="数据抓取")
# 2. 逐个取消
for job in crawl_jobs:
schedule.cancel_job(job)
# 后续只有“发送报表”任务在执行
while True:
schedule.run_pending()
time.sleep(0.1)这是schedule最容易踩坑的地方!默认情况下,schedule是单线程的 —— 如果一个任务执行时间很长(比如调用 API 卡了 5 秒),后面的任务会被 “堵住”,到点也不执行。
比如我们有两个任务:任务 A 每 1 秒执行,任务 B 每 2 秒执行但要耗时 3 秒。单线程下会怎么样?
import schedule
import time
def fast_task(): # 快任务:1秒执行一次
print(f"快任务:{time.strftime('%H:%M:%S')}")
def slow_task(): # 慢任务:2秒执行一次,但要耗时3秒
print(f"慢任务开始:{time.strftime('%H:%M:%S')}")
time.sleep(3) # 模拟任务耗时3秒
print(f"慢任务结束:{time.strftime('%H:%M:%S')}")
# 添加任务
schedule.every(1).seconds.do(fast_task)
schedule.every(2).seconds.do(slow_task)
# 单线程循环
while True:
schedule.run_pending()
time.sleep(0.1)运行后会发现:慢任务开始后,快任务会 “消失” 3 秒,直到慢任务结束才继续 —— 这就是阻塞。
解决思路很简单:把 “慢任务” 放到单独的线程里执行,让主线程继续检查其他任务。Python 的threading模块就能实现,不用学复杂的多线程知识,套个模板就行。
import schedule
import time
import threading # 导入多线程模块
def fast_task():
print(f"快任务:{time.strftime('%H:%M:%S')}")
def slow_task():
print(f"慢任务开始:{time.strftime('%H:%M:%S')}")
time.sleep(3)
print(f"慢任务结束:{time.strftime('%H:%M:%S')}")
# 关键:写一个包装函数,把任务放到线程里执行
def run_in_thread(func):
def wrapper():
threading.Thread(target=func).start() # 启动新线程执行任务
return wrapper
# 给慢任务用包装函数,快任务可以不用(也能用,不影响)
schedule.every(1).seconds.do(fast_task)
schedule.every(2).seconds.do(run_in_thread(slow_task)) # 慢任务走多线程
# 循环不变
while True:
schedule.run_pending()
time.sleep(0.1)改造后再运行,会发现:慢任务执行的时候,快任务依然每秒执行 —— 阻塞问题解决了!
threading.Lock()加锁,比如:lock = threading.Lock() # 创建锁
def write_file(content):
with lock: # 加锁,同一时间只有一个线程能执行下面的代码
with open("data.txt", "a") as f:
f.write(content + "n")concurrent.futures.ThreadPoolExecutor),控制最大线程数。学了这么多,该上实战了!下面三个场景是工作中最常用的,代码都能直接改改就能用。
用requests抓公开天气 API 的数据,每天 9 点保存到文件:
import schedule
import time
import requests
import threading
# 1. 定义抓取函数
def crawl_weather():
city = "北京"
api_url = f"https://wttr.in/{city}?format=3" # 公开天气API,返回简洁结果
try:
response = requests.get(api_url, timeout=10)
response.raise_for_status() # 抛出自定义异常
weather = response.text.strip()
# 保存到文件
with open("weather_log.txt", "a", encoding="utf-8") as f:
f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {city}天气:{weather}n")
print(f"抓取成功:{weather}")
except Exception as e:
print(f"抓取失败:{str(e)}")
# 2. 多线程包装(避免API请求阻塞)
def run_in_thread(func):
def wrapper():
threading.Thread(target=func).start()
return wrapper
# 3. 设置每天9点执行
schedule.every().day.at("09:00").do(run_in_thread(crawl_weather))
# 4. 循环运行
while True:
schedule.run_pending()
time.sleep(60) # 每60秒检查一次用 Python 自带的smtplib和email模块发送邮件,需要先开启邮箱的 “SMTP 服务”(比如 QQ 邮箱要在设置里开,获取授权码):
import schedule
import time
import smtplib
import threading
from email.mime.text import MIMEText
from email.header import Header
# 1. 邮件配置(替换成你的信息)
SMTP_SERVER = "smtp.qq.com" # QQ邮箱SMTP服务器
SMTP_PORT = 587 # 端口
FROM_EMAIL = "你的QQ邮箱@qq.com"
AUTH_CODE = "你的邮箱授权码" # 不是密码,是SMTP授权码
TO_EMAIL = "接收邮箱@example.com"
# 2. 发送邮件函数
def send_weekly_report():
subject = "每周工作报表"
content = """
本周工作总结:
1. 完成数据抓取脚本开发
2. 修复API超时问题
3. 优化定时任务性能
"""
# 构造邮件
msg = MIMEText(content, "plain", "utf-8")
msg["From"] = Header(FROM_EMAIL)
msg["To"] = Header(TO_EMAIL)
msg["Subject"] = Header(subject, "utf-8")
try:
# 连接SMTP服务器发送
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls() # 开启TLS加密
server.login(FROM_EMAIL, AUTH_CODE)
server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
print("邮件发送成功")
except Exception as e:
print(f"邮件发送失败:{str(e)}")
# 3. 多线程包装
def run_in_thread(func):
def wrapper():
threading.Thread(target=func).start()
return wrapper
# 4. 设置每周一18点执行
schedule.every().monday.at("18:00").do(run_in_thread(send_weekly_report))
# 5. 循环运行
while True:
schedule.run_pending()
time.sleep(60)这里用 “模拟发帖” 为例(实际项目中需要用对应平台的 API,比如微博开放平台、抖音开放平台):
import schedule
import time
import threading
# 1. 模拟社交媒体发帖(实际替换成平台API调用)
def post_to_weibo(content):
# 这里是模拟,实际需要调用微博API的post接口
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - 微博发帖成功:{content}")
# 2. 定时发帖任务(每小时发不同内容)
def scheduled_post():
hour = time.localtime().tm_hour # 获取当前小时
if 9 <= hour <= 18: # 只在工作时间发帖
content = f"工作时间打卡-{hour}点,今天也要加油!"
else:
content = f"休息时间分享-{hour}点,晚安~"
post_to_weibo(content)
# 3. 多线程包装
def run_in_thread(func):
def wrapper():
threading.Thread(target=func).start()
return wrapper
# 4. 设置每小时执行一次
schedule.every(1).hours.do(run_in_thread(scheduled_post))
# 5. 循环运行
while True:
schedule.run_pending()
time.sleep(60)这部分整理了大家用schedule时最容易遇到的问题,每个问题都讲 “为什么会这样” 和 “怎么解决”。
while True循环:schedule需要不断检查任务,没循环就没机会执行。time.sleep()设得太长:比如任务是每秒执行,你设了time.sleep(10),任务会 10 秒才检查一次,看起来像没执行。every().``day.at``("2000")(少了冒号),schedule会忽略这个错误,但任务不执行。run_pending(),time.sleep()根据任务精度设(秒级任务设 0.1-1 秒,小时级设 60 秒)。at()里的时间必须是 “HH:MM”(比如 “20:00”),月份日期是 “DD HH:MM”(比如 “05 10:00”)。schedule.every(1).``seconds.do``(task),任务会每秒执行两次。time.sleep()设得太短,且任务执行快:比如time.sleep(0.001),run_pending()会一秒检查上千次,可能重复触发任务(但schedule内部有防抖,概率低)。schedule.get_jobs()查看所有任务,确认没有重复:print("当前所有任务:", schedule.get_jobs()) # 打印任务列表,看有没有重复time.sleep(),不要设太短。任务函数里有未捕获的异常(比如 API 请求失败、文件不存在),会导致run_pending()报错,循环中断。
在任务函数里加try-except捕获所有异常,或者用 “包装函数” 统一捕获:
import schedule
import time
# 统一异常捕获的包装函数
def catch_exception(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"任务执行出错:{str(e)}") # 打印错误,但不中断脚本
return wrapper
# 用包装函数装饰任务
@catch_exception
def risky_task():
# 这个任务可能抛出异常(比如除以0)
1 / 0
schedule.every(1).seconds.do(risky_task)
while True:
schedule.run_pending()
time.sleep(0.1)schedule默认用本地时区,如果你的服务器在国外,想按国内时间(UTC+8)执行,就会差 8 小时。
自己计算时区差,比如 UTC 时间比本地时间慢 8 小时(国内),就把执行时间设为 “本地时间 = UTC 时间 + 8”:
import schedule
import time
from datetime import datetime, timezone
# 要按UTC时间12:00执行,本地时间(UTC+8)就是20:00
def utc_task():
utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
print(f"UTC时间任务执行:{utc_time}")
# 本地时间20:00 = UTC时间12:00
schedule.every().day.at("20:00").do(utc_task)
while True:
schedule.run_pending()
time.sleep(60)如果面试时被问到 “Python 定时任务”,大概率会提到schedule,下面是常见问题和加分回答。
主要有 4 种,根据场景选:
核心是 “任务存储 + 循环检查”:
schedule.every().do(...)添加任务时,会把 “定时规则” 和 “任务函数” 封装成一个Job对象,存在内部的任务列表里。schedule.run_pending()时,会遍历所有Job对象,检查每个任务的 “下次执行时间” 是否小于当前时间。while True循环,配合time.sleep()减少 CPU 占用。因为 schedule 默认是单线程,一个任务执行久了会阻塞其他任务,解决方案是 “多线程 / 多进程”:
threading模块,把慢任务放到单独的线程里执行(最常用,轻量),比如写个包装函数run_in_thread,内部用threading.Thread(target=func).start()。multiprocessing模块(避免 GIL 锁影响),但多进程开销比多线程大。threading.Lock()加锁,防止数据错乱。之前做过一个 “每 10 秒检查设备状态” 的脚本,遇到两个问题:
every(5).``seconds.do``(func),看一眼就懂。如果是大型项目、分布式场景,建议用 APScheduler 或 Celery;如果是简单脚本,schedule 绝对是性价比最高的选择。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。