首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Airflow 2.0.1 :在本地时区写入日志

基础概念

Apache Airflow 是一个用于创建、调度和监控工作流的开源平台。它允许用户通过有向无环图(DAG)定义任务之间的依赖关系,并自动执行这些任务。Airflow 2.0.1 是该平台的一个版本。

相关优势

  1. 灵活性:用户可以自定义任务和工作流。
  2. 可扩展性:支持插件机制,方便扩展功能。
  3. 可视化:提供直观的Web界面来监控和管理任务。
  4. 社区支持:拥有活跃的社区和丰富的文档资源。

类型与应用场景

  • 批处理作业:定期执行的数据处理任务。
  • ETL(提取、转换、加载)流程:数据仓库中的常见操作。
  • 机器学习工作流:模型训练和评估的自动化。
  • 基础设施即代码(IaC):自动化部署和管理云资源。

遇到的问题及原因

在本地时区写入日志时,可能会遇到时区不一致的问题。这通常是由于Airflow默认使用UTC时区,而系统或应用程序可能配置为其他时区。

解决方法

设置Airflow时区

  1. 修改配置文件: 编辑 airflow.cfg 文件,找到 [core] 部分,添加或修改以下配置:
  2. 修改配置文件: 编辑 airflow.cfg 文件,找到 [core] 部分,添加或修改以下配置:
  3. 环境变量: 在启动Airflow之前,设置环境变量 AIRFLOW__CORE__DEFAULT_TIMEZONE
  4. 环境变量: 在启动Airflow之前,设置环境变量 AIRFLOW__CORE__DEFAULT_TIMEZONE
  5. 代码中指定时区: 在DAG文件中,可以通过Python代码指定时区:
  6. 代码中指定时区: 在DAG文件中,可以通过Python代码指定时区:

日志配置

确保日志记录时也使用正确的时区。可以在 log_config.py 中进行配置:

代码语言:txt
复制
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

handler = TimedRotatingFileHandler(
    os.path.join('logs', 'airflow.log'),
    when='midnight',
    interval=1,
    backupCount=7
)
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(handler)

示例代码

以下是一个简单的DAG示例,展示了如何在Airflow 2.0.1中设置时区并记录日志:

代码语言:txt
复制
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")

with DAG(
    'timezone_example_dag',
    default_args={'owner': 'airflow', 'start_date': days_ago(2)},
    schedule_interval=timedelta(days=1),
    catchup=False
) as dag:
    task = DummyOperator(task_id='dummy_task', execution_date=local_tz.localize(datetime.now()))

    def print_hello():
        logging.info("Hello, Airflow! Current time is %s", datetime.now(local_tz))

    hello_task = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
        execution_date=local_tz.localize(datetime.now())
    )

    task >> hello_task

通过以上步骤和示例代码,可以有效解决Airflow在本地时区写入日志的问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

    领券