首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Airflow - Python文件不在同一个DAG文件夹中

Airflow - Python文件不在同一个DAG文件夹中
EN

Stack Overflow用户
提问于 2015-11-03 22:33:56
回答 3查看 32.1K关注 0票数 25

我试图使用气流来执行一个简单的任务python。

代码语言:javascript
运行
复制
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

例如,如果我尝试:

气流试验python_test打印2015-01-01

它起作用了!

现在,我想将我的def print_context(ds, **kwargs)函数放到其他python文件中。因此,我创建了一个名为: simple_test.py和change的文件:

代码语言:javascript
运行
复制
run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)

现在我试着再跑一次

气流试验python_test打印2015-01-01

好的!还能用!

但是,如果我创建一个模块,例如,带有文件SimplePython.py的worker模块,导入(from worker import SimplePython)它并尝试:

气流试验python_test打印2015-01-01

它传达了这样的信息:

ImportError:没有名为worker的模块

问题如下:

  1. 是否可以在DAG定义中导入模块?
  2. Airflow+Celery将如何在工作节点之间分发所有必需的python源文件?
EN

回答 3

Stack Overflow用户

发布于 2017-11-22 16:28:48

虽然将dags打包到zip中是我看到的唯一受支持的解决方案,但您也可以导入dags文件夹中的模块。如果您使用木偶和git等其他工具自动同步dags文件夹,这是非常有用的。

从问题中我不清楚您的目录结构,因此下面是一个基于典型python项目结构的示例dags文件夹:

代码语言:javascript
运行
复制
└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root
        ├── my_dags  # python src root (usually named same as project)
        │   ├── my_test_globals.py  # file I want to import
        │   ├── dag_in_package.py 
        │   └── dags 
        │        └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

我遗漏了(必需的[1]) __init__.py文件。注意三个示例dags的位置。你几乎肯定只会把这些地方中的一个用在你所有的笨蛋身上。我把它们都包括在这里是为了举个例子,因为这对进口来说不重要。要从其中任何一个导入my_test_globals

代码语言:javascript
运行
复制
from my_dags.my_dags import my_test_globals

我相信这意味着气流运行时,python被设置为dages目录,因此dags文件夹的每个子目录都可以被视为package。在我的例子中,它是一个额外的中间项目根目录,妨碍了一个典型的包内绝对导入。因此,我们可以像这样重组这个气流项目:

代码语言:javascript
运行
复制
└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root & python src root
        ├── my_test_globals.py  # file I want to import
        ├── dag_in_package.py 
        ├── dags 
        │    └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

因此,进口商品的外观与我们预期的一样:

代码语言:javascript
运行
复制
from my_dags import my_test_globals
票数 11
EN

Stack Overflow用户

发布于 2016-03-31 07:42:30

对于第二个问题: Airflow+Celery将如何在工作节点之间分发所有必需的python源文件?

从文档中看:工作人员需要访问它的DAGS_FOLDER,并且您需要通过自己的方式同步文件系统。一个常见的设置是将您的DAGS_FOLDER存储在Git存储库中,并使用主厨、木偶、Ansible或您在环境中配置机器的任何东西在计算机之间同步。如果您的所有框都有一个公共的挂载点,那么管道文件在那里也应该可以使用。

http://pythonhosted.org/airflow/installation.html?highlight=chef

票数 2
EN

Stack Overflow用户

发布于 2015-11-24 21:16:06

对于你的第一个问题,这是可能的。

我想您应该在同一个目录下创建一个名为__init__.py的空文件,并使用SimplePython.py (在您的例子中是worker目录)。通过这样做,worker目录将被看作是一个python模块。

然后在DAG定义中,尝试from worker.SimplePython import print_context

在您的情况下,我想如果您为气流编写一个插件会更好,因为您可能希望升级气流核心项目而不删除您的自定义功能。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33510365

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档