Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >SpringBoot+Nacos+Kafka简单实现微服务流编排

SpringBoot+Nacos+Kafka简单实现微服务流编排

作者头像
芋道源码
发布于 2022-06-29 08:54:49
发布于 2022-06-29 08:54:49
82600
代码可运行
举报
文章被收录于专栏:芋道源码1024芋道源码1024
运行总次数:0
代码可运行

点击上方“芋道源码”,选择“设为星标

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

来源:c1n.cn/RWt2e


前言

最近一直在做微服务开发,涉及了一些数据处理模块的开发,每个处理业务都会开发独立的微服务,便于后面拓展和流编排。

学习了 SpringCloud Data Flow 等框架,感觉这个框架对于我们来说太重了,维护起来也比较麻烦,于是根据流编排的思想,基于我们目前的技术栈实现简单的流编排功能。

简单的说,我们希望自己的流编排就是微服务可插拔,微服务数据入口及输出可不停机修改。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能。 项目地址:https://github.com/YunaiV/ruoyi-vue-pro

准备工作

Nacos 安装及使用入门

自己学习的话推荐使用 docker 安装,命令如下:

拉取镜像:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
docker pull nacos/nacos-server

创建服务:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

然后在浏览器输入 ip:8848/nacos,账号 nacos;密码 nacos。

docker 能够帮助我们快速安装服务,减少再环境准备花的时间。

准备三个 SpringBoot 服务,引入 Nacos 及 Kafka

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.1.0.RELEASE</version>
</parent>

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
   <groupId>com.alibaba.boot</groupId>
   <artifactId>nacos-config-spring-boot-starter</artifactId>
   <version>0.2.1</version>
</dependency>

配置文件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring:
  kafka:
    bootstrap-servers: kafka-server:9092
    producer:
      acks: all
    consumer:
      group-id: node1-group #三个服务分别为node1 node2 node3
      enable-auto-commit: false

> 基于微服务的思想,构建在 B2C 电商场景下的项目实战。核心技术栈,是 Spring Boot + Dubbo 。未来,会重构成 Spring Cloud Alibaba 。
>
> 项目地址:<https://github.com/YunaiV/onemall>

# 部署的nacos服务
nacos:
  config:
    server-addr: nacos-server:8848

建议配置本机 host 就可以填写 xxx-server 不用填写服务 ip。

业务解读

我们现在需要对三个服务进行编排,保障每个服务可以插拔,也可以调整服务的位置。

示意图如上:

  • node1 服务监听前置服务发送的数据流,输入的 topic 为前置数据服务输出 topic
  • node2 监听 node1 处理后的数据,所以 node2 监听的 topic 为 node1 输出的 topic,node3 同理,最终 node3 处理完成后将数据发送到数据流终点
  • 我们现在要调整流程移除 node2-server,我们只需要把 node1-sink 改变成 node2-sink 即可,这样我们这几个服务就可以灵活的嵌入的不同项目的数据流处理业务中,做到即插即用(当然,数据格式这些业务层面的都是需要约定好的)
  • 动态可调还可以保证服务某一节点出现问题时候,即时改变数据流向,比如发送到数暂存服务,避免 Kafka 中积累太多数据,吞吐不平衡

Nacos 配置

①创建配置

通常流编排里面每个服务都有一个输入及输出,分别为 input 及 sink,所以每个服务我们需要配置两个 topic,分别是 input-topic output-topic,我们就在 nacos 里面添加输入输出配置。

nacos 配置项需要配置 groupId,dataId,通常我们用服务名称作为 groupId,配置项的名称作为 dataId。

node1-server 服务有一个 input 配置项,配置如下:

完成其中一个服务的配置,其它服务参考下图配置即可:

②读取配置

代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
// autoRefreshed=true指的是nacos中配置发生改变后会刷新,false代表只会使用服务启动时候读取到的值
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {

    @NacosValue(value = "${input:}", autoRefreshed = true)
    private String input;

    @NacosValue(value = "${sink:}", autoRefreshed = true)
    private String sink;

    public String getInput() {
        return input;
    }

    public String getSink() {
        return sink;
    }
}
③监听配置改变

服务的输入需要在服务启动时候创建消费者,在 topic 发生改变时候重新创建消费者,移除旧 topic 的消费者,输出是业务驱动的,无需监听改变,在每次发送时候读取到的都是最新配置的 topic。

因为在上面的配置类中 autoRefreshed = true,这个只会刷新 nacosConfig 中的配置值,服务需要知道配置改变去驱动消费的创建业务,需要创建 nacos 配置监听。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 监听Nacos配置改变,创建消费者,更新消费
 */
@Component
public class ConsumerManager {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private boolean groupId;

    @Autowired
    private NacosConfig nacosConfig;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 用于存放当前消费者使用的topic
    private String topic;

    // 用于执行消费者线程
    private ExecutorService executorService;

    /**
     * 监听input
     */
    @NacosConfigListener(dataId = "node1-server", groupId = "input")
    public void inputListener(String input) {
        // 这个监听触发的时候 实际NacosConfig中input的值已经是最新的值了 我们只是需要这个监听触发我们更新消费者的业务
        String inputTopic = nacosConfig.getInput();
        // 我使用nacosConfig中读取的原因是因为监听到内容是input=xxxx而不是xxxx,如果使用需要自己截取一下,nacosConfig中的内容框架会处理好,大家看一下第一张图的配置内容就明白了
        // 先检查当前局部变量topic是否有值,有值代表是更新消费者,没有值只需要创建即可
        if(topic != null) {
            // 停止旧的消费者线程
            executorService.shutdownNow();
            executorService == null;
        }
        // 根据为新的topic创建消费者
        topic = inputTopic;
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
        executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2), threadFactory);
        // 执行消费业务
        executorService.execute(() -> consumer(topic));
    }

    /**
     * 创建消费者
     */
    public void consumer(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", servers);
        properties.put("enable.auto.commit", enableAutoCommit);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Duration duration = Duration.ofSeconds(1L);
                ConsumerRecords<String, String> records = consumer.poll(duration);
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    // 执行数据处理业务 省略业务实现
                    String handleMessage =  handle(message);
                    // 处理完成后发送到下一个节点
                    kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                }
            }
            consumer.commitAsync();
        }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

总结

流编排的思路整体来说就是数据流方向可调,我们以此为需求,根据一些主流框架提供的 api 实现自己的动态调整方案,可以帮助自己更好的理解流编码思想及原理。

在实际业务中,还有许多业务问题需要去突破,我们这样处理更多是因为服务可插拔,便于流处理微服务在项目灵活搭配。

因为我现在工作是在传统公司,由于一些原因很难去推动新框架的使用,经常会用一些现有技术栈组合搞一些 sao 操作,供大家参考,希望大家多多指教。



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、RedisMongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
文章有帮助的话,在看,转发吧。谢谢支持哟 (*^__^*
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 芋道源码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
SpringBoot 2.0 + Nacos + Sentinel 流控规则集中存储
Sentinel 原生版本的规则管理通过API 将规则推送至客户端并直接更新到内存中,并不能直接用于生产环境。不过官方也提供了一种 Push模式,扩展读数据源ReadableDataSource,规则中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用 Nacos、Zookeeper 等配置中心。这种方式有更好的实时性和一致性保证。这里我们通过配置 Nacos 来实现流控规则的统一存储配置。
小柒2012
2019/12/05
1.2K0
微服务新秀之Nacos,看了就会,我说的!
大家好,我是小菜,一个渴望在互联网行业做到蔡不菜的小菜。可柔可刚,点赞则柔,白嫖则刚!「死鬼~看完记得给我来个三连哦!」
蔡不菜丶
2020/11/11
6380
微服务新秀之Nacos,看了就会,我说的!
什么是Nacos及实战使用教程
注册中心加配置中心的集合体,Nacos提供了统一配置管理、服务发现与注册。 其中服务注册和发现的功能,相当于dubbo里面使用到的zookeeper、 或者spring cloud里面应用到的consoul以及eureka。
向着百万年薪努力的小赵
2022/12/02
5980
什么是Nacos及实战使用教程
SpringBoot连接kafka——JavaDemo
Spring Boot是一个用于快速构建基于Spring框架的Java应用程序的框架。Kafka是一种分布式流处理平台,用于实时传输和处理大规模数据。通过Spring Boot与Kafka的连接,可以轻松地在Spring应用程序中使用Kafka进行数据流处理。
小明爱吃火锅
2023/10/06
8490
Sentinel如何持久化数据到Nacos?
默认情况下 Sentinel 只能接收到 Nacos 推送的消息,但不能将自己控制台修改的信息同步给 Nacos,如下图所示:
磊哥
2024/04/25
4990
微服务灵魂摆渡者Nacos
Nacos是阿里巴巴开源的服务注册中心以及配置中心,致力于给开发者提供一款便捷、简单上手的开源框架。
爱撒谎的男孩
2022/12/30
4320
Sentinel源码改造,实现Nacos双向通信!
Sentinel Dashboard(控制台)默认情况下,只能将配置规则保存到内存中,这样就会导致 Sentinel Dashboard 重启后配置规则丢失的情况,因此我们需要将规则保存到某种数据源中,Sentinel 支持的数据源有以下这些:
磊哥
2023/10/18
4900
Sentinel源码改造,实现Nacos双向通信!
【Nacos系列第一篇】-Nacos之Spring Discovery
个人比较看好Spring Cloud Alibaba家族。此系列以Nacos为主题,从Spring、Spring boot、Spring Cloud多个方面逐步进行演示,源码解读。目前来看官方文档还有待完善。网络上除了官网外缺少Nacos系列文章。都是零零散散的知识点。如此系列文章哪里写的有不周全,错误之处。欢迎大家指正。谢谢。
胖虎
2019/06/26
1.1K0
【Nacos系列第一篇】-Nacos之Spring Discovery
24.Sentinel 同步流控规则到nacos
想将修改后的流控规则同步到nacos,需要修改sentinel源码,所以大家需要提前讲sentinel源码下载下来:
AI码师
2023/12/11
2520
24.Sentinel 同步流控规则到nacos
微服务同时接入多个Kafka
kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息 kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中, producerFactory 生产者工厂 consumerFactory 消费者工厂 producerConfigs 生产者配置 consumerConfigs 消费者配置
阿提说说
2022/11/18
1.3K0
微服务同时接入多个Kafka
SpringBoot2 整合Nacos组件,环境搭建和入门案例详解
一、Nacos基础简介 1、概念简介 Nacos 是构建以“服务”为中心的现代应用架构,如微服务范式、云原生范式等服务基础设施。聚焦于发现、配置和管理微服务。Nacos提供一组简单易用的特性集,帮助开发者快速实现动态服务发现、服务配置、服务元数据及流量管理。敏捷构建、交付和管理微服务平台。 2、关键特性 动态配置服务 服务发现和服务健康监测 动态 DNS 服务 服务及其元数据管理 3、专业术语解释 命名空间 用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置
知了一笑
2020/02/17
1.6K0
SpringBoot2 整合Nacos组件,环境搭建和入门案例详解
Sentinel Dashboard 中修改规则同步到 Nacos
上一篇我们介绍了如何通过改造Sentinel Dashboard来实现修改规则之后自动同步到Apollo。下面通过这篇,详细介绍当使用Nacos作为配置中心之后,如何实现Sentinel Dashboard中修改规则同步到Nacos。关于下面改造的原理和分析可以见上一篇《Sentinel Dashboard中修改规则同步到Apollo》的头两节内容,本篇就不重复介绍了。
程序猿DD
2019/05/24
1.4K0
SpringBoot使用Nacos进行服务注册发现与配置管理
最近由于业务发展,需要调研一套完善和主流的基础架构,进行中台化(微服务)的实施,考虑到技术栈切换到SOFAStack。既然整个体系都切换到蚂蚁金服的技术栈,那么自然考虑一些基础组件如服务注册发现、配置管理等都切换为阿里的技术栈。考虑到目前比较热的服务发现组件是Nacos,需要调研SpringBoot服务接入Nacos的可行性,为以后强制要求新服务使用SOFAStack + Nacos的技术栈进行服务开发打下基础。
Throwable
2020/06/23
4.3K0
SpringBoot使用Nacos进行服务注册发现与配置管理
SpringBoot整合Nacos服务注册中心
Spring Boot整合Nacos与在Spring Cloud整合不太一样,具体请看:https://nacos.io/zh-cn/ 1.添加依赖 <!-- Nacos 组件依赖 --> <dependency> <groupId>com.alibaba.boot</groupId> <artifactId>nacos-discovery-spring-boot-starter</artifactId> <version>0.2.3</version> </dependency>
甄士隐
2022/01/26
7620
SpringBoot整合Nacos服务注册中心
Kafka及Spring Cloud Stream
下载kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
HUC思梦
2020/09/03
1.2K0
微服务配置中心-Nacos
目前是github的一个明星项目,高达6k+的star。有大量组织在现网使用Nacos,详见官方issue:https://github.com/alibaba/nacos/issues/273
皮皮熊
2019/06/17
2.9K0
微服务配置中心-Nacos
SpringBoot基础(五、整合Kafka及原生api使用)
只管发送, 不管结果: 只调用接口发送消息到 Kafka 服务器, 但不管成功写入与否。 由于 Kafka 是高可用的, 因此大部分情 况下消息都会写入, 但在异常情况下会丢消息。
营琪
2019/11/12
8710
【云原生】nacos与springboot结合
本文主要面向 Spring Boot 的使用者,通过两个示例来介绍如何使用 Nacos 来实现分布式环境下的配置管理和服务发现。
一个风轻云淡
2023/10/15
3300
【云原生】nacos与springboot结合
SpringBoot使用Nacos配置中心
Nacos是阿里巴巴集团开源的一个易于使用的平台,专为动态服务发现,配置和服务管理而设计。它可以帮助您轻松构建云本机应用程序和微服务平台。
lyb-geek
2019/03/07
3.6K0
SpringBoot使用Nacos配置中心
Asynchronous Servlet 在 Nacos 1.X 动态配置管理中的应用
Nacos/nɑ:kəʊs/脱胎于阿里巴巴内部的 Config Server、VIPServer 和 Diamond,成长于多年双十一的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。于 2018 年正式开源,其核心特性有:服务发现、动态配置管理 和 动态 DNS 服务。
程序猿杜小头
2022/12/01
6740
Asynchronous Servlet 在 Nacos 1.X 动态配置管理中的应用
相关推荐
SpringBoot 2.0 + Nacos + Sentinel 流控规则集中存储
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验