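Continuing from the previous section: a Checkpoint saves the state of a graph during execution, so the graph can pause when needed and later resume from the breakpoint. It is usually used together with interrupt.
First, define a graph: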
public static StateGraph<MessagesState<String>> getGraph() throws GraphStateException {
    return new StateGraph<>(MessagesState.SCHEMA, MessagesState<String>::new)
            .addNode("node-1", node_async(state -> Map.of(MESSAGES_STATE, "have")))
            .addNode("node-2", node_async(state -> Map.of(MESSAGES_STATE, "a")))
            .addNode("node-3", node_async(state -> Map.of(MESSAGES_STATE, "good")))
            .addNode("node-4", node_async(state -> Map.of(MESSAGES_STATE, "trip")))
            .addEdge(GraphDefinition.START, "node-1")
            .addEdge("node-1", "node-2")
            .addEdge("node-2", "node-3")
            .addEdge("node-3", "node-4")
            .addEdge("node-4", GraphDefinition.END);
}
After a normal run, the expected output is [have, a, good, trip].
Now make a small change: set an interrupt before node-3 is entered, to see what interrupt does.
static void startWithoutCheckpoint() throws Exception {
    StateGraph<MessagesState<String>> graph = getGraph();
    // "out" is System.out (assumes: import static java.lang.System.out)
    graph.addBeforeCallNodeHook((String node, MessagesState<String> data, RunnableConfig config) -> {
        out.println("Before calling node: " + node + ", data: " + data.data());
        return CompletableFuture.completedFuture(data.data());
    });
    // Interrupt before node-3 is entered
    CompileConfig cc = CompileConfig.builder()
            .interruptBefore("node-3")
            .build();
    RunnableConfig rc = RunnableConfig.builder()
            .threadId("test-interrupt")
            .build();
    CompiledGraph<MessagesState<String>> workflow = graph.compile(cc);
    // The run only prints [have, a]: execution stops at the interrupt before node-3
    workflow.invoke(Map.of(), rc)
            .ifPresent(state -> System.out.println(state.value(MESSAGES_STATE).orElse(null)));
}
Output:
Before calling node: node-1, data: {messages=[]}
Before calling node: node-2, data: {messages=[have]}
[have, a]
As the output shows, the interrupt makes the graph stop early.
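Setting an interrupt on its own usually has little practical value. Combine it with a CheckpointSaver so that the graph state is saved at the breakpoint and can be restored later.

LangGraph4j provides several commonly used CheckpointSaver implementations, as shown in the figure above. Start with the simplest one, MemorySaver. Basic usage: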
static BaseCheckpointSaver getSaver() {
    return new MemorySaver();
    // return new FileSystemSaver(Path.of("output"), new ObjectStreamStateSerializer<>(MessagesState<String>::new));
    // return new JsonFileSystemSaver(Path.of("output"));
}
static void startWithCheckpoint(BaseCheckpointSaver saver) throws Exception {
    StateGraph<MessagesState<String>> graph = getGraph();
    graph.addBeforeCallNodeHook((String node, MessagesState<String> data, RunnableConfig config) -> {
        out.println("Before calling node: " + node + ", data: " + data.data());
        return CompletableFuture.completedFuture(data.data());
    });
    // Interrupt before node-3 is entered, and save state through the given saver
    CompileConfig cc = CompileConfig.builder()
            .checkpointSaver(saver)
            .interruptBefore("node-3")
            .build();
    RunnableConfig rc = RunnableConfig.builder()
            .threadId("test-interrupt")
            .build();
    CompiledGraph<MessagesState<String>> workflow = graph.compile(cc);
    // The run only prints [have, a]: execution stops at the interrupt before node-3
    workflow.invoke(Map.of(), rc)
            .ifPresent(state -> System.out.println(state.value(MESSAGES_STATE).orElse(null)));
}
static void recoverFromCheckpoint(BaseCheckpointSaver saver) throws Exception {
    StateGraph<MessagesState<String>> graph = getGraph();
    graph.addBeforeCallNodeHook((String node, MessagesState<String> data, RunnableConfig config) -> {
        out.println("Before calling node: " + node + ", data: " + data.data());
        return CompletableFuture.completedFuture(data.data());
    });
    CompileConfig cc = CompileConfig.builder()
            .checkpointSaver(saver)
            .interruptBefore("node-3")
            .build();
    RunnableConfig rc = RunnableConfig.builder()
            .threadId("test-interrupt")
            .build();
    CompiledGraph<MessagesState<String>> workflow = graph.compile(cc);
    // Fetch the state snapshot taken before the interrupt
    StateSnapshot<MessagesState<String>> snapshot = workflow.getState(rc);
    System.out.println("snapshot=>" + snapshot.state().data());
    // Update the graph state to the snapshot taken before the interrupt
    RunnableConfig runnableConfig = workflow.updateState(rc, snapshot.state().data());
    // Resume from the breakpoint
    workflow.invoke(GraphInput.resume(), runnableConfig)
            .ifPresent(state -> System.out.println(state.value(MESSAGES_STATE).orElse(null)));
}
In startWithCheckpoint, the .checkpointSaver(saver) call sets a CheckpointSaver alongside the interrupt, so the state at the moment of interruption is saved.
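In the recoverFromCheckpoint method that follows, workflow.getState(rc) fetches the previously saved state, workflow.updateState(...) writes it back into the graph (in effect restoring the scene as it was before the interrupt), and the final workflow.invoke(GraphInput.resume(), runnableConfig) continues from the breakpoint.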
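To make the save-then-resume flow concrete, here is a small library-free model of it. This is only a sketch under stated assumptions: ToyCheckpointDemo, its Checkpoint class, the SAVER map, and the start/resume methods are all hypothetical names invented for illustration, and none of it reflects how LangGraph4j is implemented internally. It only mirrors the idea that a checkpoint is the accumulated state plus the position of the next node, stored under a thread id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyCheckpointDemo {
    /** Hypothetical checkpoint: the messages so far plus the index of the next node to run. */
    static final class Checkpoint {
        final List<String> messages;
        final int nextNode;
        Checkpoint(List<String> messages, int nextNode) {
            this.messages = messages;
            this.nextNode = nextNode;
        }
    }

    // Hypothetical in-memory saver: one checkpoint per thread id.
    static final Map<String, Checkpoint> SAVER = new HashMap<>();

    // Each toy "node" contributes one word, mirroring node-1 .. node-4.
    static final List<String> WORDS = List.of("have", "a", "good", "trip");

    /** Runs from the first node and pauses before the node at index interruptBefore. */
    public static List<String> start(String threadId, int interruptBefore) {
        return run(threadId, new ArrayList<>(), 0, interruptBefore);
    }

    /** Loads the checkpoint saved for this thread id and runs the remaining nodes. */
    public static List<String> resume(String threadId) {
        Checkpoint cp = SAVER.get(threadId);
        if (cp == null) {
            throw new IllegalStateException("no checkpoint for thread " + threadId);
        }
        return run(threadId, new ArrayList<>(cp.messages), cp.nextNode, -1);
    }

    private static List<String> run(String threadId, List<String> messages, int from, int interruptBefore) {
        for (int i = from; i < WORDS.size(); i++) {
            if (i == interruptBefore) {
                // Save a copy of the state and the resume position, then stop early.
                SAVER.put(threadId, new Checkpoint(List.copyOf(messages), i));
                return messages;
            }
            messages.add(WORDS.get(i));
        }
        SAVER.remove(threadId); // the run finished, nothing left to resume
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(start("test-interrupt", 2)); // pause before the third node
        System.out.println(resume("test-interrupt"));   // continue to the end
    }
}
```

In this toy, resuming only works because start and resume use the same saver and the same thread id, which is also why the article's startWithCheckpoint and recoverFromCheckpoint share one saver instance and the "test-interrupt" thread id.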
Put it all together and test:
public static void main(String[] args) throws Exception {
    startWithoutCheckpoint();
    out.println("\n------------------------\n");
    BaseCheckpointSaver saver = getSaver();
    startWithCheckpoint(saver);
    out.println("\n------------------------\n");
    recoverFromCheckpoint(saver);
}
Output:
Before calling node: node-1, data: {messages=[]}
Before calling node: node-2, data: {messages=[have]}
[have, a]
------------------------
Before calling node: node-1, data: {messages=[]}
Before calling node: node-2, data: {messages=[have]}
[have, a]
------------------------
snapshot=>{messages=[have, a]}
Before calling node: node-3, data: {messages=[have, a]}
Before calling node: node-4, data: {messages=[have, a, good]}
[have, a, good, trip]
The last four lines of output, from snapshot=> onward, show that node-3 and node-4 continue from the breakpoint and run to completion.
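The run above uses MemorySaver, while getSaver also lists file-based alternatives in comments. The practical difference between the two kinds can be sketched with a toy model. Everything here is an assumption made for illustration: ToySavers, the ToySaver interface, and the InMemory and OnDisk classes are hypothetical and are not LangGraph4j's BaseCheckpointSaver, MemorySaver, or FileSystemSaver, and the one-message-per-line file format is invented.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ToySavers {
    /** Hypothetical minimal saver contract, keyed by thread id. */
    public interface ToySaver {
        void put(String threadId, List<String> messages);
        Optional<List<String>> get(String threadId);
    }

    /** Keeps checkpoints in a map: simple, but lost when the instance (or the JVM) goes away. */
    public static class InMemory implements ToySaver {
        private final Map<String, List<String>> store = new HashMap<>();
        public void put(String threadId, List<String> messages) {
            store.put(threadId, List.copyOf(messages));
        }
        public Optional<List<String>> get(String threadId) {
            return Optional.ofNullable(store.get(threadId));
        }
    }

    /** Writes one text file per thread id, one message per line, so state outlives the instance. */
    public static class OnDisk implements ToySaver {
        private final Path dir;
        public OnDisk(Path dir) { this.dir = dir; }
        public void put(String threadId, List<String> messages) {
            try {
                Files.createDirectories(dir);
                Files.write(dir.resolve(threadId + ".txt"), messages);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        public Optional<List<String>> get(String threadId) {
            try {
                Path file = dir.resolve(threadId + ".txt");
                return Files.exists(file) ? Optional.of(Files.readAllLines(file)) : Optional.empty();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Saves with one instance of each kind, then reads back with brand-new instances. */
    public static String demo() {
        try {
            Path dir = Files.createTempDirectory("toy-checkpoints");
            new OnDisk(dir).put("test-interrupt", List.of("have", "a"));
            new InMemory().put("test-interrupt", List.of("have", "a"));
            // Fresh instances stand in for a restarted process.
            return new OnDisk(dir).get("test-interrupt") + " " + new InMemory().get("test-interrupt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the toy, only the disk-backed saver still finds the checkpoint from a fresh instance. An in-memory saver is convenient for a single-process test like the one in this article, where the same saver object is passed to both startWithCheckpoint and recoverFromCheckpoint.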