Producer configuration:
// Sink: writes Avro-encoded records to the output topic; the value schema is
// registered with the Confluent Schema Registry on first use.
FlinkKafkaProducer09<DoubtEventPreformatDataAvro> convertOutTopicProducer = new FlinkKafkaProducer09<>(
        outputTopic,
        ConfluentRegistryAvroSerializationSchema.<DoubtEventPreformatDataAvro>ofValue(outputTopic, jobConfig.getKafkaMasterConfig()),
        jobConfig.getKafkaMasterConfig(),
        (FlinkKafkaPartitioner) null);
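The post never shows what jobConfig.getKafkaMasterConfig() returns. For the KafkaAvroSerializer used below it must at least carry the Schema Registry endpoint; a minimal sketch, with placeholder addresses that are not from the original:

import java.util.Properties;

// Hypothetical contents of the "Kafka master config". Only schema.registry.url is
// strictly required by KafkaAvroSerializer; the rest is standard client setup.
Properties kafkaMasterConfig = new Properties();
kafkaMasterConfig.setProperty("bootstrap.servers", "broker1:9092");
kafkaMasterConfig.setProperty("schema.registry.url", "http://schema-registry:8081");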
ConfluentRegistryAvroSerializationSchema implements the custom serialization logic:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConfluentRegistryAvroSerializationSchema<V extends SpecificRecord> implements SerializationSchema<V> {

    // KafkaAvroSerializer is not java.io.Serializable, so it is kept transient and
    // rebuilt lazily on the task manager after Flink ships this schema to the cluster.
    private transient KafkaAvroSerializer kafkaAvroSerializer;
    private final String topic;
    private final Map<String, Object> config;
    private final boolean isKey;

    private ConfluentRegistryAvroSerializationSchema(String topic, boolean isKey, Map<String, Object> config) {
        this.topic = topic;
        this.isKey = isKey;
        this.config = config;
    }

    public static <V extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<V> ofValue(String topic, Properties config) {
        return new ConfluentRegistryAvroSerializationSchema<>(topic, false, toMap(config));
    }

    public static <V extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<V> ofKey(String topic, Properties config) {
        return new ConfluentRegistryAvroSerializationSchema<>(topic, true, toMap(config));
    }

    // Properties is a Map<Object, Object>; copy it into the Map<String, Object> that
    // KafkaAvroSerializer.configure expects (the original passed Properties straight
    // to a Map<String, Object> parameter, which does not compile).
    private static Map<String, Object> toMap(Properties config) {
        Map<String, Object> map = new HashMap<>();
        config.stringPropertyNames().forEach(name -> map.put(name, config.getProperty(name)));
        return map;
    }

    private void initKafkaSerializer() {
        kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(config, isKey);
    }

    @Override
    public byte[] serialize(V element) {
        if (kafkaAvroSerializer == null) {
            initKafkaSerializer();
        }
        return kafkaAvroSerializer.serialize(topic, element);
    }
}
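Because KafkaAvroSerializer follows Confluent's default subject-naming strategy, serialize() registers the value schema under the subject "<topic>-value", and the ofKey variant registers under "<topic>-key". A usage sketch with an illustrative topic name:

// Value serializer: registers the schema under the subject "events-value".
SerializationSchema<DoubtEventPreformatDataAvro> valueSchema =
        ConfluentRegistryAvroSerializationSchema.ofValue("events", kafkaMasterConfig);

// Key serializer: same mechanism, subject "events-key".
SerializationSchema<DoubtEventPreformatDataAvro> keySchema =
        ConfluentRegistryAvroSerializationSchema.ofKey("events", kafkaMasterConfig);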
The producer's data stream comes from converting raw JSON input into the Avro record; the post shows only the method signature:
private DoubtEventPreformatDataAvro convert(JSONObject jsonValue) {
    // field mapping elided in the original post
}
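A minimal sketch of what such a conversion typically looks like, assuming DoubtEventPreformatDataAvro is an Avro-generated class with a builder; the field names eventId and timestamp are hypothetical, not from the original:

private DoubtEventPreformatDataAvro convert(JSONObject jsonValue) {
    // Hypothetical fields; the real mapping depends on the Avro schema.
    return DoubtEventPreformatDataAvro.newBuilder()
            .setEventId(jsonValue.getString("eventId"))
            .setTimestamp(jsonValue.getLongValue("timestamp"))
            .build();
}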
Avro deserialization on the consumer side:
// Source: reads key/value Avro records from the input topic, resolving writer
// schemas against the Schema Registry at schemaUrl.
FlinkKafkaConsumer09<RetryKeyPreformatAvroValue> inputPreformatTopicConsumer = new FlinkKafkaConsumer09<>(
        jobConfig.getKafkaInputTopicName(),
        new RetryKeyPreformatAvroValueDeserializationSchema(schemaUrl),
        kafkaMasterConfig);
JobUtils.setStartupMode(jobConfig.getStartModeOfInputTopic(), inputPreformatTopicConsumer);
inputPreformatTopicConsumer.setCommitOffsetsOnCheckpoints(true);
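JobUtils.setStartupMode is the author's helper and is not shown. Presumably it dispatches to the consumer's standard startup-mode setters, along these lines (the mode strings are an assumption):

// Hypothetical helper: maps a configured mode onto FlinkKafkaConsumer's startup setters.
public static void setStartupMode(String mode, FlinkKafkaConsumerBase<?> consumer) {
    switch (mode) {
        case "earliest":
            consumer.setStartFromEarliest();
            break;
        case "latest":
            consumer.setStartFromLatest();
            break;
        default:
            consumer.setStartFromGroupOffsets(); // resume from committed group offsets
    }
}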
The custom deserialization schema:
// Concrete key/value schema: Avro key KafkaRetryKeyMeta, Avro value
// DoubtEventPreformatDataAvro, wrapped together in RetryKeyPreformatAvroValue.
public class RetryKeyPreformatAvroValueDeserializationSchema
        extends AbstractAvroKeyValueDeserializationSchema<KafkaRetryKeyMeta, DoubtEventPreformatDataAvro, RetryKeyPreformatAvroValue> {

    public RetryKeyPreformatAvroValueDeserializationSchema(String schemaRegisterUrl) {
        super(KafkaRetryKeyMeta.class, DoubtEventPreformatDataAvro.class, RetryKeyPreformatAvroValue.class, schemaRegisterUrl);
    }

    @Override
    protected RetryKeyPreformatAvroValue newInstance() {
        return new RetryKeyPreformatAvroValue();
    }
}
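KeyValueBase and RetryKeyPreformatAvroValue are not shown in the post; from their usage they are presumably plain serializable key/value holders, roughly:

// KeyValueBase.java -- presumed shape, inferred from usage; not from the original post.
public class KeyValueBase<K, V> implements java.io.Serializable {
    public K key;
    public V value;
}

// RetryKeyPreformatAvroValue.java
public class RetryKeyPreformatAvroValue extends KeyValueBase<KafkaRetryKeyMeta, DoubtEventPreformatDataAvro> {
}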
public abstract class AbstractAvroKeyValueDeserializationSchema<K extends SpecificRecord, V extends SpecificRecord, R extends KeyValueBase<K, V>>
        extends AbstractKeyValueDeserializationSchema<K, V, R> {

    private static final long serialVersionUID = 1509391548173891955L;

    public AbstractAvroKeyValueDeserializationSchema() {
    }

    public AbstractAvroKeyValueDeserializationSchema(Class<K> kClass, Class<V> vClass, Class<R> kvClass, String schemaRegisterUrl) {
        this.kClass = kClass;
        this.vClass = vClass;
        this.kvClass = kvClass;
        this.schemaRegisterUrl = schemaRegisterUrl;
    }

    // Flink's built-in Confluent-registry schemas do the actual Avro decoding;
    // forSpecific resolves writer schemas against the registry at schemaRegisterUrl.
    // (Declared protected here and in the base class so subclasses can live in
    // other packages; the original used package-private visibility.)
    @Override
    protected DeserializationSchema<K> newKeyDeserializer() {
        return ConfluentRegistryAvroDeserializationSchema.forSpecific(kClass, schemaRegisterUrl);
    }

    @Override
    protected DeserializationSchema<V> newValueDeserializer() {
        return ConfluentRegistryAvroDeserializationSchema.forSpecific(vClass, schemaRegisterUrl);
    }
}
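As an aside, if generated SpecificRecord classes are unavailable, Flink's registry integration also offers a generic variant; a hedged sketch (the reader-schema lookup is illustrative):

// Generic alternative: decode into GenericRecord against a known reader schema.
Schema readerSchema = DoubtEventPreformatDataAvro.getClassSchema();
DeserializationSchema<GenericRecord> genericValueDeserializer =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegisterUrl);

The shared base class that drives both key and value deserialization: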
public abstract class AbstractKeyValueDeserializationSchema<K, V, R extends KeyValueBase<K, V>> implements KafkaDeserializationSchema<R> {

    private static final long serialVersionUID = 1509391548173891955L;

    private DeserializationSchema<K> keyDeserializer;
    private DeserializationSchema<V> valueDeserializer;

    protected Class<K> kClass;
    protected Class<V> vClass;
    protected Class<R> kvClass;
    protected String schemaRegisterUrl;

    public AbstractKeyValueDeserializationSchema() {
    }

    public AbstractKeyValueDeserializationSchema(Class<K> kClass, Class<V> vClass, Class<R> kvClass, String schemaRegisterUrl) {
        this.kClass = kClass;
        this.vClass = vClass;
        this.kvClass = kvClass;
        this.schemaRegisterUrl = schemaRegisterUrl;
        initDeserializer();
    }

    private void initDeserializer() {
        keyDeserializer = newKeyDeserializer();
        valueDeserializer = newValueDeserializer();
    }

    protected abstract DeserializationSchema<K> newKeyDeserializer();

    protected abstract DeserializationSchema<V> newValueDeserializer();

    @Override
    public R deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Lazily rebuild the deserializers after Flink ships this schema to the task managers.
        if (keyDeserializer == null || valueDeserializer == null) {
            initDeserializer();
        }
        R keyValue = newInstance();
        if (record.key() != null) {
            try {
                keyValue.key = keyDeserializer.deserialize(record.key());
            } catch (Exception e) {
                // deliberately swallowed: a corrupt key leaves keyValue.key null
            }
        }
        if (record.value() != null) {
            try {
                keyValue.value = valueDeserializer.deserialize(record.value());
            } catch (Exception e) {
                // deliberately swallowed: a corrupt value leaves keyValue.value null
            }
        }
        return keyValue;
    }

    protected abstract R newInstance();

    @Override
    public boolean isEndOfStream(R nextElement) {
        return false;
    }

    @Override
    public TypeInformation<R> getProducedType() {
        // getForClass is TypeExtractor.getForClass, statically imported in the original.
        return TypeExtractor.getForClass(kvClass);
    }
}
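Wired end to end, the consumer and producer from this post combine roughly as follows; a minimal sketch, assuming env is the StreamExecutionEnvironment and that downstream code only wants successfully decoded values (failed records carry null fields):

DataStream<RetryKeyPreformatAvroValue> raw = env.addSource(inputPreformatTopicConsumer);

DataStream<DoubtEventPreformatDataAvro> values = raw
        .filter(kv -> kv.value != null)               // drop records whose value failed to decode
        .map(kv -> kv.value)
        .returns(DoubtEventPreformatDataAvro.class);  // help Flink's type extraction past the lambda

values.addSink(convertOutTopicProducer);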