前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Java进阶实录—pringmvc+kafka分布式消息中间件集成方案

Java进阶实录—pringmvc+kafka分布式消息中间件集成方案

作者头像
慕容千语
发布于 2019-06-11 15:13:50
发布于 2019-06-11 15:13:50
44400
代码可运行
举报
运行总次数:0
代码可运行

Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案: kafka消息平台使用spring+kafka的集成方案, 详情如下: 1 . 使用最高版本2.1.0.RELEASE集成jar包:spring-integration-kafka 2 . Zookeeper、Kafka分布式集群使用init.properties配置化方案。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafka.servers=127.0.0.1:9092    
kafka.topic=xxxooo  

3 . 使用消息生产者spring-context-producer配置化方案。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                <entry key="group.id" value="2" />
                <entry key="retries" value="10" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="test" />
    </bean>
</beans>

4 . 使用消息消费者spring-context-producer配置化方案。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                <entry key="group.id" value="0" />
                <entry key="enable.auto.commit" value="true" />
                <entry key="auto.commit.interval.ms" value="1000" />
                <entry key="session.timeout.ms" value="15000" />
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>
    <!-- 实际执行消息消费的类 -->
    <bean id="messageListernerConsumerService" class="com.sml.sz.kafka.KafKaConsumer" />
    <!-- 消费者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test" />
        <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>
</beans>

5 . 使用注解方式注入消息类型

@Autowired private KafkaTemplate<xxx, ooo> kafkaTemplate;

6 . 重写MessageListener 的getMessage方法获取消息(业务实现)

7 . RestFul服务方式测试消息服务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@CrossOrigin(origins = "*", maxAge = 3600, methods = { RequestMethod.GET, RequestMethod.POST, RequestMethod.DELETE,  
        RequestMethod.PUT })  
@RestController  
@RequestMapping(value = "/rest/kafka")  
public class KafKaProducer {  
      
    @RequestMapping(value = "/send", method = RequestMethod.GET)  
    public JSONObject save() {  
        System.out.println("+++++++++++++++++++++++++++++++");  
        kafkaTemplate.sendDefault("HongHu KAFKA分布式消息服务测试");    
        return null;  
    }  
      
    @Autowired    
    private KafkaTemplate<Integer, String> kafkaTemplate;  
}  
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RestController
public class KafKaConsumer implements MessageListener<Integer, String> {
    @Autowired
    private LogService logService;
    public void onMessage( ConsumerRecord<Integer, String> records )
    {
        System.out.println( "====================" + records );
        Object  o   = records.value();
        Log log = new Log();
        log.setIsNewRecord( true );
        log.setId( IdGen.uuid() );
        log.setTitle( String.valueOf( o ) );
        logService.save( log );
    }
}

接受消息了------------------:ConsumerRecord(topic = xxxooo, partition = 0, offset = 2489, CreateTime = 1479647648299, checksum = 3372898135, serialized key size = -1, serialized value size = 40, key = null, value = HongHu KAFKA分布式消息服务测试)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.04.03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
如何优化 Ansible Playbook 执行速度
运行最新版本的 Ansible 可帮助提高使用 Ansible 核心模块的 Playbook 的性能。同时尽可能让控制节点靠近受管节点。Ansible严重依赖网络通信和数据传输。
山河已无恙
2023/01/30
2K0
ansible调优
解决方法: 修改ansible.cfg host_key_checking = False
buiu
2021/11/26
3390
《Ansible自动化运维:技术与最佳实践》第二章读书笔记
当更新 Ansible 版本时,要更新 git 源码树以及 git 中指向 Ansible 自身的模块(称为 submodules)
武培轩
2019/09/12
1.2K0
002. Ansible部署及配置介绍
提示:建议将PIP升级到最新:pip install --upgrade pip。
木二
2019/07/01
7640
Ansible
172.10.0.18主机不可达,ansible是基于ssh的,ansible不知道172.10.0.18这台主机的用户名和密码,所以ansible无法连接到它。 我们可以在清单文件中加入对应主机的用户名和密码,也可以基于密钥的方式
陳斯托洛夫斯記
2022/10/27
8770
Ansible
ansible服务部署与使用
第1章 ssh+key实现基于密钥连接(ansible使用前提) 说明:    ansible其功能实现基于SSH远程连接服务 使用ansible需要首先实现ssh密钥连接 1.1 部署ssh ke
惨绿少年
2017/12/27
4.2K0
ansible服务部署与使用
02 . Ansible高级用法(运维开发篇)
ansible是python中的一套模块,系统中的一套自动化工具,可以用作系统管理,自动化命令等任务
iginkgo18
2020/09/27
3.8K0
02 . Ansible高级用法(运维开发篇)
Ansible自动化部署服务
1、安装部署 yum inatsll ansible -y 2、配置文件 Ansible常用参数详解 [defaults] #通用默认配置 inventory = /etc/ansible/hosts #被控制端IP或者DNS列表 library = /usr/share/my_modules/ ##默认搜寻模块的位置 remote_tmp = ~/.ansible/tmp #远程执行临时文件 local_tmp
@凌晨
2021/01/22
9770
ansible.cfg配置详解
一.简单配置 [defaults] inventory = /etc/ansible/hosts sudo_user=root remote_port=22 host_key_checking=False remote_user=root log_path=/var/log/ansible.log module_name=command private_key_file=/root/.ssh/id_rsa no_log:True 二.详细配置 # config file for ansible -
陈不成i
2021/08/02
1.9K0
Ansible 以及 Ansible-playbook介绍
Anasible 是基于Python2-Paramiko 模块开发的自动化维护工具,实现了批量系统配置、部署、运行等功能。Ansible是基于模块工作的,本身不具备批量部署的功能,如果想要实现批量自动化部署,是Ansible自身的各种模块的集合。
jwangkun
2021/12/23
6.2K0
Ansible 以及 Ansible-playbook介绍
ansible超详细使用指南
来源:ansible一词源于科幻小说,是一种超光速通信设备。 Ansible is the simplest way to automate apps and IT infrastructure。 750+模块,19000+ github stars。
sunsky
2020/08/20
2.7K0
ansible超详细使用指南
Ansible安装配置
ansible基于python开发,集合了众多优秀运维工具的优点,实现了批量运行命令、部署程序、配置系统等功能。默认通过SSH协议进行远程命令执行或下发配置,无需部署任何客户端代理软件,从而使得自动化环境部署变得更加简单。可同时支持多台主机并进行管理,使得管理主机更加便捷。主版本大概每2个月发布一次。
胡齐
2019/11/12
5570
自动化运维—Ansible(上)
  ansible甚至都不用启动服务,仅仅只是一个工具,可以很轻松的实现分布式扩展
yaohong
2019/09/11
2.7K0
自动化运维—Ansible(上)
Ansible自动化运维工具
Ansible 是一个极其简单的 IT 自动化平台,可让您的应用程序和系统更易于部署和维护。从代码部署到网络配置再到云管理,使用一种接近简单英语的语言,使用 SSH 实现一切自动化,无需在远程系统上安装代理。
BoyChai
2022/06/22
2.6K0
Ansible自动化运维工具
Ansible PlayBook的中变量优先级分析及清单变量解耦总结
「 傍晚时分,你坐在屋檐下,看着天慢慢地黑下去,心里寂寞而凄凉,感到自己的生命被剥夺了。当时我是个年轻人,但我害怕这样生活下去,衰老下去。在我看来,这是比死亡更可怕的事。--------王小波」
山河已无恙
2023/01/30
5.4K0
Ansible PlayBook的中变量优先级分析及清单变量解耦总结
Ansible3:ansible.cfg
    Ansible默认安装好后有一个配置文件/etc/ansible/ansible.cfg,该配置文件中定义了ansible的主机的默认配置部分,如默认是否需要输入密码、是否开启sudo认证、action_plugins插件的位置、hosts主机组的位置、是否开启log功能、默认端口、key文件位置等等。
py3study
2020/01/08
9800
Ansible服务端安装与配置
1、ansible是自动化运维工具,基于Python开发,实现批量部署、配置、运行等。
用户5760343
2022/05/24
1.1K0
Ansible服务端安装与配置
ansible
  ansible是目前最受运维欢迎的自动化运维工具,基于Python开发,集合了众多运维工具(SaltStack puppet、chef、func、fabric)的优点,实现了批量系统配置、批量程序部署、批量运行命令等功能。   ansible是基于 paramiko 开发的,并且基于模块化工作,本身没有批量部署的能力。真正具有批量部署的是ansible所运行的模块,ansible只是提供一种框架。ansible不需要在远程主机上安装client/agents,因为它们是基于ssh来和远程主机通讯的。ansible目前已经已经被红帽官方收购,是自动化运维工具中大家认可度最高的,并且上手容易,学习简单。是每位运维工程师必须掌握的技能之一。
Cyylog
2020/08/19
4K1
Ansible常用功能说明 [异步、并发、委托等]
Ansible的同步模式与异步模式 同步模式: 如果节点数太多,ansible无法一次在所有远程节点上执行任务,那么将先在一部分节点上执行一个任务(每一批节点的数量取决于fork进程数量,默认为5个,可设置),直到这一批所有节点上该任务完全执行完毕才会接入下一个批节点,直到所有节点将该任务都执行完毕,然后重新回到第一批节点开始执行第二个任务。依次类推,直到所有节点执行完所有任务,ansible端才会释放shell。这是默认同步模式,也就是说在未执行完毕时,ansible是占用当前shell的,任务执行完后,释放shell了才可以输入其他命令做其他动作。
洗尽了浮华
2019/10/11
8.2K1
ansible使用
Ansible 是一个开源的、自动化运维的强大工具,早前被红帽收购,通过它可实现实现批量系统配置、批量程序部署、批量运行命令等功能。
雪人
2023/01/04
5590
相关推荐
如何优化 Ansible Playbook 执行速度
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验