对于Airflow,我想运行一个查询,根据ds返回给定时间段的所有数据。ds始终是我的结束日期,但开始日期可以改变。例如,它可以是一周或一个完整的月。为了处理这一点,我想创建不同的dags,每个月或每周运行一次。到目前一切尚好。然而,当我想通过start_dt考试时,我遇到了麻烦where report_dt between '{{ params.report_star
我正在尝试读取sql文件,其中包含与jinja模板在自定义操作符在气流中的查询。ds },比如SELECT * FROM my_table WHERE date > {{ ds }}I用template_fields和template_ext创建了CustomOperator='sql_process', sql="/
但是,pyspark模块需要将session变量作为参数。我已经使用application_args将参数传递给pyspark模块。但是,当我运行dag时,submit操作符失败了,我传入的参数被认为是None类型变量。需要知道如何将参数传递给通过spark_submit_operator触发的pyspark模块。DAG代码如下:
from pyspark.sql import SparkSessi
我是阿帕奇气流的新手。我想使用DAG调用REST端点。我正在做的是使用SimpleHttpOperator调用Rest端点。12-30T08:57:00.674386+00:0009:09:07,446] {{dagbag.py:92}} INFO - Filling up the DagBag from