首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Canal连接kafka实现实时同步mysql数据

Canal连接kafka实现实时同步mysql数据

作者头像
王知无-import_bigdata
发布于 2019-10-15 09:27:15
发布于 2019-10-15 09:27:15
2.8K02
代码可运行
举报
运行总次数:2
代码可运行

canal-kafka是阿里云最近更新的一个新的安装包。主要功能是实现canal与kafka的对接,实现海量的消息传输同步。在canal-kafka中,消息是以ByteString进行传输的,并且用户只能通过配置来指定一些kafka的配置,从某种程度上有一定的局限性,所以我们使用canal来自定义客户端kafka,会有更好的灵活性,但维护成本会更大,所以如何选择根据实际情况而定。

构建maven依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.25</version>
</dependency> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
SimpleCanalClient
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.unigroup.client.canal;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.core.canal.CanalToKG;

/**   
* @Title: SimpleCanalClient.java 
* @Package com.unigroup.canal 
* @Description: canal单实例接口
*/
public class SimpleCanalClient {

    private CanalConnector connector=null;

    public SimpleCanalClient(String ip,String port,String instance) {

        // 创建链接
        connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, Integer.parseInt(port)),instance, "", "");
    }
    public List<Entry> execute(int batchSize,Class<?> clazz ) throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException {

        //int batchSize = 1;
        int emptyCount = 0;
        Object obj = clazz.newInstance();
        Method method = clazz.getMethod("send",Message.class);
        try {
            connector.connect();
            // connector.subscribe(".*\\..*");
            connector.subscribe("test.test1");

            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    method.invoke(obj, message);            
                }
                connector.ack(batchId); // 提交确认

                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } catch (IllegalAccessException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
        return null;
    }
}
CanalKafkaProducer
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.unigroup.kafka.producer;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;

/**   
* @Title: CanalKafkaProducer.java 
* @Package com.unigroup.kafka.producer 
* @version V1.0   
*/
public class CanalKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

    private Producer<String, Message> producer;

    public void init(KafkaProperties kafkaProperties) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", "all");
        properties.put("retries", kafkaProperties.getRetries());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", MessageSerializer.class.getName());
        producer = new KafkaProducer<String, Message>(properties);
    }

    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            producer.close();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        } finally {
            logger.info("## kafka producer is down.");
        }
    }

    public void send(Topic topic, Message message) throws IOException {

        ProducerRecord<String, Message> record;
        if (topic.getPartition() != null) {
            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
        } else {
            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
        }
        producer.send(record);
        if (logger.isDebugEnabled()) {
            logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
        }
    }
}
canalToKafkaServer
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.unigroup.kafka.server;

import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;

/**   
* @Title: canal.java 
* @Package com.unigroup.kafka.server 
* @version V1.0   
*/
public class canalToKafkaServer {
    public static void execute() {
        SimpleCanalClient simpleCanalClient = new SimpleCanalClient(GetProperties.getValue("MYSQL_HOST"),
                GetProperties.getValue("MTSQL_PORT"), GetProperties.getValue("INSTANCE"));
        try {
            simpleCanalClient.execute(1,CanalKafkaProducer.class);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}

至此一个简单的canal到kafka的demo已经完成。这些都只是测试代码,实际应用中根据不同的情况,可以自己开发更多功能。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Canal采集MySQL Binlog——数据同步方案
简介: 针对业务库MySQL Binlog日志数据进行数据同步,从MySQL到Kafka,最终实现实时(准实时)数据采集。其中Canal可以使用且推荐使用MaxWell,此篇文档只是介绍如何通过Canal打通上下游数据点。
大王叫我来巡山、
2025/07/21
1970
Canal数据同步工具
​ Canal就是一个很好的数据库同步工具。canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。
OY
2022/03/18
1.8K0
Canal数据同步工具
Canal入门
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
王知无-import_bigdata
2019/10/15
1.2K0
Canal入门
使用canal增量订阅MySQL binlog
【转载请注明出处】:https://cloud.tencent.com/developer/article/1634327
后端老鸟
2020/05/28
3.1K0
使用canal增量订阅MySQL binlog
【Canal】数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!!
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/10/29
1.5K0
【Canal】数据同步的终极解决方案,阿里巴巴开源的Canal框架当之无愧!!
Canal 原理与实践
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,不支持全量已有数据同步。由于采用了 binlog 机制,Mysql 中的新增、更新、删除操作,对应的 Elasticsearch都能实时新增、更新、删除。
Se7en258
2021/05/18
1.1K0
Canal 原理与实践
Spark Streaming + Canal + Kafka打造Mysql增量数据实时进行监测分析
Spark中的Spark Streaming可以用于实时流项目的开发,实时流项目的数据源除了可以来源于日志、文件、网络端口等,常常也有这种需求,那就是实时分析处理MySQL中的增量数据。
王知无-import_bigdata
2021/04/21
1.6K0
Spark Streaming + Canal + Kafka打造Mysql增量数据实时进行监测分析
阿里Canal框架(数据同步中间件)初步实践
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
java架构师
2019/05/15
1.3K0
大数据Canal(三):使用Canal同步MySQL数据
使用Canal同步MySQL的数据可以直接使用Canal客户端API方式消费Canal同步的数据,详细api参照:
Lansonli
2023/03/11
3.4K0
大数据Canal(三):使用Canal同步MySQL数据
使用canal-deployer实现mysql数据同步
在shigen之前的文章当中,苦于mysql和elasticsearch之间的数据同步问题,甚至尝试开源一款mysql-es数据同步工具 - 掘金。觉得可以自己去实现这些同步。但是遇到了的问题也很多:
shigen
2024/01/10
3670
使用canal-deployer实现mysql数据同步
Canal实现MySQL数据实时同步
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
别团等shy哥发育
2023/02/25
3.4K0
Canal实现MySQL数据实时同步
聊聊CanalMQStarter
canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java
code4it
2020/04/17
3520
聊聊CanalMQStarter
数据库增量数据同步,用Canal组件好使吗?
大家好,我是小义,今天来讲一下Canal。Canal是阿里巴巴开源的一款基于MySQL数据库binlog的增量订阅和消费组件,它的主要工作原理是伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Master发送dump协议。当MySQL master收到canal发送过来的dump请求后,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,如MySQL,Kafka等。
程序员小义
2024/04/10
3660
数据库增量数据同步,用Canal组件好使吗?
SpringBoot系列之canal和kafka实现异步实时更新
canal是阿里开源的, 对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向master发送dump 协议,获取到数据后,解析 binary log 对象数据。
SmileNicky
2022/01/04
2.1K1
SpringBoot系列之canal和kafka实现异步实时更新
2 监听mysql表内容变化,使用canal
mysql本身是支持主从的(master slave),原理就是master产生的binlog日志记录了所有的增删改语句,将binlog发送到slave节点进行执行即可完成数据的同步。
天涯泪小武
2019/01/17
7K0
Canal原理及其使用
  canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)
用户4283147
2022/10/27
1.7K0
Canal原理及其使用
我是如何一步一步监控公司MySQL的每一个操作?
canal是一款基于数据库增量日志解析,提供增量数据订阅与消费的框架,整个框架纯JAVA开发,目前仅支持Mysql和MariaDB(和mysql类似)。
程序员小富
2020/07/13
7170
我是如何一步一步监控公司MySQL的每一个操作?
kafka连接异常 原
kafka版本是0.10.2.1 本地java客户端版本是0.8.1.1 主要两个错误 第一个是连接拒绝 kafka Connection refused: no further information 然后发现第二个错误 Selector.poll(Selector.java:276) - Error in I/O with localhost/127.0.0.1 怀疑是ip绑定有问题,编辑server.properties,指定ip地址 advertised.host.name=ip地址 重启后,运行
尚浩宇
2018/08/17
1.9K0
聊聊canal的CanalAdapterWorker
canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
code4it
2020/04/05
1K0
canal-adapter消费Kafka中MySQL的binlog数据,却没有同步更新Elastic search
在同步的时候发现canal-adapter中canal-adapter/conf/es7/product.yml 配置文件中sql 语句连表查询的时候会出现无法更新Elasticsearch 中数据的情况,而且日志没有提示异常(idea启动的时候有错误日志),令人百思不得其解。
iiopsd
2022/12/23
1.9K0
canal-adapter消费Kafka中MySQL的binlog数据,却没有同步更新Elastic search
相关推荐
Canal采集MySQL Binlog——数据同步方案
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档