首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在Google中使用气流模板文件和template_searchpath

在Google中使用气流模板文件和template_searchpath
EN

Stack Overflow用户
提问于 2018-06-08 12:51:40
回答 3查看 11K关注 0票数 7

我正在Google上的气流DAG中扩展地使用BigQueryOperator

对于较长的查询,最好将每个查询放在自己的.sql文件中,而不是将DAG与其混淆。气流似乎支持所有SQL查询操作符,包括BigQueryOperator,正如您在文献资料中所看到的那样。

我的问题是:在我用.sql模板文件编写了我的sql语句之后,如何将它添加到Google中并在DAG中引用它呢?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2021-06-29 14:25:09

我为这个问题找到了一个理想的解决办法。在dag声明中,您可以设置template_searchpath,这是气流查找jinja模板文件的默认路径。

为了在Composer实例中完成此工作,必须将其设置为

代码语言:javascript
运行
复制
dag = DAG(
    ...
    template_searchpath=["/home/airflow/gcs/plugins"],
)

注意,我在本例中使用了plugins文件夹。您可以使用您的数据文件夹,或者您希望在桶中拥有的任何文件夹。

票数 8
EN

Stack Overflow用户

发布于 2018-06-08 13:04:03

在谷歌搜索并找到这个相关的问题之后。我找到了一种方法来解决这个问题(尽管这不是理想的解决方案,我们会看到的)。下面是一个包含三个部分的工作示例:

  1. 带有一些jinja模板的sql模板文件,
  2. DAG,和
  3. gcloud命令需要将模板上传到正确的位置。

(1) sql模板文件--这只是一个文本文件,其文件名以.sql扩展名结尾。假设这个文件名为my-templated-query.sql,包含:

代码语言:javascript
运行
复制
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')

(2) 引用DAG文件中的模板以引用此模板,创建如下操作符:

代码语言:javascript
运行
复制
count_task = BigQueryOperator(
  task_id='count_rows',
  sql='/my-templated-query.sql')

(3) 将模板文件添加到Google 中,在默认情况下,气流会在dags文件夹中查找模板文件。要将我们的模板文件上传到dags文件夹,我们运行

代码语言:javascript
运行
复制
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql

您必须相应地替换env名称、位置和源路径。

将所有这些模板上传到dag文件夹似乎并不合适。更好的气流练习是将模板放在自己的文件夹中,并将template_searchpath参数指定为创建DAG时指向它。但是,我不知道如何用来完成这个任务。

更新:我认识到可以在文件夹中放置子文件夹,这对于组织大量的模板非常有用。假设我将一个SQL模板文件放在DAG_FOLDER/dataset1/table1.sql中的BigQueryOperator中,那么我就可以使用sql=/dataset1/table1.sql来引用它。如果您有一个包含大量文件和许多其他子文件夹的子文件夹,您也可以使用上面显示的dag import递归地上载整个子文件夹--只需将其指向子文件夹。

票数 5
EN

Stack Overflow用户

发布于 2020-07-07 23:48:17

我们最近用类似的策略解决了这个问题。这些步骤是:

  1. 将所有SQL文件放入谷歌云源库中。
  2. 在每个DAG运行开始时,将文件克隆到云存储桶中的"data“目录中,该目录将自动与您的气流环境共享。
  3. 使用BigQueryOperator中的模板在执行时读取查询。

下面是一个最小的解决方案:

代码语言:javascript
运行
复制
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator

with models.DAG(
        'bigquery_dag',
        schedule_interval = None ,
        template_searchpath = ['/home/airflow/gcs/data/repo/queries/'],
        default_args = default_dag_args
        ) as dag:

    t1_clean_repo = bash_operator.BashOperator(
        task_id = 'clean_repo',
        bash_command = 'rm -rf /home/airflow/gcs/data/repo'
    )

    clone_command = """
        gcloud source repos clone repo --project=project_id
        cp -R repo /home/airflow/gcs/data
    """

    t2_clone_repo = bash_operator.BashOperator(
        task_id='clone_repo',
        bash_command=clone_command
        )

    t3_query = bigquery_operator.BigQueryOperator(
        task_id='query',
        sql= 'query.sql',
        use_legacy_sql = False,
        bigquery_conn_id='conn_id'
    )

我们在这里利用了一些重要的概念:

  1. 云存储桶中的数据目录通过保险丝自动与您的气流实例共享。在这里放置的任何东西都可以为大多数运营商所访问。
  2. 只要您的Google存储库与Composer位于同一个项目中,您的气流实例就不需要额外的git clone文件权限。
  3. 我们在DAG参数中设置template_searchpath,将搜索范围扩展到云存储桶中的data目录。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50761084

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档