首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >LangGraph4j 学习系列(7)-流式响应

LangGraph4j 学习系列(7)-流式响应

作者头像
菩提树下的杨过
发布2026-03-02 09:27:02
发布2026-03-02 09:27:02
230
举报

上节继续,流式响应在LLM应用中是改善用户体验的重要手段之一,可以有效缓解长耗时应用的用户焦虑感。

Stream基础示例

定义节点
代码语言:javascript
复制
/**
 * @author junmingyang
 */
public class ChatNode implements NodeAction<MessagesState<String>> {

    private final String message;

    public ChatNode(String message) {
        this.message = message;
    }

    @Override
    public Map<String, Object> apply(MessagesState<String> state) throws Exception {
//        System.out.println("current messages => " + state.messages());
        //模拟节点执行耗时
        Thread.sleep(2000);
        return Map.of(MessagesState.MESSAGES_STATE, this.message);
    }
}
示例代码
代码语言:javascript
复制
/**
 * @author junmingyang
 */
public class StreamGraphApplication {

    public static void main(String[] args) throws GraphStateException {

    //流式模式
    StateGraph<MessagesState<String>> graph1 = getGraph();

    RunnableConfig rc = RunnableConfig.builder()
            .threadId("conversation-1")
            .streamMode(CompiledGraph.StreamMode.VALUES)
            .build();

    AsyncGenerator.Cancellable<NodeOutput<MessagesState<String>>> result = graph1.compile()
            //这里调用stream方法,而不是invoke方法
            .stream(Map.of(), rc);

    System.out.println("=========流式stream模式========");
    for (NodeOutput<MessagesState<String>> output : result) {
        System.out.println("Node: " + output.node());
        if (!CollectionUtils.isEmpty(output.state().messages())) {
            System.out.println(output.state().messages().toString());
        }

        //流式模式下,可以通过取消来停止流式执行
        if ("node-3".equalsIgnoreCase(output.node())) {
            result.cancel(true);
        }
    }

    System.out.println("=========常规invoke模式========");

    //常规模式
    StateGraph<MessagesState<String>> graph2 = getGraph();

    graph2.compile().invoke(Map.of()).ifPresent(c -> {
        System.out.println(c.data());
    });

}


    public static StateGraph<MessagesState<String>> getGraph() throws GraphStateException {
        return new StateGraph<MessagesState<String>>(MessagesState.SCHEMA, MessagesState::new)
                .addNode("node-1", node_async(new ChatNode("1")))
                .addNode("node-2", node_async(new ChatNode(" 2")))
                .addNode("node-3", node_async(new ChatNode(" 3")))
                .addNode("node-4", node_async(new ChatNode(" 4")))
                .addNode("node-5", node_async(new ChatNode(" 5")))

                .addEdge(GraphDefinition.START, "node-1")
                .addEdge("node-1", "node-2")
                .addEdge("node-2", "node-3")
                .addEdge("node-3", "node-4")
                .addEdge("node-4", "node-5")
                .addEdge("node-5", GraphDefinition.END);
    }
}
运行结果
stream_simple_output
stream_simple_output

代码中的org.bsc.async.AsyncGenerator是实现流式的关键接口,Graph调用stream()方法后,会得到该接口的实例,该接口的主要类图如下:

deepseek_mermaid_20260301_a8e9df
deepseek_mermaid_20260301_a8e9df

与LangChain4j整合

从上面的类图可以看到,langgraph4j-langchain4j 包下,StreamingChatGenerator 也实现了AsyncGenerator接口,用它可以实现与LLM的流式交互

代码语言:javascript
复制
public class LLMStreamApp {

    public static void main(String[] args) {

        StreamingChatGenerator<AgentState> generator = StreamingChatGenerator.builder()
                .mapResult(r -> Map.of("content", r.aiMessage().text()))
                .build();

        OllamaStreamingChatModel model = OllamaStreamingChatModel.builder()
                .baseUrl("http://localhost:11434")
                .temperature(0.0)
                .logRequests(true)
                .logResponses(true)
                .modelName("glm-4.6:cloud")
                .build();

        ChatRequest request = ChatRequest.builder()
                .messages(UserMessage.from("李清照的成名作有哪些?"))
                .build();

        model.chat(request, generator.handler());

        for (StreamingOutput<AgentState> output : generator) {
            System.out.print(output.chunk());
        }
    }
}
llm_stream2
llm_stream2

时序图 - 由Cursor生成

deepseek_mermaid_20260301_3d78d0
deepseek_mermaid_20260301_3d78d0

上述的代码,并未体现出Graph图的特性,也可以改成下面的写法:

代码语言:javascript
复制
public class LLMStreamGraphApp {

    public static void main(String[] args) throws GraphStateException {

        OllamaStreamingChatModel model = OllamaStreamingChatModel.builder()
                .baseUrl("http://localhost:11434")
                .temperature(0.0)
                .logRequests(true)
                .logResponses(true)
                .modelName("qwen3:1.7b")
                .build();

        //定义Agent节点
        NodeAction<MessagesState<ChatMessage>> callModel = state -> {
            StreamingChatGenerator<MessagesState<ChatMessage>> generator = StreamingChatGenerator.<MessagesState<ChatMessage>>builder()
                    .mapResult(response -> Map.of(MESSAGES_STATE, response.aiMessage()))
                    .startingNode("agent")
                    .startingState(state)
                    .build();

            ChatRequest request = ChatRequest.builder()
                    .messages(state.messages())
                    .build();

            model.chat(request, generator.handler());
            //注:key名可以改成其它,不一定非要是“_streaming_messages”
            return Map.of("_streaming_messages", generator);
        };


        //定义图
        CompiledGraph<MessagesState<ChatMessage>> graph = new MessagesStateGraph<>(new LC4jStateSerializer<MessagesState<ChatMessage>>(MessagesState::new))
                .addNode("agent", node_async(callModel))
                .addEdge(START, "agent")
                .addEdge("agent", END)
                .compile();

        System.out.println(graph.getGraph(GraphRepresentation.Type.MERMAID, "LLM Stream Graph", true).content());

        //流式执行
        AsyncGenerator<NodeOutput<MessagesState<ChatMessage>>> stream = graph.stream(Map.of(MESSAGES_STATE, UserMessage.from("李清照的成名作有哪些?")));

        //输出流式结果
        for (NodeOutput<MessagesState<ChatMessage>> out : stream) {
            if (out instanceof StreamingOutput<?> streamingOut) {
                System.out.print(streamingOut.chunk());
            }
        }

    }
} 
image
image

文中源码:langgraph4j-study/src/main/java/org/bsc/langgraph4j/agent/_12_stream at main · yjmyzz/langgraph4j-study · GitHub

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-03-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Stream基础示例
    • 定义节点
    • 示例代码
    • 运行结果
  • 与LangChain4j整合
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档