首先,我为我的英语道歉。
我想使用jpa来分组,比如: select scrip,dustup,count(*) from data flow group by scrip,dstip。因此,编写以下代码:
public class DataflowSpec {
public static Specification<Dataflow> search(final String[] group, final String[] sort, final String[] desc) {
return new Specification<Dataflow>
我正在尝试修改示例以处理CSV文件。
我已经将readObject更改为解析CSV并将其添加到datum对象中。在运行管道时,我会得到以下错误:
(2b01c6a9d56ae128):com.google.cloud.dataflow.sdk.coders.CoderException:不能在com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnCon
在某些情况下,当从BigQuery加载或连接数据时,我们会在作业上得到"Write rejected“错误,即使相同的作业可以很好地处理相同大小的不同表。
我添加了详细的错误消息:
java.lang.RuntimeException: java.io.IOException: INTERNAL: Write rejected (writer id not found) when talking to tcp://localhost:12345 at
com.google.common.base.Throwables.propagate(Throwables.java:160) at
我是spring-cloud-data-flow的新手。我正在遵循Spring Cloud Data Flow (https://dataflow.spring.io/docs/installation/local/docker/)的文档。我已经下载了docker-compose.yml文件,并将其放在D:\Dev\spring-cloud-dataflow>目录中。当我尝试运行以下命令时: D:\Dev\spring-cloud-dataflow> set DATAFLOW_VERSION=2.1.0.RELEASE
D:\Dev\spring-cloud-da
我在他们的web上使用Pivotal提供的de docker-compose,当尝试使用Docker版本的时间戳启动任务时,日志显示以下消息:
dataflow-server | 2020-06-15 07:48:51.867 INFO 1 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalTaskLauncher : Preparing to run an application from Docker Resource [docker:springcloudtask/timestamp-task:2.1.1.RELEASE]. Th
光束作业的流水线给出了以下异常
java.lang.RuntimeException: java.lang.RuntimeException: Exception while fetching side input:
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunn
有时我在运行流数据流时出现以下异常:
exception: "java.lang.RuntimeException: Exception while fetching side input:
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher.fetchSideInput(StateFetcher.java:184)
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.fetchSideInput(Stre
java.lang.IllegalArgumentException: FakeKeyedWorkItemCoder only works with KeyedWorkItemCoder or KvCoder; was: class org.apache.beam.sdk.coders.LengthPrefixCoder
com.google.cloud.dataflow.worker.WindmillKeyedWorkItem$FakeKeyedWorkItemCoder.<init>(WindmillKeyedWorkItem.java:211)
com.goo
我正在解析来自流的XML,并将POJO分派到ProcessContext.output。它是跟随ClosedChannelException抛出的。知道是怎么回事吗?
com.google.cloud.dataflow.sdk.util.UserCodeException: java.nio.channels.ClosedChannelException
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:193)
at com.google.cloud.da
我正在尝试处理来自BigQuery的2.5TB数据。管道开始的代码:
Pipeline p = Pipeline.create(options);
p.apply(BigQueryIO.Read.fromQuery(
"select * from table_query(Events, 'table_id contains \"20150601\"') where id is not null"))
.apply(ParDo.of(new DoFn<TableRow, KV<St
我试图通过气流脚本执行数据流jar。为此,我正在使用DataFlowJavaOperator。在param中,当我试图运行以下作业时,我将传递本地system.But中显示的可执行jar文件的路径。
{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1
Traceback (most recent call last
我正在构建一个Dataflow管道示例,主要基于的代码
但是,当我运行我的代码时,我会遇到以下异常:
线程"main“中的异常:无法序列化com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$TestCombineDoFn@139982de at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51) at com.google.cloud.dataflow.sdk.util.
在我的数据流作业中,我使用protobuf为消息生成的java类。我一直试图让事情运行,但依赖项(jar地狱)出现了一些问题,导致反序列化失败,但有以下例外:
java.lang.NoClassDefFoundError: Could not initialize class com.brightcove.rna.model.Metadata$MetadataDefaultEntryHolder
at com.brightcove.rna.model.Metadata.(Metadata.java:50)
at com.brightcove.rna.model.Metadata
我试图在GCS Dataflow上上传/运行一个Apache项目,但我得到了
INFO:apache_beam.runners.dataflow.internal.apiclient:Defaulting to the temp_location as staging_location: gs://dataflow-staging-europe-west4-10a485e03dda20c80122afcef299fc02
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://datafl
我正在尝试通过Dataflow将突变(增量)应用于Bigtable,使用的是云-bigtable-client ()。
以下是我工作的高级总结:
PCollection<SomeData> somedata = ...;
somedata.apply(ParDo.of(new CreateMutations()))
.setCoder(new HBaseMutationCoder()).apply(CloudBigtableIO.writeToTable(config));
// I don't think it is necessary to ex
Beam2.1管道在有状态的ValueState中使用DoFn。它在单个工作人员中运行良好,但是当启用缩放时,如果“无法从状态读取值”和下面的根异常,则会失败。有什么原因吗?
Caused by: java.util.concurrent.ExecutionException: com.google.cloud.dataflow.worker.KeyTokenInvalidException: Unable to fetch data due to token mismatch for key ��
at com.google.cloud.dataflow.worker.repackaged.
当我运行我的数据流管道时,我得到下面的异常,抱怨我的DoFn不能被序列化。我该如何解决这个问题?
下面是堆栈跟踪:
Caused by: java.lang.IllegalArgumentException: unable to serialize contrail.dataflow.AvroMRTransforms$AvroReducerDoFn@bba0fc2
at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
at c
我正在使用GCP数据流运行Apache Beam管道,并从worker那里获得了以下错误:
Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException: Got poison pill or timeout but stream is not done
两分钟内就有了。
我正在使用管道将消息从PubSub写到BigQuery。在管道中,当将PubSub消息转换为TableRow时,我使用的是FailsafeElement<PubsubMessage, String>,并
我尝试在Google Cloud Dataflow中运行Apache光束管道(Python),这是由Google Cloud Coomposer中的DAG触发的。 我的dags文件夹在各自的GCS存储桶中的结构如下: /dags/
dataflow.py <- DAG
dataflow/
pipeline.py <- pipeline
setup.py
my_modules/
__init__.py
commons.py <- the module I want to import in the pipeline se
我试图通过一个(已经存在的) AppEngine应用程序启动数据流作业。DataFlow作业读取存储在DataStore中的GAE应用程序生成的数据,并将处理过的数据写入BigQuery。我收到了以下错误。
java.lang.SecurityException: Google App Engine does not support Runtime.addShutdownHook
at com.google.appengine.runtime.Request.process-a010d936cef53bc8(Request.java)
at java.lang.Runtime.addShutd
在运行一些从g3读取的作业时,我得到了以下异常,然后按键对数据进行分组。异常发生在读取过程中。
com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native方法( com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.outputChunk(ShuffleSink.java:293) at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSin
我有一个数据流,它向API发出请求以检索一些数据。最近API中的密码有了更新,数据流突然开始失败。我使用的是java 1.8和beam SDK 2.19.0。同样的代码在本地运行时也可以正常运行。我尝试升级到java 11和same SDK 2.24.0,以防我使用的版本不支持新的密码,但我得到了相同的结果,它在本地运行,但我在数据流中得到了相同的错误。这是我用来向API发出请求的代码:
URL url = new URL(urlString);
HttpsURLConnection con = null;
outer: for (int ret