首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Flink:多DTO的binlog变换及其在flink中的转换方法

Flink:多DTO的binlog变换及其在flink中的转换方法
EN

Stack Overflow用户
提问于 2020-11-20 14:42:53
回答 1查看 111关注 0票数 0

卡夫卡,Flink和Tidb的新版本。假设我有三个源MySql表-- s_as_bs_c,并希望实时收集记录以针对TiDb表t_at_b。映射规则是

代码语言:javascript
复制
`s_a`  --> `t_a`                   
`s_b` union `s_c`   ---> `t_b`  with some transformation (e.g., field remapping).

我采用的解决方案是kafka +带有Tidb接收器的Flink,其中binlog更改被订阅到Kafka主题;Flink使用该主题并将转换结果写入Tidb。对我来说,flink代码部分的问题是:

  1. 如何轻松地将从kafka轮询的json字符串(包含操作信息、表)还原到不同类型的DTO操作(例如,insert/creat t_at_b )。我找到了一个名为Debezium的工具,名为Kafka&Flink连接器,但它似乎需要源表和目标表之间的相等。

如果我有多个目标表,

  1. 如何编写转换VKDataMapper。我很难定义T,因为它可以是t_a DTO(数据传输对象)或t_b DTO.

对我来说,现有的示例代码如下:

//主要例程。

代码语言:javascript
复制
   StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
       //consume is FlinkkafkaConsumer. TopicFilter returns true. 
        environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
                .addSink(new TidbSink());

        try {
            environment.execute();
        } catch (Exception e) {
            log.error("exception {}", e);
        }
 
 
 public class VKDataMapper implements MapFunction<String, T> {

    @Override
    public T map(String value) throws Exception {
        //How T can represents both `T_a data DTO` `T_b`...., 
        return null;
    }

}
EN

回答 1

Stack Overflow用户

发布于 2020-11-27 08:56:59

为什么不试试Flink SQL呢?这样,您只需要在Flink中创建一些表,然后通过sql定义任务如下:

代码语言:javascript
复制
insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;

请参阅https://github.com/LittleFall/flink-tidb-rdw中的一些示例,可以随意询问任何让您感到困惑的事情。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64931487

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档