
翻译:https://medium.com/@ia_taras/dont-let-the-stream-overwhelm-you-effectively-managing-backpressure-in-spring-webflux-4404b1f726d3
在当今时代,响应式编程在现代网络应用开发者中备受追捧。而这一领域最吸引人的概念之一就是背压,它是一种数据流控制机制,用于数据生产者和消费者之间。
本文将解释什么是背压,它在 Spring WebFlux 中如何工作,以及可以采用哪些策略来实现背压。
当数据生产速率超过消费者消费速率时,背压就会出现,它是系统中的流量控制机制。在响应式编程中,背压是一种告知生产者消费者准备好处理的数据量的技术。
可以想象一个管道系统:当水在管道中以很大压力流动时,管道可能会破裂。背压的理念更像是一个调节阀,降低水压,以确保水流不仅安全,而且以最有效的方式运行。
要理解 Spring WebFlux 中的背压,首先要了解在网络协议层面,如 TCP,这一概念的含义。
TCP(传输控制协议)是一种可靠的传输层协议。它保证数据无损且按正确顺序交付。其主要机制之一就是流量控制,防止发送方传输数据的速度超过接收方的处理能力。
主要特征:
这是字节层面上背压的基本形式。
欲了解更多信息,请参阅这篇详细文章。
Spring WebFlux 在底层使用 Netty,而 Netty 是建立在 TCP 之上的。TCP 操作的是字节,而应用程序操作的是逻辑对象或消息。
WebFlux 中背压的主要特征:
为了理解 Spring WebFlux 中的背压工作原理,让我们通过客户端和服务器交互的方式,可视化背压的应用过程。
1. 客户端发送请求:客户端应用向服务器发起请求,例如,发起 HTTP 请求以获取特定数据,如大型对象集合或流数据。
2. 服务器响应:请求到达服务器,服务器开始构建响应。数据开始通过 TCP 连接作为字节流传输。
3. 解码并添加到 Flux:客户端应用通过 TCP 接收字节流。这些字节被解构成逻辑对象——消息并排队在 Flux 中。
4. 消费者提取和处理项目:消费者(例如,客户端应用中的一个处理程序)订阅 Flux 并开始拉取项目进行进一步处理。处理可能涉及大量计算、I/O 或数据库操作。
5. 慢消费者填满队列:如果消费者消费速度慢,它会开始填满提供给 Flux 的队列——换句话说,数据堆积起来等待消费。
6. 缓冲区溢出信号:当 Flux 无法再累积数据时(例如队列已满),此信号会传播回 TCP 层。TCP 中的流量控制向服务器发出信号,表明接收方已过载。
7. 服务器停止发送数据:一旦收到接收方无法处理更多数据的信号,服务器停止发送数据。然后,它等待接收方在缓冲区中腾出一些空间。
8. 消费者释放缓冲区空间:在消费数据时,它会释放 Flux 队列中的空间。现在它已准备好接收更多数据。
9. 恢复数据传输:服务器恢复数据传输,但保持传输速率与消费者的处理速度相平衡,以维持反馈循环而不使其过载。
现在,了解了背压的重要性和机制后,让我们继续介绍在 Spring WebFlux 中实现背压的实际操作。我们将审查一个简单项目:一个使用 WebClient 从服务接收数据流的客户端应用。
假设有一个服务,它应该能够处理从外部 API 提供的实时数据。目标是让该服务能够处理 API 不同的数据速率。
假设我们正在检索一个数据流:
@Service
publicclass DataService {
privatestaticfinal Logger log = LoggerFactory.getLogger(DataService.class);
privatefinal WebClient webClient;
public DataService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("https://external-api.com").build();
}
public Flux<Data> getData() {
var dataFlux = webClient.get()
.uri("/data-stream")
.retrieve()
.bodyToFlux(Data.class);
return dataFlux;
}
}
Flux<Data>:一个数据对象的响应式数据流。/data-stream:外部 API 中一个返回数据流的端点。默认情况下,Flux 有一个内部缓冲区,最多可容纳 256 个元素。这意味着,对于处理元素速度慢于生产者发出速度的消费者,Flux 会在要求背压策略之前开始缓冲最多 256 个元素。
Spring WebFlux 提供了多个操作符来管理背压发生时的行为:
在缓冲区中累积数据,并处理溢出。
public Flux<Data> getDataBuffer() {
var dataFlux =
webClient
.get()
.uri("/data-stream")
.retrieve()
.bodyToFlux(Data.class)
.onBackpressureBuffer(
100, // 缓冲区大小
data -> {
// 溢出处理
log.error("缓冲区溢出,数据丢失:{}", data);
}); return dataFlux;
}
当消费者无法足够快地处理新元素时将其丢弃。在缓冲区溢出不可接受且数据丢失可容忍时非常有用。
public Flux<Data> getDataDrop() {
var dataFlux =
webClient
.get()
.uri("/data-stream")
.retrieve()
.bodyToFlux(Data.class)
.onBackpressureDrop(
data -> {
// 处理丢弃的数据
log.warn("由于溢出,数据被丢弃:{}", data);
}); return dataFlux;
}
只保留最新元素,丢弃之前未处理的元素。适用于只有最新数据相关的场景。
public Flux<Data> getDataLatest() {
var dataFlux =
webClient
.get()
.uri("/data-stream")
.retrieve()
.bodyToFlux(Data.class)
.onBackpressureLatest(); return dataFlux;
}
当发生溢出时生成 OverflowException。在需要立即通知和纠正措施时非常有用。
public Flux<Data> getDataError() {
var dataFlux =
webClient
.get()
.uri("/data-stream")
.retrieve()
.bodyToFlux(Data.class)
.onBackpressureError(); return dataFlux;
}
今天,我们讨论了背压是什么,它允许在生产者和消费者之间高效地管理数据流。理解背压操作符及其正确使用将防止缓冲区溢出问题,并确保系统在高负载条件下的稳定性。您可以借助 Spring WebFlux 提供的操作符和配置,确保响应式流的行为,以实现应用程序的最大性能。希望本文和可视化示例有助于您理解 Spring WebFlux 中的背压概念,现在您可以将这些知识应用到您的项目中。