首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在气流上使用对接操作符安装目录不起作用

在气流上使用对接操作符安装目录不起作用
EN

Stack Overflow用户
提问于 2020-11-21 20:24:34
回答 1查看 3K关注 0票数 0

我试图使用码头操作员自动执行一些脚本使用气流。

气流版本:apache-airflow==1.10.12

我想要做的是“复制”我的项目的所有文件(文件夹和文件)到容器使用这段代码。

以下文件ml-intermediate.py位于此目录~/airflow/dags/ml-intermediate.py

代码语言:javascript
复制
"""
Template to convert a Ploomber DAG to Airflow
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from ploomber.spec import DAGSpec
from soopervisor.script.ScriptConfig import ScriptConfig

script_cfg = ScriptConfig.from_path('/home/letyndr/airflow/dags/ml-intermediate')
# Replace the project root to reflect the new location - or maybe just
# write a soopervisor.yaml, then we can we rid of this line
script_cfg.paths.project = '/home/letyndr/airflow/dags/ml-intermediate'

# TODO: use lazy_import from script_cfg
dag_ploomber = DAGSpec('/home/letyndr/airflow/dags/ml-intermediate/pipeline.yaml',
                       lazy_import=True).to_dag()
dag_ploomber.name = "ML Intermediate"

default_args = {
    'start_date': days_ago(0),
}

dag_airflow = DAG(
    dag_ploomber.name.replace(' ', '-'),
    default_args=default_args,
    description='Ploomber dag',
    schedule_interval=None,
)

script_cfg.save_script()

from airflow.operators.docker_operator import DockerOperator
for task_name in dag_ploomber:
    DockerOperator(task_id=task_name,
        image="continuumio/miniconda3",
        api_version="auto",
        auto_remove=True,
        # command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
        command="sleep 600",
        docker_url="unix://var/run/docker.sock",
        volumes=[
            "/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
            "/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
        ],
        working_dir=script_cfg.paths.project,
        dag=dag_airflow,
        container_name=task_name,
    )



for task_name in dag_ploomber:
    task_ploomber = dag_ploomber[task_name]
    task_airflow = dag_airflow.get_task(task_name)

    for upstream in task_ploomber.upstream:
        task_airflow.set_upstream(dag_airflow.get_task(upstream))

dag = dag_airflow

当我使用气流执行这个DAG时,我得到了对接者找不到/home/letyndr/airflow/dags/ml-intermediate/script.sh脚本的错误。我更改了docker操作符sleep 600的执行命令,以进入容器并检查容器中的文件以及校正路径。

例如,当我在容器中时,我可以转到这个路径/home/letyndr/airflow/dags/ml-intermediate/,但是我看不到应该在那里的文件。

我尝试复制气流实现用于Python的Docker的方式,检查包码头操作人员气流的这一部分,特别是这个部分,它创建了码头容器:码头集装箱制造

这是我复制的一个docker实现:

代码语言:javascript
复制
import docker

client = docker.APIClient()

# binds = {
#         "/home/letyndr/airflow/dags": {
#             "bind": "/home/letyndr/airflow/dags",
#             "mode": "rw"
#         },
#         "/home/letyndr/airflow-data/ml-intermediate": {
#             "bind": "/home/letyndr/airflow-data/ml-intermediate",
#             "mode": "rw"
#         }
#     }

binds = ["/home/letyndr/airflow/dags:/home/letyndr/airflow/dags:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"]

container = client.create_container(
    image="continuumio/miniconda3",
    command="sleep 600",
    volumes=["/home/letyndr/airflow/dags", "/home/letyndr/airflow-data/ml-intermediate"],
    host_config=client.create_host_config(binds=binds),
    working_dir="/home/letyndr/airflow/dags",
    name="simple_example",
)

client.start(container=container.get("Id"))

我发现,只有在设置了host_configvolumes时,安装卷才能工作,问题是在气流上的实现只是设置了host_config,而不是volumes。我在方法create_container上添加了参数,它起作用了。

你知道我是否正确地使用了码头操作员,还是这是一个问题?

EN

回答 1

Stack Overflow用户

发布于 2021-07-20 15:52:37

尝试使用mounts参数而不是volumes。这就是在气流文件 / 源代码中定义卷的方式。

所以应该是这样的:

代码语言:javascript
复制
DockerOperator(task_id=task_name,
        image="continuumio/miniconda3",
        api_version="auto",
        auto_remove=True,
        # command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
        command="sleep 600",
        docker_url="unix://var/run/docker.sock",
        mounts=[
            "/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
            "/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
        ],
        working_dir=script_cfg.paths.project,
        dag=dag_airflow,
        container_name=task_name,
    )

以下是其他一些可能有用的可选参数:

  1. host_tmp_dir:指定主机上临时目录的位置,该位置将映射到tmp_dir。如果没有提供,默认使用标准系统临时目录。
  2. tmp_dir:将容器内的点安装到由操作符tmp_dir在主机上创建的临时目录:

编辑:,经过额外的检查后,我发现每个mount项必须是来自docker.typesMount类型。参数volumes也被重命名为mount,作为气流2.1变化量的一部分。下面是来自气流源代码的一个示例。

修改后,代码应该看起来类似于

代码语言:javascript
复制
from docker.types import Mount 

...
...

DockerOperator(task_id=task_name,
    image="continuumio/miniconda3",
    api_version="auto",
    auto_remove=True,
    # command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
    command="sleep 600",
    docker_url="unix://var/run/docker.sock",
    mounts=[
        Mount(
            source='/home/letyndr/airflow/dags/ml-intermediate', 
            target='/home/letyndr/airflow/dags/ml-intermediate:rw', 
            type='bind'
        )
    ],
    working_dir=script_cfg.paths.project,
    dag=dag_airflow,
    container_name=task_name,
)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64947706

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档