我们正在创建一个数据流管道,我们将从postgres读取数据并将其写入一个拼花文件。我们使用org.apache.beam.sdk.io.jdbc读取文件,使用org.apache.beam.sdk.io.parquet包写入文件。ParquetIO.Sink允许您将PCollection of GenericRecord写入Parquet文件(从这里开始,)。
到目前为止,这是我的代码:
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
Schema
我编写了一个代码来使用Apache来执行一个LeftOuterJoin -- apache提供的类可以轻松地工作--提供一个连接类LeftOuterJoin,而当我使用POJO类或字符串、Integer,以KV格式长,但是当我在KV中使用TableRow并抛出一个异常时,整个代码都能正常工作。我也分享了下面的代码,以供参考.
Apr 12, 2018 6:26:03 PM org.apache.beam.sdk.Pipeline validate
WARNING: The following transforms do not have stable unique names: ParDo(A
我们有一个步骤,在Dataflow的newline上拆分Pubsub消息。我们有一个通过代码的测试,但它似乎在生产中失败了。看起来我们一次在多个地方得到相同的Pubsub消息(至少据我所知)。
我们应该用另一种方式写第一次考试吗?或者,这仅仅是一个关于在Apache中不做什么的艰难教训吗?
import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.ut
我已经创建了一个用于在数据流上运行的beam管道p,并且希望在运行管道之前将一些东西写入文件中。我的代码是:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time
pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_opt
我试着熟悉Apache beam Kafka IO,并得到以下错误 Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineRe
使用“数据流作业”从bigQuery表读取数据时,要尽量避免集合中的重复。对于这一点,使用beam.sdk.transforms.Distinct来读取带有distinct的记录。但却在错误之下
java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:193)
at org.apache.beam.sdk.transforms.Gro
我正在使用Apache光束,并将数据写入BigQuery。我的流水线在intellij中使用Direct runner在本地工作得很好,并且我能够在intellij中本地写入BigQuery表。然而,一旦我在Spark Cluster上部署了代码,我就得到了异常"java.lang.IllegalArgumentException: Invalid lambda反序列化“。 User class threw exception: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Exe
我正在尝试使用apache的WriteToKafka类(python )编写一个流到Kafka主题。但是,它无休止地运行脚本(没有错误),并且不向主题写入流。我必须取消运行,它不会停止,不会产生错误。任何帮助都是非常感谢的。下面可以找到一个很小的例子来重现问题
from typing import Tuple
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import WriteToK
我试图使用DynamicDestinations写入BigQuery中的分区表,其中分区名为mytable$yyyyMMdd。如果我绕过了动态目的地,并在.to()中提供了一个硬编码的表名,那么它就能工作;但是,对于动态目的地,我得到了以下例外:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1@6fff253c
at org.apache.beam.sdk.util.SerializableUtils.serializeToByt
我正在尝试编写一个光束流管道,它简单地从PubSub队列中读取数据,解析数据并写入两个BigQuery表中的任何一个。因此,代码利用副输出从DoFn中写入两个表中的一个。我遇到以下错误消息: java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=com.pipeline.PubSubToBigQuery$ParsePubSubMessage@50eca7c6,mainOutputTag=Tag}。我将完整的错误消息、DoFn和测试类附加到下面: DoFn: public static cl
我正在尝试设置一个数据流作业,用于将json文件转换为csv,并使用下面的python脚本将其写入桶中。(我在3.8.13中尝试了这一点),因为我使用的是apache。我尝试过改变许多版本的python和google云存储。在不使用存储库的情况下,有什么替代方法吗?
import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage
from smart_open impor
当我试图发送或接收一个文件,我一直得到这个错误写在下面。我试图找出调试的原因,每次都停在这一行:
ObjectInputStream Inputs = new ObjectInputStream (socket.getInputStream ());
以下是在输入键盘文件路径后出现的运行错误:
java.io.EOFException
at java.io.ObjectInputStream$PeekInputStream.readFully (ObjectInputStream.java:2353)
at java.io.ObjectInputStream$BlockDataInpu
我遇到了一个“无法为org.apache.hadoop.hbase.client.Mutation提供编码器”的问题。在FlinkRunner中使用HbaseIO。例外情况如下:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for ParDo(HBaseProfile)/ParMultiDo(HBaseProfile).output [PCollection]. Correct one of the following root caus
在运行数据流作业时,我得到了以下错误。我正在尝试将我现有的beam版本更新为2.11.0,但是在运行时,我会出现以下错误。
org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives类没有实现请求的接口com.google.protobuf.ProtocolMessageEnum在org.apache.beam.runners.core.construction.BeamUrns.getUrn(BeamUrns.java:27) at org.apache.beam.runners.core.const
如我们所知,在java中使用Serialization,引用博客的话。
将对象的状态转换为字节流,可以持久化到磁盘/文件中,也可以通过网络发送到运行Java虚拟机的任何其他机器。
REST案例:
现在考虑第二种情况,将一个网络发送到另一个正在运行的jvm,如果我们考虑一个Rest 的例子,即“:port/path/”。
通常,我使用Spring的@RequestMapping将资源模型pojo类作为ResponseEntity返回。我Serializable 没有在模型类中实现接口,而且一切都很好,我在json中得到了API的响应。
ModelX.java
public c
目前,我在Python中有一个apache-beam管道,在该管道中,我正在读取拼花,将其转换为dataframe来进行一些熊猫的清理,然后将其转换回我想要编写文件的parquet。看起来是这样的:
with beam.Pipeline(options=pipeline_options) as p:
dataframes = p \
| 'Read' >> beam.io.ReadFromParquetBatched(known_args.input) \
| 'Convert to pandas' >&g
我很难将apache管道编译成数据流模板。我使用自定义aws凭据提供程序类从GCP秘密管理器加载基本aws凭据,并在构建模板时看到以下异常:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'awsCredentialsProvider' with value 'org.example.iot.b
这是我编写的Apache:
public class NormalizeTransform
extends PTransform<PCollection<String>, PCollection<SimpleTable>> {
@Override
public PCollection<SimpleTable> expand(PCollection<String> lines) {
ExtractFields extract_i = new ExtractFields();
PCollection<SimpleTab
我在我的苹果电脑上使用DirectRunner测试我的数据流流水线,得到了很多这样的“警告”消息,我可能知道如何摆脱它们,因为它太多了,以至于我甚至看不到我的调试消息。
谢谢
Apr 05, 2018 2:14:48 PM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder ha
我正在尝试从一桶谷歌云平台编写一个csv到数据存储,其中包含法语字符/口音,但我有一条关于解码的错误消息。
在尝试了从“拉丁语-1”到"utf-8“的编码和解码,但没有成功(使用unicode、unicodedata和编解码器)之后,我尝试手动更改.
我使用的Os默认使用"ascii“编码,我在”Anaconda3 3/envs/py27/lib/site.py“中手动将其更改为utf-8。
def setencoding():
"""Set the string encoding used by the Unicode implementati
我是Apache beam的新手,我使用Apache beam,在GCP.I中作为运行程序使用Dataflow,在执行管道时会出现以下错误。
coder of type class org.apache.beam.sdk.coders.ListCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element [Person [businessDay=01042020, departmentId=101, endTime=2020-0