首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Airflow 2.0.1 :在本地时区写入日志

基础概念

Apache Airflow 是一个用于创建、调度和监控工作流的开源平台。它允许用户通过有向无环图(DAG)定义任务之间的依赖关系,并自动执行这些任务。Airflow 2.0.1 是该平台的一个版本。

相关优势

  1. 灵活性:用户可以自定义任务和工作流。
  2. 可扩展性:支持插件机制,方便扩展功能。
  3. 可视化:提供直观的Web界面来监控和管理任务。
  4. 社区支持:拥有活跃的社区和丰富的文档资源。

类型与应用场景

  • 批处理作业:定期执行的数据处理任务。
  • ETL(提取、转换、加载)流程:数据仓库中的常见操作。
  • 机器学习工作流:模型训练和评估的自动化。
  • 基础设施即代码(IaC):自动化部署和管理云资源。

遇到的问题及原因

在本地时区写入日志时,可能会遇到时区不一致的问题。这通常是由于Airflow默认使用UTC时区,而系统或应用程序可能配置为其他时区。

解决方法

设置Airflow时区

  1. 修改配置文件: 编辑 airflow.cfg 文件,找到 [core] 部分,添加或修改以下配置:
  2. 修改配置文件: 编辑 airflow.cfg 文件,找到 [core] 部分,添加或修改以下配置:
  3. 环境变量: 在启动Airflow之前,设置环境变量 AIRFLOW__CORE__DEFAULT_TIMEZONE
  4. 环境变量: 在启动Airflow之前,设置环境变量 AIRFLOW__CORE__DEFAULT_TIMEZONE
  5. 代码中指定时区: 在DAG文件中,可以通过Python代码指定时区:
  6. 代码中指定时区: 在DAG文件中,可以通过Python代码指定时区:

日志配置

确保日志记录时也使用正确的时区。可以在 log_config.py 中进行配置:

代码语言:txt
复制
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

handler = TimedRotatingFileHandler(
    os.path.join('logs', 'airflow.log'),
    when='midnight',
    interval=1,
    backupCount=7
)
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(handler)

示例代码

以下是一个简单的DAG示例,展示了如何在Airflow 2.0.1中设置时区并记录日志:

代码语言:txt
复制
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")

with DAG(
    'timezone_example_dag',
    default_args={'owner': 'airflow', 'start_date': days_ago(2)},
    schedule_interval=timedelta(days=1),
    catchup=False
) as dag:
    task = DummyOperator(task_id='dummy_task', execution_date=local_tz.localize(datetime.now()))

    def print_hello():
        logging.info("Hello, Airflow! Current time is %s", datetime.now(local_tz))

    hello_task = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
        execution_date=local_tz.localize(datetime.now())
    )

    task >> hello_task

通过以上步骤和示例代码,可以有效解决Airflow在本地时区写入日志的问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

闲聊调度系统 Apache Airflow

时区问题 时区问题真的是一言难尽。当时 Airflow 从 1.9 版本开始全局统一使用 UTC 时间,虽然后续版本可以配置化了,但是当时的 1.9 版本还不能进行更改。...虽然我理解这种设计是为了解决当 Airflow 集群分布在不同时区的时候内部时间依然是相同的,不会出现时间不同步的情况。但是我们的节点只有一个,即使后面扩展为集群,集群内部的时间也会是同一个时区。...如果不用本地时区的话,使用 UTC 时间很容易对开发者造成困惑。当时又不想降版本到 1.8 ,因为 1.9 新增的很多功能都是很有意义的。...最后是在 Github 上发现孵化中的 2.0 版本时区已经可以配置化了,我们就直接使用 Github 上的孵化版本了。...Backfill Airflow 有一个 backfill 的功能,可以支持重跑历史任务,但是只能在命令行执行,要是在 WebUI 上就需要一个个 clear 掉状态,有时候挺痛苦的。

9.3K21
  • 大数据调度平台Airflow(八):Airflow分布式集群搭建及测试

    = /root/airflow/dags#修改时区default_timezone = Asia/Shanghai#配置Executor类型,集群建议配置CeleryExecutorexecutor...use_unicode=true&charset=utf8[webserver]#设置时区default_ui_timezone = Asia/Shanghai[celery]#配置Celery broker.../dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"), dag=dag)first >> second将以上内容写入execute_shell.py...重启后进入Airflow WebUI查看任务:图片 点击“success”任务后,可以看到脚本执行成功日志:图片图片图片4、测试Airflow HA当我们把node1节点的websever关闭后,可以直接通过...节点查看scheduler_failover_controller进程日志中有启动schudler动作,注意:这里是先从node1启动,启动不起来再从其他Master 节点启动Schduler。

    2.5K106

    【补充】助力工业物联网,工业大数据之AirFlow安装

    [redis] pip install apache-airflow[mysql] pip install flower pip install celery 验证 airflow -h ll /root...修改配置文件:airflow.cfg [core] #18行:时区 default_timezone = Asia/Shanghai #24行:运行模式 # SequentialExecutor是单进程顺序执行任务...,默认执行器,通常只用于测试 # LocalExecutor是多进程本地执行任务使用的 # CeleryExecutor是分布式调度使用(可以单机),生产环境常用 # DaskExecutor则用于动态任务调度...-D airflow scheduler -D airflow celery flower -D airflow celery worker -D 关闭【不用执行】 # 统一杀掉airflow的相关服务进程命令...# 下一次启动之前 rm -f /root/airflow/airflow-* 5、验证AirFlow Airflow Web UI:node1:8085 Airflow Celery Web

    24920

    Airflow2.2.3 + Celery + MYSQL 8构建一个健壮的分布式调度集群

    1集群环境 同样是在Ubuntu 20.04.3 LTS机器上安装Airflow集群,这次我们准备三台同等配置服务器,进行测试,前篇文章[1]中,我们已经在Bigdata1服务器上安装了airflow的所有组件...服务 docker-compose up -d 接下来,按照同样的方式在bigdata3节点上安装airflow-worker服务就可以了。...,因此这里需要修改一下docker-compose.yaml中x-airflow-common的volumes,将airflow.cfg通过挂载卷的形式挂载到容器中,配置文件可以在容器中拷贝一份出来,然后在修改...; 前期使用的时候,我们需要将docker-compose文件中的一些环境变量的值写入到airflow.cfg文件中,例如以下信息: [core] dags_folder = /opt/airflow/...#自定义airflow域名 default_ui_timezone = Asia/Shanghai # 设置默认的时区 web_server_host = 0.0.0.0 web_server_port

    1.8K10

    AIRFLow_overflow百度百科

    与crontab相比Airflow可以方便查看任务的执行状况(执行是否成功、执行时间、执行依 赖等),可追踪任务历史执行情况,任务执行失败时可以收到邮件通知,查看错误日志。...apache-airflow (2)修改airflow对应的环境变量:export AIRFLOW_HOME=/usr/local/airflow (3)执行airflow version,在/usr...:airflow webserver –p 8080 在安装过程中如遇到如下错误: 在my.cnf中加explicit_defaults_for_timestamp=1,然后重启数据库 5、Airflow...开始执行和结束执行的UTC时间⑥该task开始执行和结束执行的CST时间,也就是中国香港本地时间。...调度时间还可以以“* * * * *”的形式表示,执行时间分别是“分,时,天,月,年” 注意:① Airflow使用的时间默认是UTC的,当然也可以改成服务器本地的时区。

    2.2K20

    Apache Airflow单机分布式环境搭建

    Airflow在2014年由Airbnb发起,2016年3月进入Apache基金会,在2019年1月成为顶级项目。...Airflow的可视化界面提供了工作流节点的运行监控,可以查看每个节点的运行状态、运行耗时、执行日志等。也可以在界面上对节点的状态进行操作,如:标记为成功、标记为失败以及重新运行等。...在本地模式下会运行在调度器中,并负责所有任务实例的处理。...,首页如下: 右上角可以选择时区: 页面上有些示例的任务,我们可以手动触发一些任务进行测试: 点击具体的DAG,就可以查看该DAG的详细信息和各个节点的运行状态: 点击DAG中的节点,就可以对该节点进行操作...首先,拉取airflow的docker镜像: [root@localhost ~]# docker pull apache/airflow 拷贝之前本地安装时生成的airflow配置文件: [root@

    4.5K20

    Centos7安装部署Airflow详解

    创建用户(worker 不允许在root用户下执行)# 创建用户组和用户groupadd airflow useradd airflow -g airflow# 将 {AIRFLOW_HOME}目录修用户组...worker# 后台启动work服务airflow worker -D修改时区修改airflow.cfg文件 default_timezone = Asia/Shanghai找到airflow安装路径参考如下...时区修改配置email报警在airflow配置文件airflow.cfg中修改参考aiflow官方文档email_backend = airflow.utils.email.send_email_smtpsmtp...这是airflow集群的全局变量。在airflow.cfg里面配置concurrency :每个dag运行过程中最大可同时运行的task实例数。...如果你没有设置这个值的话,scheduler 会从airflow.cfg里面读取默认值 dag_concurrency在DAG中加入参数用于控制整个dagmax_active_runs : 来控制在同一时间可以运行的最多的

    6.2K30

    Azure Airflow 中配置错误可能会使整个集群受到攻击

    网络安全研究人员在 Microsoft 的 Azure 数据工厂 Apache Airflow 中发现了三个安全漏洞,如果成功利用这些漏洞,攻击者可能会获得执行各种隐蔽操作的能力,包括数据泄露和恶意软件部署...在本月早些时候发布的分析中表示。...Geneva 服务中的缺陷来篡改日志数据或发送虚假日志,以避免在创建新的 Pod 或账户时引起怀疑。...最终目标是在导入后立即向外部服务器反弹 shell。要实现此目的,攻击者必须首先通过使用遭到入侵的服务主体或文件的共享访问签名 (SAS) 令牌来获得对包含 DAG 文件的存储账户的写入权限。...尽管发现以这种方式获得的 shell 在 Kubernetes Pod 中的 Airflow 用户上下文中以最低权限运行,但进一步分析确定了一个具有 cluster-admin 权限的服务账户连接到 Airflow

    12010

    Airflow速用

    web界面 可以手动触发任务,分析任务执行顺序,任务执行状态,任务代码,任务日志等等; 实现celery的分布式任务调度系统; 简单方便的实现了 任务在各种状态下触发 发送邮件的功能;https://airflow.apache.org...#queues 存储日志到远程 http://airflow.apache.org/howto/write-logs.html 调用 远程 谷歌云,亚马逊云 相关服务(如语音识别等等)https://airflow.apache.org...Executor间(如 LocalExecutor,CeleryExecutor)不同点在于他们拥有不同的资源以及如何利用资源分配工作,如LocalExecutor只在本地并行执行任务,CeleryExecutor...,在连接的数据库服务创建一个 名为 airflow_db的数据库 命令行初始化数据库:airflow initdb 命令行启动web服务: airflow webserver -p 8080...服务时,报错如下 Error: No module named airflow.www.gunicorn_config * 处理方式 在supervisor的配置文件的 environment常量中添加

    5.5K10

    TiDB 2.1 GA Release Notes

    集群中的分布 控制是否打开 general log 在线修改日志级别 查询 TiDB 集群信息 添加 auto_analyze_ratio 系统变量控制自动 Analyze 的阈值 添加 tidb_retry_limit...slow 语句来获取慢查询语句 增加环境变量 tidb_slow_log_threshold 动态设置 slow log 的阈值 增加环境变量 tidb_query_log_max_len 动态设置日志中被截断的原始...Region tree 性能 优化计算热点统计的性能问题 TiKV Coprocessor 新增支持大量内建函数 新增 Coprocessor ReadPool,提高请求处理并发度 修复时间函数解析以及时区相关问题...drop table/index 的情况下快速回收空间 GC 模块独立出来,减少对正常写入的影响 kv_scan 命令支持设置 upper bound Raftstore 优化 snapshot 文件写入流程避免导致...版本的集群,无法滚动升级到 2.1,可以选择下面两种方案: 停机升级,直接从早于 2.0.1 的 TiDB 版本升级到 2.1 先滚动升级到 2.0.1 或者之后的 2.0.x 版本,再滚动升级到 2.1

    77600

    Agari使用Airbnb的Airflow实现更智能计划任务的实践

    在我之前的文章中,我描述了我们如何加载并处理本地收集器中的数据(即存在于我们企业级客户的数据中心里的收集器)。...-一个用来收集日志的中心位置供配置管理; 提供强大的CLI用于自动易于集成; 提供状态捕获功能; 对于任何运行,我们能够知道用于运行的输入和配置文件。...这涉及到几个更多的任务: wait_for_new_data_in_db 确保新生成的数据正在被成功地写入数据库 wait_for_empty_queue 等待SQS队列清空 send_email_notification_flow_successful...Airflow命令行界面 Airflow还有一个非常强大的命令界面,一是我们使用自动化,一个是强大的命令,“backfill”,、允许我们在几天内重复运行一个DAG。...因为Luigi和Airflow都是在云环境中产生的,这样少了一个让人头痛的烦恼。

    2.6K90

    Airflow秃头两天填坑过程:任务假死问题

    这也意味着这个问题没法在本地重现,只能在线上处理,这本身就比较大风险,因为线上的数据量很大,搞不好就删库跑路的了。...,调度器和worker也在跑,但是任务不会自动调度; 重启Airflow,手动执行任务等,都没有报错; 在界面上clear一个任务的状态时,会卡死,而通过命令来执行则耗时很长,最后也抛异常。...根据第三个症状,怀疑是Dag任务日志太多导致的,查Airflow的日志,确实很多,于是删删删。清掉了很多日志之后,问题依旧。...这个数据库是Airflow和业务系统共用的, 虽然Airflow停掉了且长时间在执行的sql也清理了, 不会有什么负载, 但是业务系统还一直在跑, 于是进业务系统的数据库看正在执行的sql进程: show...: 要么是系统负载问题(本地资源问题), 要么是上游资源问题。

    2.7K20

    apache-airflow

    ——《自由在高处》 Apache Airflow® 是一个开源平台,用于开发、安排和监控面向批处理的工作流。Airflow 的可扩展 Python 框架使您能够构建与几乎任何技术连接的工作流。...在解决错误后重新运行部分管道的能力有助于最大限度地提高效率。...Airflow 的用户界面提供: 深入了解两件事: 管道 任务 一段时间内管道概述 在界面中,您可以检查日志和管理任务,例如在失败时重试任务。...Airflow 的开源性质可确保您使用由全球许多其他公司开发、测试和使用的组件。在活跃的社区中,您可以找到大量有用的资源,包括博客文章、文章、会议、书籍等。...Kafka 可用于实时摄取和处理,事件数据写入存储位置,并且 Airflow 会定期启动处理一批数据的工作流。 如果您更喜欢单击而不是编码,Airflow 可能不是正确的解决方案。

    24810

    大数据调度平台Airflow(二):Airflow架构及原理

    ;监控任务;断点续跑任务;查询任务状态、详细日志等。...Executor:执行器,负责运行task任务,在默认本地模式下(单机airflow)会运行在调度器Scheduler中并负责所有任务的处理。...但是在airflow集群模式下的执行器Executor有很多类型,负责将任务task实例推送给Workers节点执行。...在Airflow中执行器有很多种选择,最关键的执行器有以下几种:SequentialExecutor:默认执行器,单进程顺序执行任务,通常只用于测试。LocalExecutor:多进程本地执行任务。...TaskTask是Operator的一个实例,也就是DAG中的一个节点,在某个Operator的基础上指定具体的参数或者内容就形成一个Task,DAG中包含一个或者多个Task。

    6.3K33
    领券