我现在开始工作,我需要一些帮助,我有一个自定义模型,我使用apachebeam创建了一个管道,该管道从存储桶内的文件夹中获取来自csv文件的数据,然后将数据抛到bigquery表中,这个表已经按照我的要求工作,但是由于它是一个批处理管道,它只在我运行数据流时运行,所以我想自动化这个函数,条件是当加载一个新文件时,这个作业自行运行,我怎么做呢?模板数据流import os
fro
我使用的是Apache Bea,Python和DataFlow,还有BigQuery。我需要为pcollection的每个元素分配一个序列号,以便将其加载到BigQuery中,但我找不到任何方法来做到这一点。我认为我需要DataFlow来进行前面的聚合和连接,以获得添加序列号的最终pcollection,但此时我需要停止并行处理,并将pcollection转换为一个列表(就像使用.collect()时在Spark这是我编写的管道:
p
我使用Pub/Sub到BigQuery 来流发送到Pub/Sub主题的JSON数据。通过Dataflow,我希望平平数据以匹配BigQuery模式并对它们进行流。:272)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValu