在 Java 中使用 WebClient + SSE(Server-Sent Events) 来消费服务端流式响应是 Spring WebFlux 中推荐的方式。下面是完整的示例结构,展示如何使用 WebClient 接收 SSE 数据流,适用于对接 OpenAI、LangChain、Spring SSE 服务等流式响应。
一、服务端返回 SSE(text/event-stream)
// 示例 Controller - 模拟返回 SSE 流数据@RestController@RequestMapping("/sse")public class SseServerController { @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamEvents() { return Flux.interval(Duration.ofMillis(500)) .map(i -> "服务器消息: " + i) .take(10); }}php364 Bytes© 菜鸟-创作你的创作
二、客户端 WebClient 接收 SSE 流(推荐方式)
你可以使用 WebClient
来发起 SSE 请求,并以 Flux 方式处理:
@Componentpublic class SseClient { private final WebClient webClient = WebClient.create("http://localhost:8080"); public void consumeSseStream() { webClient.get() .uri("/sse/stream") .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(String.class) .doOnNext(msg -> System.out.println("收到消息: " + msg)) .blockLast(); // 阻塞直到流完成(可改为 subscribe 异步) }}php476 Bytes© 菜鸟-创作你的创作
或者异步监听(非阻塞):
public void asyncSseListen() { webClient.get() .uri("/sse/stream") .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(String.class) .subscribe(msg -> { System.out.println("接收到: " + msg); }, error -> { System.err.println("错误: " + error.getMessage()); }, () -> { System.out.println("接收结束"); });}php463 Bytes© 菜鸟-创作你的创作
三、使用 WebClient 对接 OpenAI 流式接口(SSE)
如果你使用的是 OpenAI Chat Completion 接口并启用 stream=true
,格式类似 SSE:
public void callOpenAIStream(String prompt) { WebClient client = WebClient.builder() .baseUrl("https://api.openai.com/v1") .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer YOUR_API_KEY") .build(); Map<String, Object> requestBody = Map.of( "model", "gpt-3.5-turbo", "stream", true, "messages", List.of(Map.of("role", "user", "content", prompt)) ); client.post() .uri("/chat/completions") .contentType(MediaType.APPLICATION_JSON) .accept(MediaType.TEXT_EVENT_STREAM) .bodyValue(requestBody) .retrieve() .bodyToFlux(String.class) // 返回每一行 SSE 数据 .takeUntil(msg -> msg.contains("[DONE]")) // OpenAI 特有结束标志 .map(msg -> parseOpenAIChunk(msg)) // 你可在此解析 JSON 结构 .doOnNext(System.out::println) .blockLast();}php848 Bytes© 菜鸟-创作你的创作
四、注意事项
bodyToFlux(String.class)
获取每个 SSE 数据块;data:
),需解析;data: {json...}
行。https://www.52runoob.com/archives/4332
五、前端配合(原生 JS)
<script> const source = new EventSource("/sse/stream"); source.onmessage = function(e) { console.log("前端收到数据:", e.data); };</script>php143 Bytes© 菜鸟-创作你的创作
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。