由于 kafka 中的 json 属于嵌套,又不想二次序列化再把它展开,故自定义 format。
1.自定义 Factory 实现 DeserializationFormatFactory 2.自定义 DeserializationSchema 实现 DeserializationSchema 3. 自定义 Factory 中 createDecodingFormat 方法返回 createDecodingFormat
为了简单起见,我们自定义一个 NullFormat ,也就是无论 kafka 中的消息是什么都返回 null,相当于 kafka 中没有消息
自定义 Factory
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.apache.flink.formats.json.JsonOptions.*;
/**
* Table format factory for providing configured instances of JSON to RowData
* {@link SerializationSchema} and {@link DeserializationSchema}.
*/
public class NullFormatFactory implements
DeserializationFormatFactory {
// Factory 的唯一标识
public static final String IDENTIFIER = "null";
@SuppressWarnings("unchecked")
@Override
// 解码的入口方法 基本上属于固定写法
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateFormatOptions(formatOptions);
final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context,//ScanRuntimeProviderContext
DataType producedDataType) { // 表的字段名和数据类型
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
return new NullRowDataDeserializationSchema(
rowType,
rowDataTypeInfo,
failOnMissingField,
ignoreParseErrors,
timestampOption
);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FAIL_ON_MISSING_FIELD);
options.add(IGNORE_PARSE_ERRORS);
options.add(TIMESTAMP_FORMAT);
return options;
}
// ------------------------------------------------------------------------
// Validation
// ------------------------------------------------------------------------
static void validateFormatOptions(ReadableConfig tableOptions) {
boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
if (ignoreParseErrors && failOnMissingField) {
throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
+ " and "
+ IGNORE_PARSE_ERRORS.key()
+ " shouldn't both be true.");
}
if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {
throw new ValidationException(String.format("Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
timestampFormat, TIMESTAMP_FORMAT.key()));
}
}
}
自定义 DeserializationSchema
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class NullRowDataDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
/**
* Flag indicating whether to fail if a field is missing.
*/
private final boolean failOnMissingField;
/**
* Flag indicating whether to ignore invalid fields/rows (default: throw an exception).
*/
private final boolean ignoreParseErrors;
/**
* TypeInformation of the produced {@link RowData}.
**/
private final TypeInformation<RowData> resultTypeInfo;
/**
* Runtime converter that converts {@link JsonNode}s into
* objects of Flink SQL internal data structures.
**/
/**
* Object mapper for parsing the JSON.
*/
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Timestamp format specification which is used to parse timestamp.
*/
private final TimestampFormat timestampFormat;
public NullRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
boolean failOnMissingField,
boolean ignoreParseErrors,
TimestampFormat timestampFormat) {
if (ignoreParseErrors && failOnMissingField) {
throw new IllegalArgumentException(
"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
}
this.resultTypeInfo = checkNotNull(resultTypeInfo);
this.failOnMissingField = failOnMissingField;
this.ignoreParseErrors = ignoreParseErrors;
this.timestampFormat = timestampFormat;
}
@Override
// 这里其实是真正的反序列化逻辑,比如说将 json 拍平 (多层嵌套转化为一层嵌套 )
// 这里是重点,记得关注重点
public RowData deserialize(byte[] message) throws IOException {
return null;
}
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
}
@Override
public TypeInformation<RowData> getProducedType() {
return resultTypeInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NullRowDataDeserializationSchema that = (NullRowDataDeserializationSchema) o;
return failOnMissingField == that.failOnMissingField &&
ignoreParseErrors == that.ignoreParseErrors &&
resultTypeInfo.equals(that.resultTypeInfo) &&
timestampFormat.equals(that.timestampFormat);
}
@Override
public int hashCode() {
return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
}
}
public class SqlKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
// enable checkpointing
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.set(
ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
configuration.set(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +
") WITH (" +
"'connector' = 'kafka','topic' = 'canal_monitor_order'," +
"'properties.bootstrap.servers' = 'bigdata-dev-mq:9092','properties.group.id' = 'testGroup'," +
"'format' = 'null','scan.startup.mode' = 'earliest-offset')";
tableEnv.executeSql(sql);
......
‘format’ = ‘null’ Factory 的唯一标识
然后就可以直接执行了