首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Mysql To ES By Flink-CDC

Mysql To ES By Flink-CDC

原创
作者头像
沈小翊
发布于 2023-11-27 08:59:31
发布于 2023-11-27 08:59:31
1.3K00
代码可运行
举报
文章被收录于专栏:大数据生态大数据生态
运行总次数:0
代码可运行

本文将介绍如何通过Flink实现Mysql到ES的CDC近实时数据同步

CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、

更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

目前市面上大多数flink cdc到ES的方法都是flink sql client建源端表同步mysql表,建终端表同步关联ES索引,建立一个同步任务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
insert into es_table select * from mysql_table;

实时地完成Mysql到ES的数据同步,依赖flink内核实现,非常简便。但如果需要在CDC过程中进行数据处理则需要手动建立CDC

1. 环境准备

Mysql 8.0

ElasticSearch 7.16

Flink 1.14.4

JDK 1.8

pom文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>1.15.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>   <artifactId>flink-walkthrough-common_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
2. 连接Mysql获取binlog Datastream
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
RestHighLevelClient  client = new RestHighLevelClient(RestClient.builder(new HttpHost("es-ip", 9200, "http")));
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("ip")
                .port(3306)
                .databaseList("database_name")
                .tableList("table_name")
                .username("root")
                .password("password")
             .deserializer(newJsonDebeziumDeserializationSchema())
                .build();

每隔三秒向mysql查询新的binlog

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        DataStream<String> input = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("es-ip", 9200, "http"));
3. 解析binlog-对应处理ES中数据后sink到ES
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//创建ElasticsearchSink sink to es
        ElasticsearchSink.Builder<String> esSinkBuilder = 
        new ElasticsearchSink.Builder<>(httpHosts,
        new ElasticsearchSinkFunction<String>() {
        //向ES添加数据
        public IndexRequest createIndexRequest(
        String index,HashMap<String,Object> map) 
       {
       return Requests.indexRequest().index(index).source(map);
       }
        //向ES删除数据
       public void delete(String index,String element,
       String before_after) throws IOException {
       System.out.println("删除该数据");
       client.delete(Requests.deleteRequest(index)
       .id(getID(element,before_after)),
       RequestOptions.DEFAULT);
       }
        //根据binlog中字段,在ES中进行多重匹配查询数据ID
        public String getID(String element,String before_after) throws IOException {
        JSONObject object = JSON.parseObject(element);
        JSONObject json_value =object.getJSONObject(before_after);
        if(json_value.toString().equals("null")){
           System.out.println("这是条删除binlog,数据已删除无法找到");
              return "";
           }
        int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value.entrySet();

        HashMap<String,Object> map = new HashMap<>();
        //通过迭代器获取这段json当中所有的key值
        for (Map.Entry<String, Object> entry : entrySet) {
            map.put("field"+i,entry.getKey());
            map.put("value"+i,entry.getValue());
            i++;
            }
        //添加字段匹配查询
        MultiSearchRequest request = new MultiSearchRequest();
        SearchRequest firstSearchRequest = new SearchRequest();
        for (i = 0; i < entrySet.size(); i++) {
            SearchSourceBuilder searchSourceBuilder = 
            new SearchSourceBuilder();
        //多重查询
searchSourceBuilder.query(QueryBuilders.matchQuery(map.get("field"+i).toString(), map.get("value"+i).toString()));
                              firstSearchRequest.source(searchSourceBuilder);
        request.add(firstSearchRequest);
        }
        //在response中拿到配对数据id
        MultiSearchResponse response = client.msearch
        (request, RequestOptions.DEFAULT);
        MultiSearchResponse.Item firstResponse = response
        .getResponses()[0];
        SearchResponse searchResponse=firstResponse.getResponse();
        SearchHits hits = searchResponse.getHits();
        return firstResponse.getResponse().toString()
        .contains("\"hits\":[]") ? "空数据" : hits.getHits()[0].getId();
        }
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        //对binlog进行判断,对ES数据做出"增" | "删" | "改"动作
        String index = "mysql_es";
        if(element.contains("\"before\":null")){   
        //解析增加数据binlog
        JSONObject json_value = JSON.parseObject(element)
        .getJSONObject("after");
        int i = 0;
        Set<Map.Entry<String, Object>> entrySet = json_value
        .entrySet();
        HashMap<String,Object> map = new HashMap<>();
        //通过迭代器获取这段json当中所有的key value
        for (Map.Entry<String, Object> entry : entrySet) {
                map.put(entry.getKey(),entry.getValue());
            }
                                      indexer.add(createIndexRequest(index,map));
         }else if (element.contains("\"after\":null")){         
         //解析删除数据binlog
         try {
             delete(index,element,"before");
            } catch (IOException e) {
             System.out.println("运行异常");
             throw new RuntimeException(e);
            }
          }else if (!element.contains("\"after\":null") && !element.contains("\"before\":null)")){
          try {
              delete(index,element,"before");  
              //解析更新数据binlog
             } catch (IOException e) {
               throw new RuntimeException(e);
            }

           JSONObject json_value = JSON.parseObject(element)
           .getJSONObject("after");

           Set<Map.Entry<String, Object>> entrySet = json_value
           .entrySet();

           HashMap<String,Object> map = new HashMap<>();
            //通过迭代器获取这段json当中所有的key值
           for (Map.Entry<String, Object> entry : entrySet) {
                  map.put(entry.getKey(),entry.getValue());
               }
                            indexer.add(createIndexRequest(index,map));
           }else {
           System.out.println("binlog不在判断范围内");
             }
            }
          }
        );
4. 配置ES sink
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {}
        );

// finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());

        env.execute();
程序局限
  1. 不适用mysql数据库内有相同数据场景,mysql表需要有主键
  2. 不支持断点进行,每次启动程序重新同步
  3. 未考虑字段嵌套场景(多层JSON)

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
35. Search Insert Position(二分法)
Given a sorted array and a target value, return the index if the target is found. If not, return the index where it would be if it were inserted in order.
yesr
2019/03/14
3210
二分法还需要练习练习
力扣题目链接:https://leetcode-cn.com/problems/search-insert-position/
代码随想录
2021/10/20
4410
二分法其实很简单,为什么老是写不对!!
相信很多人对二分法是又爱又恨,爱是在于它思想简单,效率确实高, 恨是恨在为什么总是写不对呢
代码随想录
2020/06/11
1K0
二分法题型小结
在刷题的过程中,二分法用的还是挺多的,有时候超时了往往是你没有用上二分法,今天我就来稍微总结下用的最多的三种二分法搜索。
帅地
2019/10/30
4830
Python|再认识,二分法
在初步学习认识了二分法后,刷题时还是会觉得解决二分法类题有些难度,看题解也会有很多疑问,下面小编将对疑问多的问题做回答。
算法与编程之美
2021/07/09
4510
LeetCode <二分搜索>34.Find First and Last Position of Element in Sorted Array
Given an array of integers nums sorted in ascending order, find the starting and ending position of a given target value.
大学里的混子
2018/10/30
1.2K0
二分查找有序重复元素
给定一个按照升序排列的整数数组 nums,和一个目标值 target。找出给定目标值在数组中的开始位置和结束位置。
MickyInvQ
2021/03/02
1.2K0
二分查找有序重复元素
二分法的左右边界
二分法用起来还是挺好用的,就是每次我总是纠结边界条件到底如何确定,用小于号还是小于等于号,满足条件后left是mid还是mid+1,为此专门做了两道简单题,整理了下思路。
伯约同学
2022/03/02
4620
Leetcode No.35 搜索插入位置(二分法)
给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。
week
2022/01/07
2680
算法学习笔记-二分法
leetcode875爱吃香蕉的珂珂https://leetcode-cn.com/problems/koko-eating-bananas/
买唯送忧
2021/05/09
4320
搞定大厂算法面试之leetcode精讲5.二分查找
搞定大厂算法面试之leetcode精讲5.二分查找 视频教程(高效学习):点击学习 目录: 1.开篇介绍 2.时间空间复杂度 3.动态规划 4.贪心 5.二分查找 6.深度优先&广度优先 7.双指针 8.滑动窗口 9.位运算 10.递归&分治 11剪枝&回溯 12.堆 13.单调栈 14.排序算法 15.链表 16.set&map 17.栈 18.队列 19.数组 20.字符串 21.树 22.字典树 23.并查集 24.其他类型题 二分搜索 时间复杂度O(logn) 步骤: 从数组中间的元素开始,如果中
全栈潇晨
2021/11/24
3540
【每日一题】35. Search Insert Position
Given a sorted array and a target value, return the index if the target is found. If not, return the index where it would be if it were inserted in order.
公众号-不为谁写的歌
2020/08/11
2270
【二分查找】详细图解[通俗易懂]
比如在一个有序的数组并且无重复元素的数组中,例如[1, 2, 3, 4, 5, 6],需要查找3的位置就可以使用二分查找。
全栈程序员站长
2022/09/27
5.4K0
【二分查找】详细图解[通俗易懂]
简单二分法查找(binary search)
有一个面试题是对一个1000万的数字进行快速查找,并且使用内存不能查过100M 答: 现在有1000个数字 每个数子大小为8Kb(为long基本类型) 那么现在占据用的内存 为800M 我们进行算法设计 ,将这个数据进行有序排列,组成为一个数组, 进而进行 折中查找,每一次在查找的时候取中间位置的数据,如下图(图片来源极客时间):
袁新栋-jeff.yuan
2020/08/26
6120
简单二分法查找(binary search)
二分查找法的实现和应用汇总
在学习算法的过程中,我们除了要了解某个算法的基本原理、实现方式,更重要的一个环节是利用big-O理论来分析算法的复杂度。在时间复杂度和空间复杂度之间,我们又会更注重时间复杂度。 时间复杂度按优劣排差不多集中在: O(1), O(log n), O(n), O(n log n), O(n2), O(nk), O(2n) 到目前位置,似乎我学到的算法中,时间复杂度是O(log n),好像就数二分查找法,其他的诸如排序算法都是 O(n log n)或者O(n2)。但是也正是因为有二分的 O(log n), 才让很
猿人谷
2018/01/17
1.2K0
Leetcode No.33 搜索旋转排序数组(二分法)
升序排列的整数数组 nums 在预先未知的某个点上进行了旋转(例如, [0,1,2,4,5,6,7] 经旋转后可能变为 [4,5,6,7,0,1,2] )。
week
2022/01/07
1960
Leetcode No.33 搜索旋转排序数组(二分法)
二分法查找
二分法查找又称为折半法查找,基本原理:与数组元素的中点比较,逐步定位到元素X所在的区域,最终查找到该元素。前提是:该元素必需按从小到大或者从大到小的顺序排列。实际当中要查找某个元素,可以先排序,再使用二分法查找。
用户4148957
2022/06/14
3050
python面试题-【二分法查找】给定一个已排序的非重复整数数组和一个目标值,如果找到目标,则返回索引。
前言 给定一个已排序的非重复整数数组和一个目标值,如果找到目标,则返回索引。如果不是,返回索引按顺序插入时的位置。 题目 给定一个已排序的非重复整数数组和一个目标值,如果找到目标,则返回索引。如果不是,返回索引按顺序插入时的位置。 (用二分法查找解决) 示例 1: 输入: [1,3,5,6], 5 输出: 2 示例 2: 输入: [1,3,5,6], 2 输出: 1 示例 3: 输入: [1,3,5,6], 7 输出: 4 示例 4: 输入: [1,3,5,6], 0 输出: 0 二分法查找 二分查找也称折
上海-悠悠
2022/07/19
1K0
python面试题-【二分法查找】给定一个已排序的非重复整数数组和一个目标值,如果找到目标,则返回索引。
面试手撕算法系列:二分法
这些都是LeetCode上有的题目 手撕无非就是 树、链表、二分、字符串这些常用的数据结构
程序员小猿
2021/01/19
5940
面试手撕算法系列:二分法
你写的二分法可能有个bug
在公众号里写了有 80 多篇原创文章了,大家大多都是利用碎片时间来阅读公众号文章,所以我后面的文章也尽量使用更通俗、更简短的文字。
谭小谭
2019/07/01
5660
你写的二分法可能有个bug
推荐阅读
相关推荐
35. Search Insert Position(二分法)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档