问题陈述是我们正在自定义PubSubToBQ提供的PubSubToBQ数据流java模板,在该模板中我们将配置多个订阅/主题来读取数据并将数据推送到多个Bigquery表中,这需要作为单个数据流管道执行,以从源读取所有流并推入Bigquery表。但是,当我们从eclipse执行模板时,我们必须在gcs桶上传递订阅/主题和烧烤细节,然后使用具有不同订阅和烧烤细节的gcloud命令运行模板。数据流作业不会被新的订阅表或BQ表覆盖。
目标:我的目标是使用Google提供的PubSubTOBQ.java类模板,并通过相应的Bigquery传递订阅列表,并创建每个表传递订阅的管道。因此,n-n,n个管道在一个单一作业.
我使用谷歌提供的PubSubTOBQ.java类模板,这是作为一个单一的订阅或单一主题和相应的大查询表细节输入。
现在,我需要定制它,以便将输入作为主题列表,或者将订阅列表作为逗号分隔。我可以使用ValueProvider>并在main或run方法中迭代字符串数组,并将订阅/主题和bq表作为字符串传递。有关更多信息,请参阅下面的代码。
我在gcp文档上看到的是,如果我们想要在瘤时重写或使用值来创建动态样条,我们就不能将ValueProvider变量传递到DoFn之外。不确定我们是否可以在DoFn中读取消息。
**PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i])**
如果是的话请告诉我。这样我的目标就实现了。
代码:
public static void main(String[] args) {
StreamingDataflowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(StreamingDataflowOptions.class);
List<String> listOfSubStr = new ArrayList<String>();
List<String> listOfTopicStr = new ArrayList<String>();
List<String> listOfTableStr = new ArrayList<String>();
String[] providedSubscriptionArray = null;
String[] providedTopicArray = null;
String[] providedTableArray = null;
if (options.getInputSubscription().isAccessible()) {
listOfSubStr = options.getInputSubscription().get();
providedSubscriptionArray = new String[listOfSubStr.size()];
providedSubscriptionArray = createListOfProvidedStringArray(listOfSubStr);
}
if (options.getInputTopic().isAccessible()) {
listOfTopicStr = options.getInputTopic().get();
providedTopicArray = new String[listOfSubStr.size()];
providedTopicArray = createListOfProvidedStringArray(listOfTopicStr);
}
if (options.getOutputTableSpec().isAccessible()) {
listOfTableStr = options.getOutputTableSpec().get();
providedTableArray = new String[listOfSubStr.size()];
providedTableArray = createListOfProvidedStringArray(listOfTableStr);
}
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> readPubSubMessage = null;
for (int i = 0; i < providedSubscriptionArray.length; i++) {
if (options.getUseSubscription()) {
readPubSubMessage = pipeline
.apply(PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i]));
} else {
readPubSubMessage = pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(providedTopicArray[i]));
}
readPubSubMessage
/*
* Step #2: Transform the PubsubMessages into TableRows
*/
.apply("Convert Message To TableRow", ParDo.of(new PubsubMessageToTableRow()))
.apply("Insert Data To BigQuery",
BigQueryIO.writeTableRows().to(providedTableArray[i])
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
}
pipeline.run().waitUntilFinish();
}
应该能够使用单个数据流PubSubTOBQ模板来处理与单个数据流流作业中的bigquery模板数量相对应的多条订阅管道。
发布于 2019-07-16 19:53:24
问题是,到目前为止,Dataflow模板需要在分阶段/创建时知道管道图,这样它在运行时就不会有所不同。如果您仍然希望使用非模板化管道并将逗号分隔的Pub/Sub主题列表作为--topicList
选项参数传递,则可以执行以下操作:
String[] listOfTopicStr = options.getTopicList().split(",");
PCollection[] p = new PCollection[listOfTopicStr.length];
for (int i = 0; i < listOfTopicStr.length; i++) {
p[i] = pipeline
.apply(PubsubIO.readStrings().fromTopic(listOfTopicStr[i]))
.apply(ParDo.of(new DoFn<String, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Log.info(String.format("Message=%s", c.element()));
}
}));
}
全代码这里。
如果我们用3个主题来测试它,比如:
mvn -Pdataflow-runner compile -e exec:java \
-Dexec.mainClass=com.dataflow.samples.MultipleTopics \
-Dexec.args="--project=$PROJECT \
--topicList=projects/$PROJECT/topics/topic1,projects/$PROJECT/topics/topic2,projects/$PROJECT/topics/topic3 \
--stagingLocation=gs://$BUCKET/staging/ \
--runner=DataflowRunner"
gcloud pubsub topics publish topic1 --message="message 1"
gcloud pubsub topics publish topic2 --message="message 2"
gcloud pubsub topics publish topic3 --message="message 3"
输出和数据流图将如预期的那样:
将这种方法引入模板的一个可能的解决办法是在最坏的情况下拥有足够多的主题( N
)。当我们使用n
主题(满足n <= N
)执行模板时,我们需要指定未使用/虚拟主题来填充。
https://stackoverflow.com/questions/57053573
复制相似问题