Spring Cloud生态圈十分丰富,在消息流操作上有个十分强大的组件Spring CloudStream,封装了一些优秀的Spring消息事件集成组件和消息中间件的配置,今天来简单使用一下。
1. 基本认识
参考英文官网:
基本概念,右侧是不同版本的文档
https://cloud.spring.io/spring-cloud-stream/
其中一个spring cloud版本的文档:
https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.SR2/reference/htmlsingle/
D版本中文文档地址:
https://springcloud.cc/spring-cloud-dalston.html#_introducing_spring_cloud_stream
Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot建立独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。它提供了来自几家供应商的中间件的意见配置,介绍了持久发布订阅语义,消费者组和分区的概念。
Spring Cloud Stream提供了建立在已经建立和熟悉的Spring习惯用法和最佳实践之上的灵活的编程模型,应用起来更加方便,目前支持kafka和rabbitmq两种消息中间件,切换到话基本不用动代码。
看一下基本的架构图:
Spring Cloud Stream应用程序由middleware-neutral核心组成。该应用程序通过Spring Cloud Stream注入其中的输入和输出通道与外界通信。通过中间件特定的Binder实现,通道连接到外部代理。那么这个middleware目前可以是kafka,也可以是rabbitmq。
其他概念性的东西可以在官网上看到,我们先看一些简单的东西。下面从源码上看看一些有意思的东西。
2.看看简单的源码
2.1 整体结构
关于消息的处理很丰富,有几个重要的点,绑定,消息转换,消息,配置
2.2 config
看下我们要配置哪些东西
在官网文档中看到的一些配置在这个类里可以看到,我们可以绑定消息发送的目的地、分组、序列化方式、绑定的消息中间件、消费者配置、生产者配置。
2.3 支持的消息转换方式
为了传播有关生成消息的内容类型的信息,Spring Cloud Stream默认将contentType头附加到流出的消息。 对于不直接支持头部的中间件,Spring Cloud Stream提供了将流出消息自动封装在自己的包中的机制。 对于支持头的中间件,Spring Cloud Stream应用可以从非Spring Cloud Stream应用程序接收具有给定内容类型的消息。
contentType值被解析为媒体类型,例如application / json或text / plain; charset = UTF-8。
MIME类型对于指示如何转换为String或byte []内容特别有用。 Spring Cloud Stream还使用MIME类型格式来表示Java类型:使用具有type参数的常规类型application / x-java-object。 例如,可以将application / x-java-object; type = java.util.Map或application / x-java-object; type = com.bar.Foo设置为输入绑定的content-type属性。 另外,Spring Cloud Stream提供了自定义的MIME类型,值得注意的是,application / x-spring-tuple指定了一个Tuple(元组)
2.4 预定义的接口
Spring Cloud Stream已经预定义了一些接口,在message包下,我们可以直接拿来用
来看看里面的内容:
Sink:
Source:
Processor:
从代码可以看出:
三个预设的接口
Sink:接受消息的接口,定义了一个订阅频道方法
Source:输出消息的接口,定义了一个消息通道方法
Processor:绑定输入输出的接口,发布者和订阅者绑定到一起
下面我们来实战一波
3.实战
根据Spring Cloud Stream特点,我们来利用预设接口操作一波,消息中间件采用kafka,至于kafka的安装百度一大堆,这里就不多说了。
3.1主要pom依赖
3.2 bootstrap.yml配置
3.3消费者监听
3.4测试接口
在启动类上写了测试接口:
启动类注解,需要开启绑定:
由于kafka没有安装web页面,图方便在这写了测试的接口,并集成了swagger2,可以直接模拟通过output发送消息,前面已经看到processor接口继承Source、Sink,可以直接用默认接口。
3.5测试结果
发送消息与接收到消息:
4.总结
从实际例子上简单来说,就是@EnableBinding注解在你的应用上,从而立即连接到消息代理,在方法上添加@StreamListener以使其接收流处理事件。
其实要理解的更深刻最好从kafka里面直接发送一条消息到队列上来,但是kafka本身没web页面,这个就先忽略了,等找到好的kafka监控工具再来看看。
在配置文件中可以看到group分组、patitioned开启分区,那么为什么有这两个东西呢?
其实这个和kafka很像,group分组是为了解决重复消费的问题,当一个应用有多个实例的时候,都去消费这条数据的话就乱套了,绝大多数场景是不允许重复消费的。当一个应用程序不同实例放置在一个具有竞争关系的消费组中,组里面的实例中只有一个能够消费消息。
分区是个有意思的东西,kafka本身就有分区功能,估计stream是借鉴了其中的原理,Spring Cloud Stream支持在给定应用程序的多个实例之间对数据进行分区。在分区场景中,物理通信介质(例如,broker topic)被视为被构造为多个分区。一个或多个生产者应用程序实例将数据发送到多个消费者应用程序实例,并确保由共同特征标识的数据由同一个消费者实例处理。Spring Cloud Stream提供了一种通用抽象,用于以统一的方式实现分区处理用例。因此,无论代理本身是否自然分区(例如,Kafka)(例如,RabbitMQ),都可以使用分区。
Spring Cloud Stream中还有很多内容,可以自定义通道、编程模型等等,不能一一写全,在实际应用中如果碰到需要这个操作了,可以在官网上看下介绍和例子,注意要对应Spring Cloud版本。
demo地址:https://gitee.com/yuanhan93/springcloudLearning1
ps:在stream-schema-client 模块,由于我使用了注册中心和配置中心,并且写了其他的代码,config可先改为从本地文件获取配置,去掉git配置,数据库和rabbitmq、kafka都改成自己的,启动顺序为discovery->config->schema-registry->stream-schema-client,当然自己可以写个简单的模块,不用启动这么多,下次我加一个最简启动的module。
正常启动后,swagger ui地址:http://localhost:8991/swagger-ui.html
下期预告:Spring Cloud Stream Schema Avro处理大数据流简单应用
领取专属 10元无门槛券
私享最新 技术干货