在 2020 年 12 月 17 日 Apache Airflow 团队发布了 Apache Airflow 2.0.0。当时就想写写 Airflow 的新特性,但是粗略的看了下《Apache Airflow 2.0 is here!》这篇文章,发现 Airflow2.0 是一个超级大的版本更新,不仅仅 UI 更新了,最核心的组件 Scheduler 性能也有了极大的提升,分布式环境下的高可用模型也做了改变,同时还有 Airflow 上的 Operator 和 Hook 也做了新的分门别类,对于这个版本在复杂的生产环境下是否能稳定运行,感到一丝怀疑,遂后面没有在关注了。
等了半年后,注意到 Airflow 已经发布版本到 2.1.1 了,而且Airflow 1.0+的版本也即将不再维护,自己也做了小规模测试,基本上可以确定 Airflow2.0 可以作为生产环境下的版本了,遂有了这篇文章,对 Airflow2.0 简单介绍下。
参考:https://github.com/apache/airflow/blob/main/UPDATING.md。目前为止 Airflow 2.0.0 到 2.1.1 的版本更新没有什么大的变化,只是一些小的配置文件和行为逻辑的更新,比如Dummy trigger在2.1.1版本过时了、DAG concurrency 配置更改了。
所以最大的版本更新还是在于 Airflow2.0.0,在这一次版本更新里,包括了:
这块的话,取决于个人审美吧,毕竟只是一个调度系统,长啥样都没有什么影响。具体可以参考下面这个动图:
新的方法对依赖关系的处理更清晰,XCom 也更易于使用。第一次看到这种的调度配置方式,还是在 prefect 调度系统上,感兴趣的话,可以看看:https://listen-lavender.gitbook.io/prefect-docs/gettingstarted/whynotairflow。
我认为这种新的配置调度方式的引入,极大改善了如何调度机器学习模型的配置任务,写过用 Airflow 调度机器学习模型的读者可以比较下,TaskFlow API 会更好用。
TaskFlow API 像下面这样:
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
@dag(default_args={'owner': 'airflow'}, schedule_interval=None, start_date=days_ago(2))
def tutorial_taskflow_api_etl():
@task
def extract():
return {"1001": 301.27, "1002": 433.21, "1003": 502.22}
@task
def transform(order_data_dict: dict) -> dict:
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
print("Total order value is: %.2f" % total_order_value)
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_etl_dag = tutorial_taskflow_api_etl()
Fully specified REST API (AIP-32)
对于 Scheduler 性能优化的想法从 2019 年 03 月 02 日就开始了,到 2.0.0 终于实现了突破,这部分的历程可以看看 AIP-15(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651)。
之前 Scheduler 的分布式执行是使用主从模型,但是在 Airflow 2.0 改成了主主模型,我的理解是就是基于元数据库,所有的 Scheduler 都是对等的。带来的优势就是:
对于某个单 Scheduler 来说,1.7 就引入了 DAG 序列化,通过使 Web 服务器无需解析 DAG 文件而允许它读取序列化的DAG,大大提高了 DAG 文件的读取性能。Airflow 2.0 Scheduler 通过使用来自数据库的序列化后 DAG 进行任务调度和调用,扩展了 DAG 序列化的使用。这减少了重复解析 DAG 文件以进行调度所需的时间。
当然最后有一份性能对比,可以看看:
这块用的不多,就直接摘抄了。
Airflow 2.0 重新建立了 KubernetesExecutor 架构,为 Airflow 用户提供更快、更容易理解和更灵活的使用方式。用户现在可以访问完整的 Kubernetes API 来创建一个 .yaml pod_template_file,而不是在 airflow.cfg 中指定参数。 此外还用pod_override参数替换了executor_config词典,此项变化从 KubernetesExecutor 删除了三千多行代码,使其运行速度更快,并减少潜在错误。
Airflow 终于将 operator,sensor或hook 拆分为 60 多个 packages,而不是都放在一起了。最简单的例子就是:让 airflow.contrib 变得非常大,以至于依赖管理以及下一版本的计划和测试都变得充满挑战。
在Airflow 2.0中,已根据可与Airflow一起使用的外部系统对模块进行了重组。这意味着,如果您想使用与AWS相关的operators,而不是与GCP和Kubernetes相关的operators,则只能使用Amazon提供程序子软件包安装Airflow:
pip install apache-airflow[amazon]
这项更改意义重大,因为它可以使关注点分离,更快的特定组件发布周期以及更干净的组织结构,使您可以在其中找到与特定外部系统相关的代码。从早期版本迁移工作流时,请确保使用正确的导入。例如,
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
传感器(sensors)非常棘手,因为它们一直在寻找状态,并且可能会消耗大量资源。在新版本中,Airflow引入了对传感器逻辑的更改,以使其更加节省资源和更智能。就个人而言,我倾向于使用事件驱动的AWS Lambda函数处理用例,这些用例通常在Airflow中通过传感器使用(例如,当特定文件到达S3后立即触发管道)。但是,此功能对于许多希望将所有工作流程保持在一个地方而不是依赖于FaaS进行事件驱动的人来说非常有用。
SubDAG 通常用于在 UI 中对任务进行分组,但它们的执行行为有许多缺点(主要是它们只能并行执行单个任务!)为了改善这种体验,我们引入了“TaskGroup”:一种用于组织任务提供与 subdag 相同的分组行为,而没有任何执行时间缺陷。
可惜的是,Airflow 的调度时间问题依然没有得到解决。2.0 最大的更新我认为是 Scheduler 性能的提升,这真的是让我惊讶了,毕竟之前老版本 Scheduler 对 DAG 文本文件的解析是真的慢,现在改造成了序列化的方式,快了不止一点。其它的话,TaskFlow API的引入,会帮助 Airflow 更好的兼容机器学习模型的部署和调度。
参考链接:
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有