我们正在创建一个数据流管道,它将获得一个JSON并写入一个parquet文件。我们使用org.apache.beam.sdk.io.parquet包来编写文件。ParquetIO.Sink允许您将PCollection of GenericRecord写入Parquet文件(从这里开始,https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html)。现在我们要知道如何将JsonObject (具有复杂结构)转换为GenericRecord。
我们尝试使用(org.apache.avro.generic.GenericRecordBuilder).生成GenericRecord。我们使用的是来自JsonObject的com.google.gson.JsonObject,但是我们被困住了如何转换为带有对象的JsonArray生成GenericRecord
我们的样本Json
{
"event_name": "added_to_cart",
"event_id": "AMKL9877",
"attributes": [
{"key": "total", "value": "8982", "type": "double"},
{"key": "order_id", "value": "AKM1011", "type": "string"}
]
}我们的模式
{
"type":"record",
"name":"event",
"fields":[
{
"name":"event_name",
"type":"string"
},
{
"name":"event_id",
"type":"string"
},
{
"name":"attributes",
"type":{
"type":"array",
"items":{
"type":"record",
"name":"attribute_data",
"fields":[
{
"name":"key",
"type":"string"
},
{
"name":"value",
"type":"string"
},
{
"name":"type",
"type":"string"
}
]
}
}
}
]
}我们的代码用于使用JsonObject将GenericRecord转换为GenericRecordBuilder
JsonObject event = element.getAsJsonObject();
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA);
for (Schema.Field field:SCHEMA.getFields()) {
System.out.println(field);
String at_header = field.getProp(FIELD_AT_HEADER_PROPERTY);
System.out.println(at_header);
if(at_header != null && at_header.equals(Boolean.TRUE.toString())){
recordBuilder.set(field.name(), null);
}else{
JsonElement keyElement = event.get(field.name());
recordBuilder.set(field.name(), getElementAsType(field.schema(), keyElement));
}
}
return recordBuilder.build();
Object getElementAsType(Schema schema, JsonElement element) {
if(element == null || element.isJsonNull())
return null;
switch(schema.getType()){
case BOOLEAN:
return element.getAsBoolean();
case DOUBLE:
return element.getAsDouble();
case FLOAT:
return element.getAsFloat();
case INT:
return element.getAsInt();
case LONG:
return element.getAsLong();
case NULL:
return null;
case ARRAY:
???
case MAP:
???
default:
return element.getAsString();
}我们需要知道如何为复杂类型构建GenericRecord,比如从JSON映射的对象数组。提前谢谢。
发布于 2019-01-28 07:55:12
在这里,我找到了我的答案,从这页https://avro.apache.org/docs/1.8.2/api/java/org/apache/avro/generic/package-summary.html
Avro数据的泛型表示形式。
这种表示形式最适合处理动态数据的应用程序,这些应用程序的模式直到运行时才知道。
Avro模式映射到Java类型,如下所示:
https://stackoverflow.com/questions/53966290
复制相似问题