Apache Airflow 是一个用于创建、调度和监控工作流的开源平台。它允许用户通过有向无环图(DAG)定义任务之间的依赖关系,并自动执行这些任务。Airflow 2.0.1 是该平台的一个版本。
在本地时区写入日志时,可能会遇到时区不一致的问题。这通常是由于Airflow默认使用UTC时区,而系统或应用程序可能配置为其他时区。
airflow.cfg
文件,找到 [core]
部分,添加或修改以下配置:airflow.cfg
文件,找到 [core]
部分,添加或修改以下配置:AIRFLOW__CORE__DEFAULT_TIMEZONE
:AIRFLOW__CORE__DEFAULT_TIMEZONE
:确保日志记录时也使用正确的时区。可以在 log_config.py
中进行配置:
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import pendulum
local_tz = pendulum.timezone("Asia/Shanghai")
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
handler = TimedRotatingFileHandler(
os.path.join('logs', 'airflow.log'),
when='midnight',
interval=1,
backupCount=7
)
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(handler)
以下是一个简单的DAG示例,展示了如何在Airflow 2.0.1中设置时区并记录日志:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import pendulum
local_tz = pendulum.timezone("Asia/Shanghai")
with DAG(
'timezone_example_dag',
default_args={'owner': 'airflow', 'start_date': days_ago(2)},
schedule_interval=timedelta(days=1),
catchup=False
) as dag:
task = DummyOperator(task_id='dummy_task', execution_date=local_tz.localize(datetime.now()))
def print_hello():
logging.info("Hello, Airflow! Current time is %s", datetime.now(local_tz))
hello_task = PythonOperator(
task_id='hello_task',
python_callable=print_hello,
execution_date=local_tz.localize(datetime.now())
)
task >> hello_task
通过以上步骤和示例代码,可以有效解决Airflow在本地时区写入日志的问题。
领取专属 10元无门槛券
手把手带您无忧上云