我试图使用气流来执行一个简单的任务python。
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)例如,如果我尝试:
气流试验python_test打印2015-01-01
它起作用了!
现在,我想将我的def print_context(ds, **kwargs)函数放到其他python文件中。因此,我创建了一个名为: simple_test.py和change的文件:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)现在我试着再跑一次
气流试验python_test打印2015-01-01
好的!还能用!
但是,如果我创建一个模块,例如,带有文件SimplePython.py的worker模块,导入(from worker import SimplePython)它并尝试:
气流试验python_test打印2015-01-01
它传达了这样的信息:
ImportError:没有名为worker的模块
问题如下:
发布于 2017-11-22 16:28:48
虽然将dags打包到zip中是我看到的唯一受支持的解决方案,但您也可以导入dags文件夹中的模块。如果您使用木偶和git等其他工具自动同步dags文件夹,这是非常有用的。
从问题中我不清楚您的目录结构,因此下面是一个基于典型python项目结构的示例dags文件夹:
└── airflow/dags # root airflow dags folder where all dags live
└── my_dags # git repo project root
├── my_dags # python src root (usually named same as project)
│ ├── my_test_globals.py # file I want to import
│ ├── dag_in_package.py
│ └── dags
│ └── dag_in_subpackage.py
├── README.md # also setup.py, LICENSE, etc here
└── dag_in_project_root.py我遗漏了(必需的[1]) __init__.py文件。注意三个示例dags的位置。你几乎肯定只会把这些地方中的一个用在你所有的笨蛋身上。我把它们都包括在这里是为了举个例子,因为这对进口来说不重要。要从其中任何一个导入my_test_globals:
from my_dags.my_dags import my_test_globals我相信这意味着气流运行时,python被设置为dages目录,因此dags文件夹的每个子目录都可以被视为package。在我的例子中,它是一个额外的中间项目根目录,妨碍了一个典型的包内绝对导入。因此,我们可以像这样重组这个气流项目:
└── airflow/dags # root airflow dags folder where all dags live
└── my_dags # git repo project root & python src root
├── my_test_globals.py # file I want to import
├── dag_in_package.py
├── dags
│ └── dag_in_subpackage.py
├── README.md # also setup.py, LICENSE, etc here
└── dag_in_project_root.py因此,进口商品的外观与我们预期的一样:
from my_dags import my_test_globals发布于 2016-03-31 07:42:30
对于第二个问题: Airflow+Celery将如何在工作节点之间分发所有必需的python源文件?
从文档中看:工作人员需要访问它的DAGS_FOLDER,并且您需要通过自己的方式同步文件系统。一个常见的设置是将您的DAGS_FOLDER存储在Git存储库中,并使用主厨、木偶、Ansible或您在环境中配置机器的任何东西在计算机之间同步。如果您的所有框都有一个公共的挂载点,那么管道文件在那里也应该可以使用。
http://pythonhosted.org/airflow/installation.html?highlight=chef
发布于 2015-11-24 21:16:06
对于你的第一个问题,这是可能的。
我想您应该在同一个目录下创建一个名为__init__.py的空文件,并使用SimplePython.py (在您的例子中是worker目录)。通过这样做,worker目录将被看作是一个python模块。
然后在DAG定义中,尝试from worker.SimplePython import print_context。
在您的情况下,我想如果您为气流编写一个插件会更好,因为您可能希望升级气流核心项目而不删除您的自定义功能。
https://stackoverflow.com/questions/33510365
复制相似问题