首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >别让数据流淹没你:在 Spring WebFlux 中高效管理背压

别让数据流淹没你:在 Spring WebFlux 中高效管理背压

作者头像
崔认知
发布2025-07-16 16:03:42
发布2025-07-16 16:03:42
6120
举报
文章被收录于专栏:nobodynobody

翻译:https://medium.com/@ia_taras/dont-let-the-stream-overwhelm-you-effectively-managing-backpressure-in-spring-webflux-4404b1f726d3

引言

在当今时代,响应式编程在现代网络应用开发者中备受追捧。而这一领域最吸引人的概念之一就是背压,它是一种数据流控制机制,用于数据生产者和消费者之间。

本文将解释什么是背压,它在 Spring WebFlux 中如何工作,以及可以采用哪些策略来实现背压。

什么是背压?

当数据生产速率超过消费者消费速率时,背压就会出现,它是系统中的流量控制机制。在响应式编程中,背压是一种告知生产者消费者准备好处理的数据量的技术。

可以想象一个管道系统:当水在管道中以很大压力流动时,管道可能会破裂。背压的理念更像是一个调节阀,降低水压,以确保水流不仅安全,而且以最有效的方式运行。

背压策略

  • 缓冲:将数据存储在缓冲区中,直到消费者准备好消费。
  • 丢弃:如果消费者无法跟上生产者的速度,则丢弃额外数据。
  • 最新:只保留最新信息,丢弃之前未处理的数据。
  • 错误:当系统过载时抛出错误,表明无法再处理更多数据。

网络协议中的背压

要理解 Spring WebFlux 中的背压,首先要了解在网络协议层面,如 TCP,这一概念的含义。

TCP 中的流量控制

TCP(传输控制协议)是一种可靠的传输层协议。它保证数据无损且按正确顺序交付。其主要机制之一就是流量控制,防止发送方传输数据的速度超过接收方的处理能力。

主要特征:

  • 接收窗口:告知发送方其接收窗口的大小,即它愿意接收的数据量。
  • 传输速率控制:发送方根据接收方的窗口大小设置传输速率。
  • 反馈:如果接收方的缓冲区满了,窗口大小会减小,指示发送方降低传输速度。

这是字节层面上背压的基本形式。

欲了解更多信息,请参阅这篇详细文章。

WebFlux 中的背压

Spring WebFlux 在底层使用 Netty,而 Netty 是建立在 TCP 之上的。TCP 操作的是字节,而应用程序操作的是逻辑对象或消息。

WebFlux 中背压的主要特征:

  • 事件到字节的转换:WebFlux 负责将高级事件或消息转换为要发送的字节。
  • 事件排队:如果消费者未请求新项目(忙于长时间处理),事件会在队列中累积。
  • 缺乏对象级控制:TCP 在字节层面进行流量控制,不了解应用对象或消息的逻辑边界。
  • 慢消费者问题:在没有应用级背压机制的情况下,快速生产者可能会使事件队列溢出,导致延迟增加和资源消耗。

背压过程的可视化

为了理解 Spring WebFlux 中的背压工作原理,让我们通过客户端和服务器交互的方式,可视化背压的应用过程。

过程步骤:

1. 客户端发送请求:客户端应用向服务器发起请求,例如,发起 HTTP 请求以获取特定数据,如大型对象集合或流数据。

2. 服务器响应:请求到达服务器,服务器开始构建响应。数据开始通过 TCP 连接作为字节流传输。

3. 解码并添加到 Flux:客户端应用通过 TCP 接收字节流。这些字节被解构成逻辑对象——消息并排队在 Flux 中。

4. 消费者提取和处理项目:消费者(例如,客户端应用中的一个处理程序)订阅 Flux 并开始拉取项目进行进一步处理。处理可能涉及大量计算、I/O 或数据库操作。

5. 慢消费者填满队列:如果消费者消费速度慢,它会开始填满提供给 Flux 的队列——换句话说,数据堆积起来等待消费。

6. 缓冲区溢出信号:当 Flux 无法再累积数据时(例如队列已满),此信号会传播回 TCP 层。TCP 中的流量控制向服务器发出信号,表明接收方已过载。

7. 服务器停止发送数据:一旦收到接收方无法处理更多数据的信号,服务器停止发送数据。然后,它等待接收方在缓冲区中腾出一些空间。

8. 消费者释放缓冲区空间:在消费数据时,它会释放 Flux 队列中的空间。现在它已准备好接收更多数据。

9. 恢复数据传输:服务器恢复数据传输,但保持传输速率与消费者的处理速度相平衡,以维持反馈循环而不使其过载。

在 Spring WebFlux 中使用背压

现在,了解了背压的重要性和机制后,让我们继续介绍在 Spring WebFlux 中实现背压的实际操作。我们将审查一个简单项目:一个使用 WebClient 从服务接收数据流的客户端应用。

设置项目

假设有一个服务,它应该能够处理从外部 API 提供的实时数据。目标是让该服务能够处理 API 不同的数据速率。

使用 WebClient 获取数据

假设我们正在检索一个数据流:

代码语言:javascript
复制
@Service
publicclass DataService {
privatestaticfinal Logger log = LoggerFactory.getLogger(DataService.class); 
privatefinal WebClient webClient; 
public DataService(WebClient.Builder webClientBuilder) {
    this.webClient = webClientBuilder.baseUrl("https://external-api.com").build();
  } 
public Flux<Data> getData() {
    var dataFlux = webClient.get()
              .uri("/data-stream")
             .retrieve()
             .bodyToFlux(Data.class);
    return dataFlux;
  }
}
  • Flux<Data>:一个数据对象的响应式数据流。
  • /data-stream:外部 API 中一个返回数据流的端点。

处理 Spring WebFlux 中的背压

默认情况下,Flux 有一个内部缓冲区,最多可容纳 256 个元素。这意味着,对于处理元素速度慢于生产者发出速度的消费者,Flux 会在要求背压策略之前开始缓冲最多 256 个元素。

使用 onBackpressure* 操作符

Spring WebFlux 提供了多个操作符来管理背压发生时的行为:

onBackpressureBuffer

在缓冲区中累积数据,并处理溢出。

代码语言:javascript
复制
public Flux<Data> getDataBuffer() {
    var dataFlux =
        webClient
            .get()
            .uri("/data-stream")
            .retrieve()
            .bodyToFlux(Data.class)
            .onBackpressureBuffer(
                100, // 缓冲区大小
                data -> {
                  // 溢出处理
                  log.error("缓冲区溢出,数据丢失:{}", data);
                });        return dataFlux;
}

onBackpressureDrop

当消费者无法足够快地处理新元素时将其丢弃。在缓冲区溢出不可接受且数据丢失可容忍时非常有用。

代码语言:javascript
复制
public Flux<Data> getDataDrop() {
    var dataFlux =
        webClient
            .get()
            .uri("/data-stream")
            .retrieve()
            .bodyToFlux(Data.class)
            .onBackpressureDrop(
                data -> {
                  // 处理丢弃的数据
                  log.warn("由于溢出,数据被丢弃:{}", data);
                });    return dataFlux;
}

onBackpressureLatest

只保留最新元素,丢弃之前未处理的元素。适用于只有最新数据相关的场景。

代码语言:javascript
复制
public Flux<Data> getDataLatest() {
    var dataFlux =
        webClient
            .get()
            .uri("/data-stream")
            .retrieve()
            .bodyToFlux(Data.class)
            .onBackpressureLatest();    return dataFlux;
}

onBackpressureError

当发生溢出时生成 OverflowException。在需要立即通知和纠正措施时非常有用。

代码语言:javascript
复制
public Flux<Data> getDataError() {
        var dataFlux =
                webClient
                        .get()
                        .uri("/data-stream")
                        .retrieve()
                        .bodyToFlux(Data.class)
                        .onBackpressureError();        return dataFlux;
}

结论

今天,我们讨论了背压是什么,它允许在生产者和消费者之间高效地管理数据流。理解背压操作符及其正确使用将防止缓冲区溢出问题,并确保系统在高负载条件下的稳定性。您可以借助 Spring WebFlux 提供的操作符和配置,确保响应式流的行为,以实现应用程序的最大性能。希望本文和可视化示例有助于您理解 Spring WebFlux 中的背压概念,现在您可以将这些知识应用到您的项目中。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 什么是背压?
  • 背压策略
  • 网络协议中的背压
  • TCP 中的流量控制
  • WebFlux 中的背压
  • 背压过程的可视化
  • 过程步骤:
  • 在 Spring WebFlux 中使用背压
  • 设置项目
  • 使用 WebClient 获取数据
  • 处理 Spring WebFlux 中的背压
  • 使用 onBackpressure* 操作符
  • onBackpressureBuffer
  • onBackpressureDrop
  • onBackpressureLatest
  • onBackpressureError
  • 结论
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档