在Airflow中使用JDBC操作符来获取SQL查询结果,可以通过以下步骤完成:
from airflow.models import DAG
from airflow.providers.jdbc.operators.jdbc import JdbcOperator
from datetime import datetime
dag = DAG(
dag_id='jdbc_operator_example',
start_date=datetime(2022, 1, 1),
schedule_interval=None
)
jdbc_operator = JdbcOperator(
task_id='jdbc_task',
jdbc_conn_id='jdbc_connection',
sql="SELECT * FROM your_table",
dag=dag
)
其中,jdbc_conn_id
是在Airflow的连接配置中配置的JDBC连接的标识符,可以在Airflow的Web界面进行配置。
FileWriter
类:from airflow.hooks.base import BaseHook
from airflow.utils.file import TemporaryDirectory, get_task_logger
class JdbcToFileOperator(JdbcOperator):
def execute(self, context):
with TemporaryDirectory(prefix='jdbc_to_file_') as tmp_dir:
hook = BaseHook.get_hook(conn_id=self.jdbc_conn_id)
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute(self.sql)
result = cursor.fetchall()
logger = get_task_logger(__name__)
file_path = tmp_dir + "/result.txt"
with open(file_path, 'w') as file:
for row in result:
file.write(str(row) + '\n')
logger.info(f"Saved query result to {file_path}")
jdbc_to_file_operator = JdbcToFileOperator(
task_id='jdbc_to_file_task',
jdbc_conn_id='jdbc_connection',
sql="SELECT * FROM your_table",
dag=dag
)
jdbc_operator >> jdbc_to_file_operator
这样,当Airflow调度器执行该DAG时,JDBC操作符将会连接到指定的数据库,执行SQL查询,并将查询结果返回或保存到文件中。根据需要,你可以根据具体的业务需求进行扩展和定制。
领取专属 10元无门槛券
手把手带您无忧上云