什么是AutoLine开源平台
AutoLine开源平台是一个开源自动化测试解决方案,基于RobotFramework进行二次开发,支持RobotFramework几乎所有的库。
源码地址
github地址: https://github.com/small99/AutoLine
码 云 地 址:https://gitee.com/lym51/AutoLine
AutoLine采用了Apscheduler库来实现AutoLine的测试用例的执行任务的调度管理
什么是Apscheduler
APScheduler:Python下强大的任务调度工具,可以完成定时任务,周期任务等,它是跨平台的,用于取代Linux下的cron daemon或者Windows下的task scheduler。
Apscheduler内置三种调度调度系统:
Cron风格
间隔性执行
仅在某个时间执行一次
在AutoLine开源平台中,我们采用了cron风格的支持以实现自由灵活的调度控制
Apscheduler支持哪些存储方式
Memory
SQLAlchemy (any RDBMS supported by SQLAlchemy works)
MongoDB
Redis
RethinkDB
ZooKeeper
AutoLine开源平台采用了SQLAlchemy MySQL存储调度任务
下面我们一起看看AutoLine下对调度的封装源码:
源码结构如下
__init__ 你懂的
setup 初始化调度器
start 启动调度器
is_running 判断调度是否在运行
shutdown 关闭调度
load_job_list 载入所有项目任务
add_job 新增调度任务
update_job 更新调度任务
remove_job 移除调度任务
pause_job 暂停调度任务
resume_job 恢复调度任务
get_jobs 获取所有任务
print_jobs 在控制台输出所有任务
setup源码分析
setup主要用于配置Apscheduler启动时加载的配置
defsetup(self):
self.scheduler = BackgroundScheduler({
# 设置调度任务存储mysql连接串
'apscheduler.jobstores.default': {
'type':'sqlalchemy',
'url':self.app.config["TRIGGER_DATABASE_URL"]
},
# 设置调度执行器进程池信息
'apscheduler.executors.processpool': {
'type':'threadpool',
'max_workers':'30'
},
# 设置调度其他配置
'apscheduler.job_defaults.coalesce':'false',
'apscheduler.job_defaults.max_instances':'4',
'apscheduler.timezone':'UTC',
})
load_job_list源码分析
用于加载所有设置了有效cron'表达式的项目进行自动调度,一般初始化启动时,调用一次即可
defload_job_list(self):
withself.app.app_context():
# 查询所有项目
projects = AutoProject.query.all()
forpinprojects:
ifp.enableandself.scheduler.get_job(p.id)is None:
cron = p.cron.replace("\n","").strip().split(" ")
# 判断cron表达式是否有效
iflen(cron)
continue
# 新增任务
j=self.scheduler.add_job(func=run_job,
trigger='cron',
name=p.name,
replace_existing=True,
minute=cron[],
hour=cron[1],
day=cron[2],
month=cron[3],
day_of_week=cron[4],
id="%s"% p.id,
args=(p.id,))
else:
self.update_job(p.id)
其他函数就不一一展示了,请直接查阅代码
最后附上Apscheduler的官方手册链接:http://apscheduler.readthedocs.io/en/latest/userguide.html
领取专属 10元无门槛券
私享最新 技术干货