首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Airflow -从BigQuery动态生成任务,但任务在之前完成之前重复运行

基础概念

Apache Airflow 是一个用于创建、调度和监控工作流的开源平台。它允许你定义工作流为有向无环图(DAG),并提供了丰富的操作符来执行各种任务,如 Bash 命令、Python 函数、数据库操作等。BigQuery 是 Google 提供的一个完全托管的、可扩展的、服务器less的数据仓库,用于大规模数据集的分析。

动态生成任务

动态生成任务意味着任务的创建不是预先定义的,而是在运行时根据某些条件或数据生成的。在 Airflow 中,这通常通过 PythonOperator 实现,它允许你编写 Python 代码来动态创建任务。

问题描述

当你在 Airflow 中从 BigQuery 动态生成任务时,可能会遇到任务在完成之前重复运行的问题。这通常是由于以下几个原因造成的:

  1. 任务状态未正确更新:Airflow 可能没有正确地记录任务的完成状态,导致任务被错误地重新调度。
  2. 并发问题:如果有多个调度器实例同时运行,可能会导致任务被重复执行。
  3. 依赖关系配置错误:任务之间的依赖关系配置不正确,导致任务在不应该执行的时候被触发。

解决方案

1. 确保任务状态正确更新

确保 Airflow 的数据库中正确记录了任务的完成状态。你可以通过以下方式检查和修复:

代码语言:txt
复制
from airflow.models import TaskInstance

# 检查任务实例的状态
ti = TaskInstance(task_id='your_task_id', execution_date='your_execution_date')
print(ti.state)

如果状态不正确,可以尝试手动更新状态:

代码语言:txt
复制
from airflow.utils.state import State

ti.state = State.SUCCESS
ti.update_state()

2. 避免并发问题

确保只有一个调度器实例在运行。你可以在 Airflow 的配置文件中设置 single_instance=True

代码语言:txt
复制
# airflow.cfg
[scheduler]
single_instance = True

3. 正确配置依赖关系

确保任务之间的依赖关系配置正确。例如,如果你有两个任务 A 和 B,B 依赖于 A 的完成,你可以这样配置:

代码语言:txt
复制
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

with DAG('example_dag', start_date=datetime(2023, 1, 1)) as dag:
    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')

    task_b.set_upstream(task_a)

4. 使用 BigQuery 连接器

确保你正确配置了 Airflow 和 BigQuery 的连接。你可以在 Airflow 的 Web UI 中添加一个 BigQuery 连接:

  1. 进入 Airflow Web UI。
  2. 导航到 Admin -> Connections
  3. 添加一个新的连接,选择 BigQuery 类型,并填写相关信息。

示例代码

以下是一个简单的示例,展示如何在 Airflow 中从 BigQuery 动态生成任务:

代码语言:txt
复制
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from google.cloud import bigquery

def generate_tasks():
    client = bigquery.Client()
    query = """
        SELECT id
        FROM your_dataset.your_table
        WHERE status = 'pending'
    """
    results = client.query(query).result()
    
    tasks = []
    for row in results:
        task_id = f"task_{row.id}"
        task = PythonOperator(
            task_id=task_id,
            python_callable=your_task_function,
            op_args=[row.id],
            dag=dag
        )
        tasks.append(task)
    
    return tasks

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG('dynamic_tasks_dag', default_args=default_args, schedule_interval='@daily')

tasks = generate_tasks()

参考链接

通过以上步骤和示例代码,你应该能够解决从 BigQuery 动态生成任务时任务重复运行的问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

构建端到端的开源现代数据平台

为了能够信任数据,我们需要对其进行监控并确保基于它生成准确的见解,目前是可选的,因为开始时最有效的选择是利用其他组件的数据测试功能,但我们将在本文中讨论数据监控工具。...现在已经选择了数据仓库,架构如下所示: 进入下一个组件之前,将 BigQuery 审计日志存储专用数据集中[14](附加说明[15]),这些信息设置元数据管理组件时会被用到。...• dbt CLI:此选项允许直接与 dbt Core 交互,无论是通过使用 pip 本地安装它还是像之前部署的 Airbyte 一样 Google Compute Engine 上运行 docker...要允许 dbt 与 BigQuery 数据仓库交互,需要生成所需的凭据(可以创建具有必要角色的服务帐户),然后 profiles.yml 文件中指明项目特定的信息。...部署完成后会注意到虚拟机上实际上运行了四个容器,用于以下目的: • MySQL 上存储元数据目录 • 通过 Elasticsearch 维护元数据索引 • 通过 Airflow 编排元数据摄取 •

5.5K10

Kubernetes上运行Airflow两年后的收获

我将根据形成我们当前 Airflow 实现的关键方面来分割它: 执行器选择 解耦和动态 DAG 生成 微调配置 通知、报警和可观测性 执行器选择 在这里,我们所有的东西都在 Kubernetes 中运行...也许最简单的动态生成 DAG 的方法是使用单文件方法。您有一个文件,循环中生成 DAG 对象,并将它们添加到 globals() 字典中。...当我们首次根据我们的 DBT 项目生成动态 DAG 时,这种方法非常直接(DBT 编排的主题需要单独发布,将在未来完成)。... 建议将其设置为您最长运行任务平均完成时间的 1.5 倍。...在这里,我们 BaseNotifier 类创建了自己的自定义通知器,这样我们就可以根据需要定制通知模板并嵌入自定义行为。例如,开发环境中运行任务时,默认仅将失败通知发送到 Slack。

35110
  • Agari使用Airbnb的Airflow实现更智能计划任务的实践

    之前的文章中,我描述了我们如何利用AWSAgari中建立一个可扩展的数据管道。...查询数据库中导出记录的数量 把数量放在一个“成功”邮件中并发送给工程师 随着时间的推移,我们根据Airflow的树形图迅速进掌握运行的状态。...Airflow命令行界面 Airflow还有一个非常强大的命令界面,一是我们使用自动化,一个是强大的命令,“backfill”,、允许我们几天内重复运行一个DAG。...当Airflow可以基于定义DAG时间有限选择的原则时,它可以同时进行几个任务,它基于定义时间有限选择的原则时(比如前期的任务必须在运行执行当前期任务之前成功完成)。...在这两个任务中的时间差异就会导致完成全部工作的时间差异很大。因此,这个图很清晰地告诉了为了运行时间更可预测,如果我们要根据速度和可扩展性增强,我们该在哪里花时间。

    2.6K90

    大规模运行 Apache Airflow 的经验和教训

    然而,由于我们允许用户自己的项目中部署工作负载(甚至部署时动态生成作业),这就变得更加困难。...这一点规模上尤为重要,因为要让 Airflow 管理员在所有作业进入生产之前对其进行审查是不现实的。...下图显示了我们最大的单一 Airflow 环境中,每 10 分钟完成任务数。...虽然不是资源争用的直接解决方案, priority_weight 对于确保延迟敏感的关键任务低优先级任务之前运行是很有用的。...虽然池允许限制单个工作负载内的并发性, priority_weight 可以用来使单个任务以比其他任务更低的延迟运行。如果你需要更多的灵活性,工作者隔离可以对执行任务的环境进行细粒度的控制。

    2.7K20

    Apache DolphinScheduler之有赞大数据开发平台的调度系统演进

    前言 不久前的 Apache DolphinScheduler Meetup 2021 上,有赞大数据开发平台负责人宋哲琦带来了平台调度系统 Airflow 迁移到 Apache DolphinScheduler...,将其生成 DAG round 实例执行任务调度。...稳定性与可用性上来说,DolphinScheduler 实现了高可靠与高可扩展性,去中心化的多 Master 多 Worker 设计架构,支持服务动态上下线,自我容错与调节能力更强。...工作流的原数据维护和配置同步其实都是基于 DP master来管理,只有在上线和任务运行时才会到调度系统进行交互,基于这点,DP 平台实现了工作流维度下的系统动态切换,以便于后续的线上灰度测试。...图 1 中,工作流在 6 点准时调起,每小时调一次,可以看到 6 点任务准时调起并完成任务执行,当前状态也是正常调度状态。

    2.8K20

    Apache Airflow 2.3.0 五一重磅发布!

    AirflowDAG中管理作业之间的执行依赖,并可以处理作业失败,重试和警报。开发人员可以编写Python代码以将数据转换为工作流中的操作。...有700多个提交,包括50个新功能,99个改进,85个错误修复~ 以下是最大的和值得注意的变化: 动态任务映射(Dynamic Task Mapping):允许工作流在运行时根据当前数据创建一些任务,而不是让...,将依赖关系线留给图形视图,并更好地处理任务组!...元数据数据库中清除历史记录 (Purge history from metadata database):新的 "airflow db clean "CLI命令用于清除旧记录:这将有助于减少运行DB迁移的时间...还可以为你的数据库生成降级/升级 SQL 脚本并针对您的数据库手动运行它,或者只查看将由降级/升级命令运行的 SQL 查询。

    1.9K20

    Python中有啥好用的开源任务调度管理项目

    不过,这并不是一个0到1的工作,之前最开始是采用的Django框架搭建起一个服务,使用apschedule 做任务管理,但是没有可视化的监控和预警。...模型的运行任务大体的分为三块, 数据准备,检查数据是否已经下发,模型运行的前置要求 模型运行,检查模型是否运行完成,中间是否有报错 模型结果,检查目标结果表是否有模型跑出来的结果 这三步是具有依赖关系,...后者的运行依赖前者运行完成。...列表中编辑功能不可用,也没有列表操作中接入任务日志查看的功能。 总结: 有句话说,踏破铁鞋无觅处,得来全不费功夫。...目前来看,JobCenter的功能仿佛可以实现我的需求,本身模型的任务量级也不大,百八十个左右。

    9.6K23

    Flink on Zeppelin 作业管理系统实践

    一年多时间的产线实践中,我们对作业提交的方式策略进行了几次演进,目前跑作业规模Flink Batch 任务日均运行超5000次,流作业500+,均稳定运行。...模式进行运行,由于每个长跑作业都需要建立实时监控,对server压力很大,调度任务外部运行SQL,也经常出现卡顿,无法提交作业的情况。...并发提交任务几乎不可能,虽然后续切换Yarn Application 模式可以把Flink interpreter 跑了JobManager里 缓解客户端压力,同时大规模提交pyflink作业仍存在执行效率问题...批作业提交优化 统一作业管理中注册Flink Batch SQL 作业,并配置调度时间及依赖关系; Airflow 生成dag,定时触发执行; 每一组任务执行时,首先新建EMR 集群,初始化Zeppelin...通过作业管理系统,我们将注册的任务记录在mysql数据库中,使用Airflow 通过扫描数据库动态创建及更新运行dag,将flink batch sql 封装为一类task group,包含了创建AWS

    2K20

    有赞大数据平台的调度系统演进

    概述 2017年,我们引入Airflow搭建了有赞大数据平台(DP)的调度系统,并完成了全量离线任务的接入。...Airflow的1.X版本存在的性能问题和稳定性问题,这其中也是我们生产环境中实际碰到过的问题和踩过的坑: 性能问题:Airflow对于Dag的加载是通过解析Dag文件实现的,因为Airflow2.0版本之前...任务执行流程改造 任务运行测试流程中,原先的DP-Airflow流程是通过dp的Master节点组装dag文件并通过DP Slaver同步到Worker节点上再执行Airflow Test命令执行任务测试...切换为DP-DS后所有的交互都基于DS-API来进行,当在DP启动任务测试时,会在DS侧生成对应的工作流定义配置并上线,然后进行任务运行,同时我们会调用ds的日志查看接口,实时获取任务运行日志信息。...通过任务测试和工作流发布这两个核心操作的流程可以看到,因为工作流的元数据维护和配置同步都是基于DP Master来管理,只有在上线和任务运行的时候才会与调度系统(Airflow、DS)进行交互,我们也基于这点实现了工作流维度下调度系统的动态切换

    2.3K20

    MLFlow︱机器学习工作流框架:介绍(一)

    2.3 MLFlow 和 AirFlow的差异 作者:谷瑞-Roliy: 之前我研究过用airflow来做类似的事情,想利用它的工作流和dag来定义机器学习流程,包括各种复杂的配置的管理功能也有实现。...不过airflow的一点点问题是,它还是更适合定时调度的任务。而像机器学习实验这种场景,run的频率可是很随意的。不过,现在有一个想法,离线的实验用mlflow,上线以后用airflow。...因为在线基本上就是很稳定的运行流程+固定或很少频率的更新,airflow时间纬度上的回退功能还是很有用的。也可以认为是现在mlflow缺的一点功能,daily run,或者叫自学习。...MLSQL核心在于: 提供了一个7*24小时的运行平台,算法的工作IDE中完成调试,Web界面上完成开发和部署,共享CPU/GPU/内存资源。...1,2 解决了算法脚本难于重复运行的问题,以及模型部署的问题,同时还解决了数据预处理复用的问题。 允许算法嵌入任何算法框架完成训练和预测,给了算法工程师足够的灵活性。

    4.3K21

    Airflow DAG 和最佳实践简介

    随着时间的推移,各种业务活动中使用的数据量急剧增长,每天兆字节到每分钟千兆字节。 尽管处理这种数据泛滥似乎是一项重大挑战,这些不断增长的数据量可以通过正确的设备进行管理。...Apache Airflow 是一个允许用户开发和监控批处理数据管道的平台。 例如,一个基本的数据管道由两个任务组成,每个任务执行自己的功能。但是,经过转换之前,新数据不能在管道之间推送。...基于图的表示中,任务表示为节点,而有向边表示任务之间的依赖关系。边的方向代表依赖关系。例如,任务 1 指向任务 2(上图)的边意味着任务 1 必须在任务 2 开始之前完成。该图称为有向图。...这意味着即使任务不同时间执行,用户也可以简单地重新运行任务并获得相同的结果。 始终要求任务是幂等的:幂等性是良好 Airflow 任务的最重要特征之一。不管你执行多少次幂等任务,结果总是一样的。...避免将数据存储本地文件系统上: Airflow 中处理数据有时可能很容易将数据写入本地系统。因此,下游任务可能无法访问它们,因为 Airflow 会并行运行多个任务

    3.1K10

    没看过这篇文章,别说你会用Airflow

    Scheduler:Airflow Scheduler 是一个独立的进程,通过读取 meta database 的信息来进行 task 调度,根据 DAGs 定义生成任务,提交到消息中间队列中(Redis...Worker:Airflow Worker 是独立的进程,分布相同 / 不同的机器上,是 task 的执行节点,通过监听消息中间件(redis)领取并且执行任务。...task, task 中实现这样的判断逻辑,就可以实现是否需要清理之前 publish 过的数据的逻辑,进而保证 task 本身是幂等的。...遇到的问题 分布式与代码同步问题 Airflow 是分布式任务分发的系统, master 和 worker 会部署不同的机器上,并且 worker 可以有很多的类型和节点。...,目前较少人力成本下,已经稳定运行超过 2 年时间,并没有发生故障。

    1.6K20

    OpenTelemetry实现更好的Airflow可观测性

    您探索 Grafana 之前,下面是一个示例演示 DAG,它每分钟运行一次并执行一项任务,即等待 1 到 10 秒之间的随机时间长度。...将其放入 DAG 文件夹中,启用它,并让它运行多个周期,以您浏览时生成一些指标数据。我们稍后将使用它生成的数据,它运行的时间越长,它看起来就越好。因此,请放心让它运行并离开一段时间,然后再继续。...如果您看到相同的值每次重复四次,如上面的屏幕截图所示,您可以将分辨率调整为 1/4,也可以调整 OTEL_INTERVAL 环境值(然后重新启动 Airflow 并重新运行 DAG 并等待值再次生成)...跟踪让我们了解管道运行时幕后实际发生的情况,并有助于可视化其任务运行的完整“路径”。例如,当与我们已经探索过的持续时间指标相结合时,我们将能够自动生成甘特图,以帮助找到减慢 DAG 速度的瓶颈。...例如,您汽车中的里程表或自您启动 Airflow 以来完成任务数。如果你可以说“再加一个”,那么你很可能正在处理一个计数器。

    45020

    apache-airflow

    “工作流即代码”有以下几个用途: 动态Airflow 管道配置为 Python 代码,允许生成动态管道。 可扩展:Airflow® 框架包含用于连接众多技术的运算符。...名为 “demo” 的 DAG, 2022 年 1 月 1 日开始,每天运行一次。...“demo” DAG 的状态 Web 界面中可见: 此示例演示了一个简单的 Bash 和 Python 脚本,这些任务可以运行任意代码。...Airflow 的用户界面提供: 深入了解两件事: 管道 任务 一段时间内管道概述 界面中,您可以检查日志和管理任务,例如在失败时重试任务。...虽然 CLI 和 REST API 确实允许触发工作流, Airflow 并不是为无限运行基于事件的工作流而构建的。Airflow 不是流式处理解决方案。

    12710

    【翻译】Airflow最佳实践

    now函数会得到一个当前时间对象,直接用在任务中会得到不同的结果。 类似connection_id或者S3存储路径之类重复的变量,应该定义default_args中,而不是重复定义每个任务里。...定义default_args中有助于避免一些类型错误之类的问题。 1.3 删除任务 不要从DAG中删除任务,因为一旦删除,任务的历史信息就无法再Airflow中找到了。...每次Airflow解析符合条件的python文件时,任务外的代码都会被运行,它运行的最小间隔是使用min_file_process_interval来定义的。 2....例如,如果我们有一个推送数据到S3的任务,于是我们能够在下一个任务完成检查。...2.4 暂存(staging)环境变量 如果可能,部署到生产环境运行起来之前,我们应该保持一个暂存环境去测试完整的DAG。需要确保我们的DAG是已经参数化了的,而不是DAG中硬编码。

    3.2K10

    Centos7安装部署Airflow详解

    AIRFLOW_HOME目录下生成了.cfg及相关文件即证明本次执行成功# 如果配置了pytho的环境变量直接执行# 没配置${PYTHON_HOME}/lib/python3.6/sit-packages...# 执行worker之前运行临时变量(临时的不能永久使用)export C_FORCE_ROOT="true"# 不需要切换用户cd /usr/local/python3/bin/# 前台启动worker...这是airflow集群的全局变量。airflow.cfg里面配置concurrency :每个dag运行过程中最大可同时运行的task实例数。...如果你没有设置这个值的话,scheduler 会airflow.cfg里面读取默认值 dag_concurrencyDAG中加入参数用于控制整个dagmax_active_runs : 来控制同一时间可以运行的最多的...需要不小于10才行,若小于10,那么会有任务需要等待之前任务执行完成才会开始执行。

    6.1K30

    Apache Airflow单机分布式环境搭建

    Airflow的可视化界面提供了工作流节点的运行监控,可以查看每个节点的运行状态、运行耗时、执行日志等。也可以界面上对节点的状态进行操作,如:标记为成功、标记为失败以及重新运行等。...例如: 时间依赖:任务需要等待某一个时间点触发 外部系统依赖:任务依赖外部系统需要调用接口去访问 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响 资源环境依赖:任务消耗资源非常多...本地模式下会运行在调度器中,并负责所有任务实例的处理。...之所以要先执行一下这条命令是为了让Airflow我们设定的目录下生成配置文件: [root@localhost ~]# ls /usr/local/airflow/ airflow.cfg webserver_config.py...首先,拉取airflow的docker镜像: [root@localhost ~]# docker pull apache/airflow 拷贝之前本地安装时生成airflow配置文件: [root@

    4.4K20

    Airflow配置和使用

    Airflow独立于我们要运行任务,只需要把任务的名字和运行方式提供给Airflow作为一个task就可以。...把文后TASK部分的dag文件拷贝几个到~/airflow/dags目录下,顺次执行下面的命令,然后打开网址http://127.0.0.1:8080就可以实时侦测任务动态了: ct@server:~/...=/var/log/airflow-scheduler.err.log stdout_logfile=/var/log/airflow-scheduler.out.log 特定情况下,修改DAG后,为了避免当前日期之前任务运行...内网服务器只开放了SSH端口22,因此 我尝试另外一台电脑上使用相同的配置,然后设置端口转发,把外网服务器 的rabbitmq的5672端口映射到内网服务器的对应端口,然后启动airflow连接 。...netstat -lntp | grep 6379 任务未按预期运行可能的原因 检查 start_date 和end_date是否合适的时间范围内 检查 airflow worker, airflow

    13.9K71
    领券