Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Java进阶实录—pringmvc+kafka分布式消息中间件集成方案

Java进阶实录—pringmvc+kafka分布式消息中间件集成方案

作者头像
慕容千语
发布于 2019-06-11 15:13:50
发布于 2019-06-11 15:13:50
44400
代码可运行
举报
运行总次数:0
代码可运行

Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案: kafka消息平台使用spring+kafka的集成方案, 详情如下: 1 . 使用最高版本2.1.0.RELEASE集成jar包:spring-integration-kafka 2 . Zookeeper、Kafka分布式集群使用init.properties配置化方案。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafka.servers=127.0.0.1:9092    
kafka.topic=xxxooo  

3 . 使用消息生产者spring-context-producer配置化方案。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                <entry key="group.id" value="2" />
                <entry key="retries" value="10" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="test" />
    </bean>
</beans>

4 . 使用消息消费者spring-context-producer配置化方案。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- 定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                <entry key="group.id" value="0" />
                <entry key="enable.auto.commit" value="true" />
                <entry key="auto.commit.interval.ms" value="1000" />
                <entry key="session.timeout.ms" value="15000" />
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>
    <!-- 实际执行消息消费的类 -->
    <bean id="messageListernerConsumerService" class="com.sml.sz.kafka.KafKaConsumer" />
    <!-- 消费者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test" />
        <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
    </bean>
</beans>

5 . 使用注解方式注入消息类型

@Autowired private KafkaTemplate<xxx, ooo> kafkaTemplate;

6 . 重写MessageListener 的getMessage方法获取消息(业务实现)

7 . RestFul服务方式测试消息服务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@CrossOrigin(origins = "*", maxAge = 3600, methods = { RequestMethod.GET, RequestMethod.POST, RequestMethod.DELETE,  
        RequestMethod.PUT })  
@RestController  
@RequestMapping(value = "/rest/kafka")  
public class KafKaProducer {  
      
    @RequestMapping(value = "/send", method = RequestMethod.GET)  
    public JSONObject save() {  
        System.out.println("+++++++++++++++++++++++++++++++");  
        kafkaTemplate.sendDefault("HongHu KAFKA分布式消息服务测试");    
        return null;  
    }  
      
    @Autowired    
    private KafkaTemplate<Integer, String> kafkaTemplate;  
}  
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RestController
public class KafKaConsumer implements MessageListener<Integer, String> {
    @Autowired
    private LogService logService;
    public void onMessage( ConsumerRecord<Integer, String> records )
    {
        System.out.println( "====================" + records );
        Object  o   = records.value();
        Log log = new Log();
        log.setIsNewRecord( true );
        log.setId( IdGen.uuid() );
        log.setTitle( String.valueOf( o ) );
        logService.save( log );
    }
}

接受消息了------------------:ConsumerRecord(topic = xxxooo, partition = 0, offset = 2489, CreateTime = 1479647648299, checksum = 3372898135, serialized key size = -1, serialized value size = 40, key = null, value = HongHu KAFKA分布式消息服务测试)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.04.03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Java-Spring框架-基于xml方式注入属性
在bean属性中注入空值,可以在<property>标签中添加<null>标签,来表示当前的值为null
吃猫的鱼Code
2023/03/12
3980
spring集成kafka
一、添加依赖项 compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE' 二、发消息(生产者) 2.1 xml配置 1 <?xml
菩提树下的杨过
2018/01/18
8020
spring整合kafka(配置文件方式 生产者)
Kafka官方文档有 https://docs.spring.io/spring-kafka/reference/htmlsingle/
大数据流动
2019/08/08
1.1K0
Kafka 客户端开发
前两篇文章讲述了 Kafka 的 工作机制 和 服务器集群部署。至此,Kafka 服务器已就绪,本文分别以官方API、Spring、SpringBoot三种构建方式,讲述了 Kafka 消费生产者和消费者的开发。
IT技术小咖
2019/06/26
1.3K0
SpringMVC集成redis cluster集群模式
1、cluster相对于哨兵模式是去中心化的,它的每个节点都存储了其它集群的信息,因此每个节点都可以做为集群的中心,容错能力强,具有更高的可用性和在线扩容能力。
stys35
2020/04/12
1.8K0
干货--JMS(java消息服务)整合Spring项目案例
Sprng-jms消息服务小项目 所需的包: spring的基础包 spring-jms-xx包 spring-message–xx包 commons-collection-xx包 commons-pool2-xx包 aop切面的包: spring-aop,spring-aspect,aopalliance,aspectjrt.jar,aspectjweaver.jar 配置: 1.配置ConnectionFactory 2.
汤高
2018/01/11
1.9K0
干货--JMS(java消息服务)整合Spring项目案例
Shiro+Redis实现tomcat集群session共享
  当我们使用了nginx做项目集群以后,就会出现一个很严重的问题亟待解决,那就是:tomcat集群之间如何实现session共享的问题,如果这个问题不解决,就会出现登陆过后再次请求资源依旧需要登陆的问题。这篇文章我们就解决这个问题。
阿豪聊干货
2018/08/09
8820
Spring-注入参数详解-[简化配置方式]
Spring为字面值、引用Bean和集合都提供了简化的配置方式,如果没有用到完整格式的特殊功能,大可使用简化的配置方式。
小小工匠
2021/08/16
2830
【SSM_01】Spring-IOC
一、spring 概述 1. 什么是 spring * spring 是分层的 java ee 应用,全栈(full-stact)轻量级开源框架,主要有 IOC(反转控制) 、 AOP(面向切面编程) * 反转控制:将对象的创建全交给 spring,有利于解耦,降低代码的依赖关系 2. 使用方法 ① 导包 需要导入 4 个jar包 【spring-aop 、spring-beans 、spring-core 、spring-expression】 导入 spring-context 相当
用户8250147
2021/02/04
2200
springboot kafka集成(实现producer和consumer)
本文介绍如何在springboot项目中集成kafka收发message。 1、先解决依赖 springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</ve
用户1225216
2018/03/05
3.6K0
SSM第一讲 Spring概述和基础知识详解
架构设计时的内聚高低是指,设计某个模块或者关注点时,模块或关注点内部的一系列相关功能的相关程度的高低。
易兮科技
2020/09/27
6370
SSM第一讲  Spring概述和基础知识详解
springmvc+redis实现简单消息队列
有一只柴犬
2024/01/25
1510
Spring 全家桶之 Spring Framework 5.3(二)- Part A
再次执行testGetBeanByClass(),会出现如下报错: No qualifying bean of type 'com.citi.entity.Person' available: expected single matching bean but found 2: stark,parker 这是因为配置文件中有连个bean配置,获取的时候没有指定获取哪个bean,所以会报错,修改testGetBeanByClass()
RiemannHypothesis
2022/08/19
4490
Spring 全家桶之 Spring Framework 5.3(二)- Part A
Spring RestFul Example (实例参考)
以下节选择《Netkiller Java 手札》 11.8. Spring4 Restful @RestController 首先我要禁告各位,Spring发展过程中,每个版本都有一定差异。如果你做实验失败后在网上搜索答案,切记看一下版本号还有文章帖子的发布时间。否则你可能按照Spring3配置方法去Spring4。 @RestController 默认返回 @ResponseBody, 所以@ResponseBody可加可不加 11.8.1. pom.xml Maven 增加 jackson 开发包
netkiller old
2018/03/05
1K0
spring整合中间件(kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)-kafka
上文:spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)
逍遥壮士
2021/04/13
9540
spring整合中间件(kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)-kafka
spring整合kafka(配置文件方式 消费者)
Kafka官方文档有 https://docs.spring.io/spring-kafka/reference/htmlsingle/
大数据流动
2019/08/08
1.3K0
JavaWeb项目架构之Kafka分布式日志队列
架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了。 kafka介绍 Kafka是由Apache软件基金会开发的一个开源流处理平台,由S
小柒2012
2018/04/13
7970
JavaWeb项目架构之Kafka分布式日志队列
如何基于消息中间件实现分布式事务?我想说的都在这儿了!!
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/10/29
3K0
如何基于消息中间件实现分布式事务?我想说的都在这儿了!!
ActiveMQ学习之Spring整合ActiveMQ------>消息主题
一、pom <dependencies> <!--activemq--> <dependency> <groupId>org.apache.activemq</groupI
用户5899361
2020/12/07
2970
Spring之DI依赖注入
​ name:对应bean中的属性名,要求该属性必须提供可访问的set方法(严格规范为此名称是set方法对应名称)
楠羽
2022/11/18
5210
Spring之DI依赖注入
相关推荐
Java-Spring框架-基于xml方式注入属性
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验