首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Mysql To ES By Flink-CDC

Mysql To ES By Flink-CDC

原创
作者头像
沈小翊
发布于 2023-11-27 08:59:31
发布于 2023-11-27 08:59:31
1.5K00
代码可运行
举报
文章被收录于专栏:大数据生态大数据生态
运行总次数:0
代码可运行

本文将介绍如何通过Flink实现Mysql到ES的CDC近实时数据同步

CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、

更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

目前市面上大多数flink cdc到ES的方法都是flink sql client建源端表同步mysql表,建终端表同步关联ES索引,建立一个同步任务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
insert into es_table select * from mysql_table;

实时地完成Mysql到ES的数据同步,依赖flink内核实现,非常简便。但如果需要在CDC过程中进行数据处理则需要手动建立CDC

1. 环境准备

Mysql 8.0

ElasticSearch 7.16

Flink 1.14.4

JDK 1.8

pom文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>1.15.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>   <artifactId>flink-walkthrough-common_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
2. 连接Mysql获取binlog Datastream
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
RestHighLevelClient  client = new RestHighLevelClient(RestClient.builder(new HttpHost("es-ip", 9200, "http")));
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("ip")
                .port(3306)
                .databaseList("database_name")
                .tableList("table_name")
                .username("root")
                .password("password")
             .deserializer(newJsonDebeziumDeserializationSchema())
                .build();

每隔三秒向mysql查询新的binlog

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        DataStream<String> input = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("es-ip", 9200, "http"));
3. 解析binlog-对应处理ES中数据后sink到ES
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//创建ElasticsearchSink sink to es
        ElasticsearchSink.Builder<String> esSinkBuilder = 
        new ElasticsearchSink.Builder<>(httpHosts,
        new ElasticsearchSinkFunction<String>() {
        //向ES添加数据
        public IndexRequest createIndexRequest(
        String index,HashMap<String,Object> map) 
       {
       return Requests.indexRequest().index(index).source(map);
       }
        //向ES删除数据
       public void delete(String index,String element,
       String before_after) throws IOException {
       System.out.println("删除该数据");
       client.delete(Requests.deleteRequest(index)
       .id(getID(element,before_after)),
       RequestOptions.DEFAULT);
       }
        //根据binlog中字段,在ES中进行多重匹配查询数据ID
        public String getID(String element,String before_after) throws IOException {
        JSONObject object = JSON.parseObject(element);
        JSONObject json_value =object.getJSONObject(before_after);
        if(json_value.toString().equals("null")){
           System.out.println("这是条删除binlog,数据已删除无法找到");
              return "";
           }
        int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value.entrySet();

        HashMap<String,Object> map = new HashMap<>();
        //通过迭代器获取这段json当中所有的key值
        for (Map.Entry<String, Object> entry : entrySet) {
            map.put("field"+i,entry.getKey());
            map.put("value"+i,entry.getValue());
            i++;
            }
        //添加字段匹配查询
        MultiSearchRequest request = new MultiSearchRequest();
        SearchRequest firstSearchRequest = new SearchRequest();
        for (i = 0; i < entrySet.size(); i++) {
            SearchSourceBuilder searchSourceBuilder = 
            new SearchSourceBuilder();
        //多重查询
searchSourceBuilder.query(QueryBuilders.matchQuery(map.get("field"+i).toString(), map.get("value"+i).toString()));
                              firstSearchRequest.source(searchSourceBuilder);
        request.add(firstSearchRequest);
        }
        //在response中拿到配对数据id
        MultiSearchResponse response = client.msearch
        (request, RequestOptions.DEFAULT);
        MultiSearchResponse.Item firstResponse = response
        .getResponses()[0];
        SearchResponse searchResponse=firstResponse.getResponse();
        SearchHits hits = searchResponse.getHits();
        return firstResponse.getResponse().toString()
        .contains("\"hits\":[]") ? "空数据" : hits.getHits()[0].getId();
        }
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        //对binlog进行判断,对ES数据做出"增" | "删" | "改"动作
        String index = "mysql_es";
        if(element.contains("\"before\":null")){   
        //解析增加数据binlog
        JSONObject json_value = JSON.parseObject(element)
        .getJSONObject("after");
        int i = 0;
        Set<Map.Entry<String, Object>> entrySet = json_value
        .entrySet();
        HashMap<String,Object> map = new HashMap<>();
        //通过迭代器获取这段json当中所有的key value
        for (Map.Entry<String, Object> entry : entrySet) {
                map.put(entry.getKey(),entry.getValue());
            }
                                      indexer.add(createIndexRequest(index,map));
         }else if (element.contains("\"after\":null")){         
         //解析删除数据binlog
         try {
             delete(index,element,"before");
            } catch (IOException e) {
             System.out.println("运行异常");
             throw new RuntimeException(e);
            }
          }else if (!element.contains("\"after\":null") && !element.contains("\"before\":null)")){
          try {
              delete(index,element,"before");  
              //解析更新数据binlog
             } catch (IOException e) {
               throw new RuntimeException(e);
            }

           JSONObject json_value = JSON.parseObject(element)
           .getJSONObject("after");

           Set<Map.Entry<String, Object>> entrySet = json_value
           .entrySet();

           HashMap<String,Object> map = new HashMap<>();
            //通过迭代器获取这段json当中所有的key值
           for (Map.Entry<String, Object> entry : entrySet) {
                  map.put(entry.getKey(),entry.getValue());
               }
                            indexer.add(createIndexRequest(index,map));
           }else {
           System.out.println("binlog不在判断范围内");
             }
            }
          }
        );
4. 配置ES sink
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {}
        );

// finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());

        env.execute();
程序局限
  1. 不适用mysql数据库内有相同数据场景,mysql表需要有主键
  2. 不支持断点进行,每次启动程序重新同步
  3. 未考虑字段嵌套场景(多层JSON)

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目
本文将演示如何使用 Flink DataStream API 开发一个 Flink CDC 应用。
大数据学习指南
2022/05/26
6.2K0
Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目
Flink CDC
CDC是Change Data Capture(变更数据获取)的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
挽风
2023/12/18
7470
Flink CDC
如何利用 Flink CDC 实现数据增量备份到 Clickhouse
首先什么是CDC ?它是Change Data Capture的缩写,即变更数据捕捉的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等操作。
麒思妙想
2021/07/19
4.8K0
Flink CDC 2.0原理详解和生产实践
CDC 的全称是 Change Data Capture ,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向 数据库的变更,是一种用于捕获数据库中数据变更的技术。
王知无-import_bigdata
2022/04/13
4.3K0
Flink CDC 2.0原理详解和生产实践
Flink-kafka源-esSink
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" x
用户5927264
2023/03/16
3990
【Flink】从零搭建实时数据分析系统
除了看过两周 Flink 外,其他的框架都没有接触过,只是简单的拿来用一下,也并不是很了解,所以本篇教程如果有什么错误,欢迎指出。
阿泽 Crz
2020/09/22
2K0
【Flink】从零搭建实时数据分析系统
Flink新增特性 | CDC(Change Data Capture) 原理和实践应用
CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。
王知无-import_bigdata
2020/12/08
4K0
Flink新增特性 | CDC(Change Data Capture) 原理和实践应用
Flink CDC 与Hudi整合
之前写过Flink CDC sink 到 Iceberg中,本篇主要实践如何CDC到hudi中.
awwewwbbb
2022/05/09
1.2K0
Flink CDC 与Hudi整合
大数据开发之Flink Table操作
前言 本文使用环境版本 Hive:2.3.9 Flink:flink-1.12.7-bin-scala_2.12 依赖 <?xml version="1.0" encoding="UTF-8"?> <
码客说
2022/11/22
4820
(1)通过FlinkSQL将数据写入mysql demo
FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。
NBI大数据
2022/08/08
2K0
(1)通过FlinkSQL将数据写入mysql demo
Flink 的三种WordCount(文末领取Flink书籍)
今天是 Flink 从 0 到 1 系列的第 2 篇:《WordCount及FlinkSQL》。
Python编程爱好者
2022/09/21
1.2K0
Flink 的三种WordCount(文末领取Flink书籍)
大数据-Flink编程
groupBy会将一个DataSet转化为一个GroupedDataSet,聚合操作会将GroupedDataSet转化为DataSet。如果聚合前每个元素数据类型是T,聚合后的数据类型仍为T。
码客说
2022/10/04
1.2K0
大数据-Flink编程
深入解读flink sql cdc的使用以及源码分析
CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等.
大数据技术与应用实战
2020/09/16
5.8K0
深入解读flink sql cdc的使用以及源码分析
大数据-Flink版本升级到1.17Maven中的相关依赖
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/overview/
码客说
2023/09/01
3.2K0
FlinkCDC的探索与实践1
本人开通付费的知识群,如果需要可以添加QQ:975863632,需要99.9元即可加入,添加需要备注【云雀课堂知识群】,这里可以获取到上面的源码,如果遇到问题可以一起解决,同时可以一起学习和进步。
石昊
2022/01/15
7020
flink 1.11.2 学习笔记(2)-Source/Transform/Sink
从上一节wordcount的示例可以看到,flink的处理过程分为下面3个步骤:
菩提树下的杨过
2025/08/22
1200
flink 1.11.2 学习笔记(2)-Source/Transform/Sink
001. Flink入门案例-WordCount实时处理
1. maven依赖 <properties> <!-- flink版本好 --> <flink.version>1.8.1</flink.version> <!-- scala主版本号 --> <scala.binary.version>2.11</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId>
CoderJed
2019/07/25
6K1
001. Flink入门案例-WordCount实时处理
flink 1.11.2 学习笔记(2)-Source/Transform/Sink
从上一节wordcount的示例可以看到,flink的处理过程分为下面3个步骤:
菩提树下的杨过
2020/11/24
1.2K0
flink 1.11.2 学习笔记(2)-Source/Transform/Sink
Flink CDC 原理及生产实践
MySQL CDC连接器允许从MySQL数据库读取快照数据和增量数据。本文档根据官网翻译了如何设置MySQL CDC连接器以对MySQL数据库运行SQL查询。
王知无-import_bigdata
2021/01/06
3.7K0
Flink CDC 原理及生产实践
Flink Mysql CDC 统计处理
说明: 该依赖已经内置了debezium进行处理mysql 变更数据并发送了,所以我们不需要额外的方式,简化了异常 mysql → debezium → kafka的这种方式和数据流程。
礼兴
2021/08/16
4.6K1
Flink Mysql CDC 统计处理
相关推荐
Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目
更多 >
领券
一站式MCP教程库,解锁AI应用新玩法
涵盖代码开发、场景应用、自动测试全流程,助你从零构建专属AI助手
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档