我正在为“循环”创建一个具有多个‘的动态DAG。它正确地启动了流量,但是在下游没有很好的连接。任务'dummy_ender__a‘正如期而至地连接到'toto_a’。但我希望'dummy_ender_1_a‘& 'dummy_ender_2_a’也能连接到下游任务toto_a。我不知道我在这里错过了什么。以下是代码:
import datetime
from airflow import models
from airflow.utils import dates
from airflow.operators.dummy_operator import
我试图从这个类中编写一个子类:
class InsertCreateAlter(object):
def __init__(self, url=None, engine=None, connection=None, session=None,
**kwargs):
if connection is not None:
self.engine = connection
elif engine is not None:
self.engine = engine
elif session is not None:
我试图在PythonOperator、_etl_lasic之间将数据传递给另一个运行良好的PythonOperator _download_s3_data,但是当传递的值是None时,我想抛出一个异常,它应该将任务标记为失败。
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowFailException
def _etl_lasic(**context):
path_s3 =
问题:当我使用连接器调用雪花中的存储过程时,会引发一个异常,其内容是:“名称‘EmptyArrowIterator’未定义”。
我的目标:使用雪花的Python连接器调用存储过程并将返回值保存到变量。
我尝试过的:
我的代码:
import pandas as pd
#my code is long...a dataframe called credentials is defined elsewhere in my code and is not shown below
def logging_sproc(credentials):
try:
#import sn
我试图修改ETL,但我发现老开发人员直接在连接上执行他的命令( ETL已经运行了几年)。当我尝试自己做这件事时,我得到了一个错误(因为我的编译器希望我从游标中做这件事)。 from etl.utils.logging import info
from etl.mysql.connect import db, db_name
from etl.mysql.operations import add_column_if_not_exists
from etl.utils.array import chunks
from pprint import pprint
def add_column_
我在postgres中创建了一个函数并得到了奇怪的错误。我做错了什么?我也想看看你的变体是怎么做的。
CREATE OR REPLACE FUNCTION export_csv(request TEXT, filename VARCHAR(255))
RETURNS VOID AS
$$
BEGIN
EXECUTE 'COPY (' || request || ') TO "/home/r90t/work/study/etl/postgres_etl/export/' || filename || '" WITH CSV;'
同事们我们需要帮助。有两个dags --父级和子级,父级有自己的调度,假设'30 *** ',子'1 -17***-5‘,子节点等待父级执行,例如40分钟,如果父级以错误结束,那么子类也会崩溃,否则将执行子类的下一个任务。问题是,即使在最简单的情况下,这也不起作用,我不知道如何同步它们。我写了这样的代码:
Dag亲本
import time
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_op
My :我正在使用脚本从几个前提数据源中提取数据,并将它们加载到Azure (原始数据)中。之后,我将执行一些ETL操作,并在PowerBI中使用该结果。
我想要达到的目标:目前我能够通过一个预定的触发器在Azure中执行操作。但是,我不想“浪费”时间,我希望在Python脚本完成后触发ETL。
示例::脚本在早上8点结束。为了构建一些空间,预定的Data运行时间是早上8:30。这是我正在寻找一些优化的部分,以便在Python脚本完成后,Data运行可以自动启动。
(FYI:之后,当ETL操作完成时,PowerBI报告会自动刷新)
使用这些工具的: Microsoft Azure Data F
我正在尝试使用读取,该use将用于识别日文汉字字符的神经网络。我最初是在我的系统上使用它,我能够正确地加载这个包,但我没有足够的资源在本地工作。
我转而使用Google Colab,当我尝试运行加载包时:
from etldr.etl_data_reader import ETLDataReader #specifically this line here
from etldr.etl_character_groups import ETLCharacterGroups
from etldr.etl_data_names import ETLDataNames
我得到了这个错误:
File &