接上节继续,流式响应在LLM应用中是改善用户体验的重要手段之一,可以有效缓解长耗时应用的用户焦虑感。
/**
 * Graph node that pauses for a fixed delay and then contributes one preset
 * message to the graph state.
 *
 * @author junmingyang
 */
public class ChatNode implements NodeAction<MessagesState<String>> {

    /** Artificial delay, in milliseconds, used to simulate a slow node. */
    private static final long SIMULATED_LATENCY_MILLIS = 2000L;

    /** Message returned by every invocation of this node. */
    private final String message;

    /**
     * @param message the text this node returns under the messages key
     */
    public ChatNode(String message) {
        this.message = message;
    }

    /**
     * Sleeps for the simulated latency, then returns the preset message keyed by
     * {@code MessagesState.MESSAGES_STATE}. The incoming state is not read.
     *
     * @param state current graph state (unused)
     * @return a single-entry map holding this node's message
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public Map<String, Object> apply(MessagesState<String> state) throws Exception {
        Thread.sleep(SIMULATED_LATENCY_MILLIS);
        return Map.of(MessagesState.MESSAGES_STATE, this.message);
    }
}
/**
* @author junmingyang
*/
public class StreamGraphApplication {

    /** Id of the node whose output triggers cancellation of the stream. */
    private static final String CANCEL_AT_NODE = "node-3";

    /** Number of chained nodes created by {@link #getGraph()}. */
    private static final int NODE_COUNT = 5;

    /**
     * Runs the demo graph twice: first through {@code stream(...)}, printing each
     * node output as it is produced, then through {@code invoke(...)}, printing
     * only the final state data.
     *
     * @param args unused
     * @throws GraphStateException propagated from graph construction or compilation
     */
    public static void main(String[] args) throws GraphStateException {
        runStreaming();
        runBlocking();
    }

    /** Streams the graph, printing every node output and cancelling at {@code node-3}. */
    private static void runStreaming() throws GraphStateException {
        StateGraph<MessagesState<String>> workflow = getGraph();
        RunnableConfig config = RunnableConfig.builder()
                .threadId("conversation-1")
                .streamMode(CompiledGraph.StreamMode.VALUES)
                .build();
        // stream(...) is used here instead of invoke(...)
        AsyncGenerator.Cancellable<NodeOutput<MessagesState<String>>> outputs =
                workflow.compile().stream(Map.of(), config);

        System.out.println("=========流式stream模式========");
        for (NodeOutput<MessagesState<String>> step : outputs) {
            String nodeId = step.node();
            System.out.println("Node: " + nodeId);

            var messages = step.state().messages();
            if (!CollectionUtils.isEmpty(messages)) {
                System.out.println(messages.toString());
            }

            // In streaming mode the run can be stopped by cancelling the generator.
            if (CANCEL_AT_NODE.equalsIgnoreCase(nodeId)) {
                outputs.cancel(true);
            }
        }
    }

    /** Runs the graph with {@code invoke(...)} and prints the final state data, if present. */
    private static void runBlocking() throws GraphStateException {
        System.out.println("=========常规invoke模式========");
        StateGraph<MessagesState<String>> workflow = getGraph();
        workflow.compile()
                .invoke(Map.of())
                .ifPresent(finalState -> System.out.println(finalState.data()));
    }

    /**
     * Builds a linear graph START -> node-1 -> ... -> node-5 -> END in which
     * each node is a {@code ChatNode} returning its own index as the message
     * ("1" for the first node, a space followed by the index for the rest).
     *
     * @return the uncompiled state graph
     * @throws GraphStateException propagated from {@code addNode}/{@code addEdge}
     */
    public static StateGraph<MessagesState<String>> getGraph() throws GraphStateException {
        StateGraph<MessagesState<String>> graph =
                new StateGraph<MessagesState<String>>(MessagesState.SCHEMA, MessagesState::new);

        // Register all nodes first, in index order.
        for (int i = 1; i <= NODE_COUNT; i++) {
            String payload = (i == 1) ? "1" : " " + i;
            graph = graph.addNode("node-" + i, node_async(new ChatNode(payload)));
        }

        // Then chain them: START -> node-1 -> ... -> node-N -> END.
        String from = GraphDefinition.START;
        for (int i = 1; i <= NODE_COUNT; i++) {
            String to = "node-" + i;
            graph = graph.addEdge(from, to);
            from = to;
        }
        return graph.addEdge(from, GraphDefinition.END);
    }
}
代码中的 org.bsc.async.AsyncGenerator 是实现流式的关键接口:图编译(compile)后调用 stream() 方法,会得到该接口的实例。该接口的主要类图如下:

从上面的类图可以看到,langgraph4j-langchain4j 包下的 StreamingChatGenerator 也实现了 AsyncGenerator 接口,用它可以实现与 LLM 的流式交互,示例如下:
public class LLMStreamApp {

    /**
     * Sends one user question to a local Ollama streaming model and prints the
     * response chunks to standard output as they are received.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        StreamingChatGenerator<AgentState> chunks = StreamingChatGenerator.builder()
                .mapResult(response -> Map.of("content", response.aiMessage().text()))
                .build();

        OllamaStreamingChatModel chatModel = newModel();
        ChatRequest question = newRequest();

        // The generator's handler receives the model callbacks for this request.
        chatModel.chat(question, chunks.handler());

        for (StreamingOutput<AgentState> piece : chunks) {
            System.out.print(piece.chunk());
        }
    }

    /** Creates the Ollama streaming model with request/response logging enabled. */
    private static OllamaStreamingChatModel newModel() {
        return OllamaStreamingChatModel.builder()
                .baseUrl("http://localhost:11434")
                .temperature(0.0)
                .logRequests(true)
                .logResponses(true)
                .modelName("glm-4.6:cloud")
                .build();
    }

    /** Creates the chat request containing the single demo user message. */
    private static ChatRequest newRequest() {
        return ChatRequest.builder()
                .messages(UserMessage.from("李清照的成名作有哪些?"))
                .build();
    }
}
时序图 - 由Cursor生成

上述代码并未体现出图(Graph)的特性,也可以改成下面的写法,把与 LLM 的流式调用封装到图的节点中:
public class LLMStreamGraphApp {
public static void main(String[] args) throws GraphStateException {
OllamaStreamingChatModel model = OllamaStreamingChatModel.builder()
.baseUrl("http://localhost:11434")
.temperature(0.0)
.logRequests(true)
.logResponses(true)
.modelName("qwen3:1.7b")
.build();
//定义Agent节点
NodeAction<MessagesState<ChatMessage>> callModel = state -> {
StreamingChatGenerator<MessagesState<ChatMessage>> generator = StreamingChatGenerator.<MessagesState<ChatMessage>>builder()
.mapResult(response -> Map.of(MESSAGES_STATE, response.aiMessage()))
.startingNode("agent")
.startingState(state)
.build();
ChatRequest request = ChatRequest.builder()
.messages(state.messages())
.build();
model.chat(request, generator.handler());
//注:key名可以改成其它,不一定非要是“_streaming_messages”
return Map.of("_streaming_messages", generator);
};
//定义图
CompiledGraph<MessagesState<ChatMessage>> graph = new MessagesStateGraph<>(new LC4jStateSerializer<MessagesState<ChatMessage>>(MessagesState::new))
.addNode("agent", node_async(callModel))
.addEdge(START, "agent")
.addEdge("agent", END)
.compile();
System.out.println(graph.getGraph(GraphRepresentation.Type.MERMAID, "LLM Stream Graph", true).content());
//流式执行
AsyncGenerator<NodeOutput<MessagesState<ChatMessage>>> stream = graph.stream(Map.of(MESSAGES_STATE, UserMessage.from("李清照的成名作有哪些?")));
//输出流式结果
for (NodeOutput<MessagesState<ChatMessage>> out : stream) {
if (out instanceof StreamingOutput<?> streamingOut) {
System.out.print(streamingOut.chunk());
}
}
}
} 