我正在Google上的气流DAG中扩展地使用BigQueryOperator。
对于较长的查询,最好将每个查询放在自己的.sql文件中,而不是将DAG与其混淆。气流似乎支持所有SQL查询操作符,包括BigQueryOperator,正如您在文献资料中所看到的那样。
我的问题是:在我用.sql模板文件编写了我的sql语句之后,如何将它添加到Google中并在DAG中引用它呢?
发布于 2021-06-29 14:25:09
我为这个问题找到了一个理想的解决办法。在dag声明中,您可以设置template_searchpath,这是气流查找jinja模板文件的默认路径。
为了在Composer实例中完成此工作,必须将其设置为
dag = DAG(
...
template_searchpath=["/home/airflow/gcs/plugins"],
)注意,我在本例中使用了plugins文件夹。您可以使用您的数据文件夹,或者您希望在桶中拥有的任何文件夹。
发布于 2018-06-08 13:04:03
在谷歌搜索并找到这个相关的问题之后。我找到了一种方法来解决这个问题(尽管这不是理想的解决方案,我们会看到的)。下面是一个包含三个部分的工作示例:
gcloud命令需要将模板上传到正确的位置。(1) sql模板文件--这只是一个文本文件,其文件名以.sql扩展名结尾。假设这个文件名为my-templated-query.sql,包含:
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')(2) 引用DAG文件中的模板以引用此模板,创建如下操作符:
count_task = BigQueryOperator(
task_id='count_rows',
sql='/my-templated-query.sql')(3) 将模板文件添加到Google 中,在默认情况下,气流会在dags文件夹中查找模板文件。要将我们的模板文件上传到dags文件夹,我们运行
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql您必须相应地替换env名称、位置和源路径。
将所有这些模板上传到dag文件夹似乎并不合适。更好的气流练习是将模板放在自己的文件夹中,并将template_searchpath参数指定为创建DAG时指向它。但是,我不知道如何用来完成这个任务。
更新:我认识到可以在文件夹中放置子文件夹,这对于组织大量的模板非常有用。假设我将一个SQL模板文件放在DAG_FOLDER/dataset1/table1.sql中的BigQueryOperator中,那么我就可以使用sql=/dataset1/table1.sql来引用它。如果您有一个包含大量文件和许多其他子文件夹的子文件夹,您也可以使用上面显示的dag import递归地上载整个子文件夹--只需将其指向子文件夹。
发布于 2020-07-07 23:48:17
我们最近用类似的策略解决了这个问题。这些步骤是:
BigQueryOperator中的模板在执行时读取查询。下面是一个最小的解决方案:
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator
with models.DAG(
'bigquery_dag',
schedule_interval = None ,
template_searchpath = ['/home/airflow/gcs/data/repo/queries/'],
default_args = default_dag_args
) as dag:
t1_clean_repo = bash_operator.BashOperator(
task_id = 'clean_repo',
bash_command = 'rm -rf /home/airflow/gcs/data/repo'
)
clone_command = """
gcloud source repos clone repo --project=project_id
cp -R repo /home/airflow/gcs/data
"""
t2_clone_repo = bash_operator.BashOperator(
task_id='clone_repo',
bash_command=clone_command
)
t3_query = bigquery_operator.BigQueryOperator(
task_id='query',
sql= 'query.sql',
use_legacy_sql = False,
bigquery_conn_id='conn_id'
)我们在这里利用了一些重要的概念:
git clone文件权限。template_searchpath,将搜索范围扩展到云存储桶中的data目录。https://stackoverflow.com/questions/50761084
复制相似问题