卡夫卡,Flink和Tidb的新版本。假设我有三个源MySql表-- s_a、s_b和s_c,并希望实时收集记录以针对TiDb表t_a和t_b。映射规则是
`s_a`  --> `t_a`                   
`s_b` union `s_c`   ---> `t_b`  with some transformation (e.g., field remapping).我采用的解决方案是kafka +带有Tidb接收器的Flink,其中binlog更改被订阅到Kafka主题;Flink使用该主题并将转换结果写入Tidb。对我来说,flink代码部分的问题是:
t_a或t_b )。我找到了一个名为Debezium的工具,名为Kafka&Flink连接器,但它似乎需要源表和目标表之间的相等。。
如果我有多个目标表,
VKDataMapper。我很难定义T,因为它可以是t_a DTO(数据传输对象)或t_b DTO.。
对我来说,现有的示例代码如下:
//主要例程。
   StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
       //consume is FlinkkafkaConsumer. TopicFilter returns true. 
        environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
                .addSink(new TidbSink());
        try {
            environment.execute();
        } catch (Exception e) {
            log.error("exception {}", e);
        }
 
 
 public class VKDataMapper implements MapFunction<String, T> {
    @Override
    public T map(String value) throws Exception {
        //How T can represents both `T_a data DTO` `T_b`...., 
        return null;
    }
}发布于 2020-11-27 08:56:59
为什么不试试Flink SQL呢?这样,您只需要在Flink中创建一些表,然后通过sql定义任务如下:
insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;请参阅https://github.com/LittleFall/flink-tidb-rdw中的一些示例,可以随意询问任何让您感到困惑的事情。
https://stackoverflow.com/questions/64931487
复制相似问题