SCS 在 3.x 做了很大的改动,废除了诸如 @StreamListener、@Input、@Output 等类,保留了 Binder、Binding,并提供了批量消费的支持。 本着学新不学旧的原则,本文将介绍 SCS 3.x 相关内容。 由于关于 spring cloud stream kafka 的文档比较充足,本文就此为例介绍 SCS。
Binder 是提供与外部消息中间件集成的组件,为 Binding 提供了 2 个方法,分别是 bindConsumer 和 bindProducer,它们用于构造生产者和消费者。 Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。
不要在事务中尝试重试和提交死信。重试时,事务可能已经回归。如果想要提交死信用于善后,那么可以使用 DefaultAfterRollbackProcessor
以在回滚之后提交死信。
binder 会使用 Error Channel 向消费者传递异常,同时可以配置异步生产者发生异常时将异常传递到 Error Channel。
默认情况下,某 topic 的死信队列将与原始记录存在于相同分区中。
死信队列中的消息是允许复活的,但是应该避免消息反复消费失败导致多次循环进入死信队列。
应该使用一个专门的处理程序用来对这些死信队列的信息进行善后。
顾名思义,Consumer 定义的是一个消费者,他是一个函数式接口,提供了消费消息的方法。我们可以直接在 Bean 声明中使用 lambda 表达式实现它。
值得注意的是,Consumer 还是一个泛型接口,通过泛型来绑定消息的类型。接收消息的类型我们会用到 KStream 类,他将与发送消息时定义的 KStream 对应,是键值对组成的抽象记录流,但相同 key 的记录不会被覆盖。
@Bean
public Consumer<KStream<Object, String>> consumer() {
return input -> input.foreach((key, value) -> {
do consume;
});
}
当我们在应用程序中声明返回 Consumer 的 Bean,那么这个 Bean 就会自动接入消息队列。另外,我们需要用到 spring.cloud.stream.bindings.{beanName}-in-{idx}={topic}
来设置订阅的消息主题。默认情况下,topic 与 beanName 同名。
spring.cloud.stream.bindings.consumer-in-0 = userBuy
当接收到消息时,就会调用 Consumer 定义的 accept 方法进行消息消费。
SCS 并没有对发送消息做出一个具体的封装,而是建议通过各个消息队列支持的 client 或者 template 发送消息。
kafkaTemplate.send(message);
但有时候,我们需要对数据进行加工后发送回消息队列中,这个时候就会用到 Function。
它和 Consumer 类似,但是方法多了一个返回值。同样的,这个返回值需要用到 KStream 类,这样就能够支持将处理完的数据返回到消息队列。
@Bean
public Function<KStream<Object, String>, KStream<Object, Stream>> processor() {
return input -> input.map((key, value) -> {
do process;
return new KeyValue(key, value);
})
}
spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
来设置出口的消息主题。默认情况下,topic 与 beanName 同名。
Function 相比生产者或消费者,更像是将消息进行加工,这个过程可以对消息进行一系列的处理,包括消息拆分,消息过滤和计算中间结果等。常见的一个用途就是国际化消息和多平台通知。
国际化消息就是对消息进行本地化,Function 就类似一个翻译官的功能,将翻译好的消息转达给消费者。
有时候我们也需要同时对多个平台推送通知,比如邮件、短信等。一般来说,邮件服务器和短信服务器不会写死消息的模板以提高泛用性,这个时候就需要中间人对消息进行加工,嵌入对应平台的模板。
上面提到了消息拆分,Function 允许多个 topic 的消息发送,返回值上会用到 KStream 数组,然后配置上会用到方才展示的 spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
,idx 代表的就是返回值 KStream 在数组中的索引。
多输入绑定在普通应用程序上很少用到,一般用于分布式计算。比如除法计算需要同时拥有除数和被除数。分布式计算也是 SCS 的一大用处之一,知识盲区,在此不多做介绍。
上面多次提到了 KStream,它实质上是一个顺序且可不断增长的数据集,是数据流的一种。
KTable 与 KStream 类似,但是与 KStream 不同的是,他不允许 key 的重复。 面对相同 key 的数据,会选择更新而不是插入。 KTable 实质上也是数据流,他的实现类同样继承了 AbstractStream。 可以将他看成某一时刻,KStream 的最新快照。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。