
Flink SQL Custom Format

Author: shengjk1 · Published 2020-10-27

1. Background

The JSON messages in Kafka are nested, and we don't want to deserialize them a second time just to flatten them, so we implement a custom format.
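
For example, a Canal-style change record (hypothetical, but using the same business fields as the table schema defined later) buries the interesting fields inside a nested "data" array:

{
  "data": [
    {"id": "1", "status": "2", "city_id": "33", "courier_id": "448955"}
  ],
  "database": "order_db",
  "table": "order_info",
  "type": "UPDATE"
}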

2. Steps

1. Define a custom Factory that implements DeserializationFormatFactory.
2. Define a custom DeserializationSchema that implements DeserializationSchema.
3. Return a DecodingFormat from the custom Factory's createDecodingFormat method; its createRuntimeDecoder creates the custom DeserializationSchema.
4. Register the Factory via Java SPI so Flink can discover it (see the resource-file example after the factory code below).

3. Custom Format

For simplicity, we define a NullFormat: whatever the message in Kafka is, it returns null, which behaves as if there were no messages in Kafka at all.

Custom Factory

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.formats.json.JsonOptions.*;

/**
 * Table format factory for creating configured instances of the
 * {@link DeserializationSchema} used by the null format.
 */
public class NullFormatFactory implements
		DeserializationFormatFactory {
	// Unique identifier of the factory
	public static final String IDENTIFIER = "null";
	
	@SuppressWarnings("unchecked")
	@Override
	// Entry point for decoding; this part is largely boilerplate
	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
			DynamicTableFactory.Context context,
			ReadableConfig formatOptions) {
		FactoryUtil.validateFactoryOptions(this, formatOptions);
		validateFormatOptions(formatOptions);
		
		final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
		final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
		TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
		
		return new DecodingFormat<DeserializationSchema<RowData>>() {
			@Override
			public DeserializationSchema<RowData> createRuntimeDecoder(
					DynamicTableSource.Context context,//ScanRuntimeProviderContext
					DataType producedDataType) { // field names and data types of the table
				final RowType rowType = (RowType) producedDataType.getLogicalType();
				final TypeInformation<RowData> rowDataTypeInfo =
						(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
				return new NullRowDataDeserializationSchema(
						rowType,
						rowDataTypeInfo,
						failOnMissingField,
						ignoreParseErrors,
						timestampOption
				);
			}
			
			@Override
			public ChangelogMode getChangelogMode() {
				return ChangelogMode.insertOnly();
			}
		};
	}
	
	
	@Override
	public String factoryIdentifier() {
		return IDENTIFIER;
	}
	
	@Override
	public Set<ConfigOption<?>> requiredOptions() {
		return Collections.emptySet();
	}
	
	@Override
	public Set<ConfigOption<?>> optionalOptions() {
		Set<ConfigOption<?>> options = new HashSet<>();
		options.add(FAIL_ON_MISSING_FIELD);
		options.add(IGNORE_PARSE_ERRORS);
		options.add(TIMESTAMP_FORMAT);
		return options;
	}
	
	// ------------------------------------------------------------------------
	//  Validation
	// ------------------------------------------------------------------------
	
	static void validateFormatOptions(ReadableConfig tableOptions) {
		boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
		boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
		String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
		if (ignoreParseErrors && failOnMissingField) {
			throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
					+ " and "
					+ IGNORE_PARSE_ERRORS.key()
					+ " shouldn't both be true.");
		}
		if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {
			throw new ValidationException(String.format("Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
					timestampFormat, TIMESTAMP_FORMAT.key()));
		}
	}
}
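
Before the factory can be used, Flink has to be able to discover it: table factories are loaded via Java SPI. Add the factory's fully qualified class name to the resource file below (the package name is a placeholder; use your own):

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.example.format.NullFormatFactory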

Custom DeserializationSchema

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.io.IOException;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

@Internal
public class NullRowDataDeserializationSchema implements DeserializationSchema<RowData> {
	private static final long serialVersionUID = 1L;
	
	/**
	 * Flag indicating whether to fail if a field is missing.
	 */
	private final boolean failOnMissingField;
	
	/**
	 * Flag indicating whether to ignore invalid fields/rows (default: throw an exception).
	 */
	private final boolean ignoreParseErrors;
	
	/**
	 * TypeInformation of the produced {@link RowData}.
	 **/
	private final TypeInformation<RowData> resultTypeInfo;
	
	/**
	 * Object mapper for parsing the JSON.
	 */
	private final ObjectMapper objectMapper = new ObjectMapper();
	
	/**
	 * Timestamp format specification which is used to parse timestamp.
	 */
	private final TimestampFormat timestampFormat;
	
	public NullRowDataDeserializationSchema(
			RowType rowType,
			TypeInformation<RowData> resultTypeInfo,
			boolean failOnMissingField,
			boolean ignoreParseErrors,
			TimestampFormat timestampFormat) {
		if (ignoreParseErrors && failOnMissingField) {
			throw new IllegalArgumentException(
					"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
		}
		this.resultTypeInfo = checkNotNull(resultTypeInfo);
		this.failOnMissingField = failOnMissingField;
		this.ignoreParseErrors = ignoreParseErrors;
		this.timestampFormat = timestampFormat;
	}
	
	@Override
	// This is where the real deserialization logic would live, e.g. flattening the JSON
	// (turning multi-level nesting into a single level). This is the key method.
	public RowData deserialize(byte[] message) throws IOException {
		return null;
	}
	
	@Override
	public boolean isEndOfStream(RowData nextElement) {
		return false;
	}
	
	@Override
	public TypeInformation<RowData> getProducedType() {
		return resultTypeInfo;
	}
	
	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		NullRowDataDeserializationSchema that = (NullRowDataDeserializationSchema) o;
		return failOnMissingField == that.failOnMissingField &&
				ignoreParseErrors == that.ignoreParseErrors &&
				resultTypeInfo.equals(that.resultTypeInfo) &&
				timestampFormat.equals(that.timestampFormat);
	}
	
	@Override
	public int hashCode() {
		return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
	}
}
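
In a real format, deserialize is where the flattening would happen. Below is a minimal sketch, assuming the hypothetical Canal-style layout shown in the background section, where the business fields sit inside a nested "data" array; it additionally needs imports of org.apache.flink.table.data.GenericRowData and org.apache.flink.table.data.StringData, and the row arity and field order must match the table schema (producedDataType):

@Override
public RowData deserialize(byte[] message) throws IOException {
	try {
		// Parse the raw bytes into a JSON tree.
		JsonNode root = objectMapper.readTree(message);
		// "data" and "table" are hypothetical field names; adapt them to your JSON layout.
		JsonNode data = root.get("data").get(0);
		// Flatten the nested fields into a single-level row.
		GenericRowData row = new GenericRowData(2);
		row.setField(0, data.get("id").asLong());
		row.setField(1, StringData.fromString(root.get("table").asText()));
		return row;
	} catch (Throwable t) {
		if (ignoreParseErrors) {
			// A null record is skipped downstream instead of failing the job.
			return null;
		}
		throw new IOException("Failed to deserialize JSON '" + new String(message) + "'.", t);
	}
}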

4. Using the Custom Format

import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlKafka {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
		// enable checkpointing
		Configuration configuration = tableEnv.getConfig().getConfiguration();
		configuration.set(
				ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
		configuration.set(
				ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
		
		String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +
				") WITH (" +
				"'connector' = 'kafka','topic' = 'canal_monitor_order'," +
				"'properties.bootstrap.servers' = 'bigdata-dev-mq:9092','properties.group.id' = 'testGroup'," +
				"'format' = 'null','scan.startup.mode' = 'earliest-offset')";
		tableEnv.executeSql(sql);
		......

'format' = 'null' selects our factory by its unique identifier.

Then the job can be executed directly.
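
Since every message deserializes to null, the source behaves like an empty topic. A hypothetical continuation of the elided main method above would therefore print an empty result:

// Hypothetical continuation: with the null format, this query produces no rows.
tableEnv.executeSql("SELECT * FROM sourcedata").print();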
