接上节继续,用过Spring框架的同学都知道,Spring中的AOP,可以在无侵入的情况下,对原有的代码逻辑做增强(比如:在代码执行前后,自动记录日志、计算方法执行耗时等等)
LangGraph4j中的Hook机制跟AOP类似,可以在Node执行的before/after/wrap 这3个时机,无侵入的附加自己的代码逻辑。Edge也一样,只不过只有"条件边(ConditionalEdge)",注册到Edge上的勾子才生效。
org.bsc.langgraph4j.StateGraph类中,有如下方法:
public StateGraph<State> addWrapCallNodeHook(NodeHook.WrapCall<State> wrapCallHook ) {
nodeHooks.wrapCalls.add( wrapCallHook );
return this;
}
public StateGraph<State> addWrapCallNodeHook( String nodeId, NodeHook.WrapCall<State> wrapCallHook ) {
nodeHooks.wrapCalls.add( nodeId, wrapCallHook );
return this;
}
public StateGraph<State> addBeforeCallNodeHook(NodeHook.BeforeCall<State> beforeCallHook ) {
nodeHooks.beforeCalls.add( beforeCallHook );
return this;
}
public StateGraph<State> addBeforeCallNodeHook( String nodeId, NodeHook.BeforeCall<State> beforeCallHook ) {
nodeHooks.beforeCalls.add( nodeId, beforeCallHook );
return this;
}
public StateGraph<State> addAfterCallNodeHook(NodeHook.AfterCall<State> afterCallHook ) {
nodeHooks.afterCalls.add( afterCallHook );
return this;
}
public StateGraph<State> addAfterCallNodeHook( String nodeId, NodeHook.AfterCall<State> afterCallHook ) {
nodeHooks.afterCalls.add( nodeId, afterCallHook );
return this;
}public StateGraph<State> addWrapCallEdgeHook(EdgeHook.WrapCall<State> wrapCallHook ) {
edgeHooks.wrapCalls.add( wrapCallHook );
return this;
}
public StateGraph<State> addWrapCallEdgeHook( String nodeId, EdgeHook.WrapCall<State> wrapCallHook ) {
edgeHooks.wrapCalls.add( nodeId, wrapCallHook );
return this;
}
public StateGraph<State> addBeforeCallEdgeHook(EdgeHook.BeforeCall<State> beforeCallHook ) {
edgeHooks.beforeCalls.add( beforeCallHook );
return this;
}
public StateGraph<State> addBeforeCallEdgeHook( String nodeId, EdgeHook.BeforeCall<State> beforeCallHook ) {
edgeHooks.beforeCalls.add( nodeId, beforeCallHook );
return this;
}
public StateGraph<State> addAfterCallEdgeHook(EdgeHook.AfterCall<State> afterCallHook ) {
edgeHooks.afterCalls.add( afterCallHook );
return this;
}
public StateGraph<State> addAfterCallEdgeHook( String nodeId, EdgeHook.AfterCall<State> afterCallHook ) {
edgeHooks.afterCalls.add( nodeId, afterCallHook );
return this;
}public class Node1Action implements NodeAction<AgentState> {
@Override
public Map<String, Object> apply(AgentState state) throws Exception {
System.out.println("current Node: node-1");
//模拟节点耗时
Thread.sleep(1000);
return Map.of("myData", "node1-my-value",
"node1Key", "node1-value");
}
}public class Node2Action implements NodeAction<AgentState> {
@Override
public Map<String, Object> apply(AgentState state) throws Exception {
System.out.println("current Node: node-2");
//模拟节点耗时
Thread.sleep(2000);
return Map.of("myData", "node2-my-value",
"node2Key", "node2-value");
}
}public class HookSampleApplication {
public static void main(String[] args) throws GraphStateException {
runSequenceGraphWithOnlyStaticEdges();
out.println("\n========== 下面使用带条件边的图,Edge Hook 会执行 ==========");
runGraphWithConditionalEdge();
}
/**
* 纯静态边:只有 Node Hook 会执行,Edge Hook 不会执行
*/
private static void runSequenceGraphWithOnlyStaticEdges() throws GraphStateException {
StateGraph<AgentState> sequenceGraph = getSequenceGraph();
sequenceGraph.addBeforeCallNodeHook((String node, AgentState data, RunnableConfig config) -> {
out.println("Before calling node: " + node + ", data: " + data.data());
return CompletableFuture.completedFuture(data.data());
});
sequenceGraph.addAfterCallNodeHook((String node, AgentState data, RunnableConfig config, Map<String, Object> lastResult) -> {
out.println("After calling node: " + node + ", data: " + data.data() + ", lastResult: " + lastResult);
return CompletableFuture.completedFuture(lastResult);
});
sequenceGraph.addWrapCallNodeHook((String node, AgentState data, RunnableConfig config, AsyncNodeActionWithConfig<AgentState> action) -> {
out.println("Wrap calling node: " + node + ", data: " + data.data());
long start = System.currentTimeMillis();
return action.apply(data, config).whenComplete((result, error) -> {
var ms = System.currentTimeMillis() - start;
out.println(String.format("node '%s' took %d ms", node, ms));
});
});
sequenceGraph.addBeforeCallEdgeHook((String sourceId, AgentState state, RunnableConfig config) -> {
out.println("Before calling edge: " + sourceId);
return CompletableFuture.completedFuture(new Command(state.data()));
});
sequenceGraph.addAfterCallEdgeHook((String sourceId, AgentState state, RunnableConfig config, Command lastResult) -> {
out.println("After calling edge: " + sourceId);
return CompletableFuture.completedFuture(lastResult);
});
sequenceGraph.addWrapCallEdgeHook((String sourceId, AgentState state, RunnableConfig config, AsyncCommandAction<AgentState> action) -> {
out.println("Wrap calling edge: " + sourceId);
long start = System.currentTimeMillis();
return action.apply(state, config).whenComplete((result, error) -> {
var ms = System.currentTimeMillis() - start;
out.println(String.format("source-node '%s' took %d ms", sourceId, ms));
});
});
out.println(sequenceGraph.getGraph(GraphRepresentation.Type.MERMAID, "NodeHook Graph", true).content());
sequenceGraph.compile().invoke(Map.of("test", "test-init-value")).ifPresent(c -> {
System.out.println(c.data());
});
}
/**
* 带条件边:从 node-1 经条件边到 node-2,会触发 Edge Hook
*/
private static void runGraphWithConditionalEdge() throws GraphStateException {
StateGraph<AgentState> graph = getGraphWithConditionalEdge();
graph.addBeforeCallNodeHook((String node, AgentState data, RunnableConfig config) -> {
out.println("Before calling node: " + node + ", data: " + data.data());
return CompletableFuture.completedFuture(data.data());
});
graph.addAfterCallNodeHook((String node, AgentState data, RunnableConfig config, Map<String, Object> lastResult) -> {
out.println("After calling node: " + node + ", data: " + data.data() + ", lastResult: " + lastResult);
return CompletableFuture.completedFuture(lastResult);
});
graph.addWrapCallNodeHook((String node, AgentState data, RunnableConfig config, AsyncNodeActionWithConfig<AgentState> action) -> {
out.println("Wrap calling node: " + node + ", data: " + data.data());
long start = System.currentTimeMillis();
return action.apply(data, config).whenComplete((result, error) -> {
var ms = System.currentTimeMillis() - start;
out.println(String.format("node '%s' took %d ms", node, ms));
});
});
graph.addBeforeCallEdgeHook((String sourceId, AgentState state, RunnableConfig config) -> {
out.println("Before calling edge: " + sourceId);
return CompletableFuture.completedFuture(new Command(state.data()));
});
graph.addAfterCallEdgeHook((String sourceId, AgentState state, RunnableConfig config, Command lastResult) -> {
out.println("After calling edge: " + sourceId);
return CompletableFuture.completedFuture(lastResult);
});
graph.addWrapCallEdgeHook((String sourceId, AgentState state, RunnableConfig config, AsyncCommandAction<AgentState> action) -> {
out.println("Wrap calling edge: " + sourceId);
long start = System.currentTimeMillis();
return action.apply(state, config).whenComplete((result, error) -> {
var ms = System.currentTimeMillis() - start;
out.println(String.format("source-node '%s' took %d ms", sourceId, ms));
});
});
out.println(graph.getGraph(GraphRepresentation.Type.MERMAID, "NodeHook And EdgeHook Graph", true).content());
graph.compile().invoke(Map.of("test", "test-init-value")).ifPresent(c -> System.out.println(c.data()));
}
public static StateGraph<AgentState> getSequenceGraph() throws GraphStateException {
return new StateGraph<>(AgentState::new)
.addNode("node-1", node_async(new Node1Action()))
.addNode("node-2", node_async(new Node2Action()))
.addEdge(GraphDefinition.START, "node-1")
.addEdge("node-1", "node-2")
.addEdge("node-2", GraphDefinition.END);
}
/**
* 含一条条件边:node-1 通过条件边到 node-2,用于演示 Edge Hook 触发
*/
public static StateGraph<AgentState> getGraphWithConditionalEdge() throws GraphStateException {
return new StateGraph<>(AgentState::new)
.addNode("node-1", node_async(new Node1Action()))
.addNode("node-2", node_async(new Node2Action()))
.addEdge(GraphDefinition.START, "node-1")
.addConditionalEdges("node-1", state -> CompletableFuture.completedFuture("toNode2"), Map.of("toNode2", "node-2"))
.addEdge("node-2", GraphDefinition.END);
}
}Before calling node: node-1, data: {test=test-init-value}
Wrap calling node: node-1, data: {test=test-init-value}
current Node: node-1
node 'node-1' took 1002 ms
After calling node: node-1, data: {test=test-init-value}, lastResult: {myData=node1-my-value, node1Key=node1-value}
Before calling node: node-2, data: {node1Key=node1-value, test=test-init-value, myData=node1-my-value}
Wrap calling node: node-2, data: {node1Key=node1-value, test=test-init-value, myData=node1-my-value}
current Node: node-2
node 'node-2' took 1999 ms
After calling node: node-2, data: {node1Key=node1-value, test=test-init-value, myData=node1-my-value}, lastResult: {myData=node2-my-value, node2Key=node2-value}
{node1Key=node1-value, test=test-init-value, node2Key=node2-value, myData=node2-my-value}
========== 下面使用带条件边的图,Edge Hook 会执行 ==========
Before calling node: node-1, data: {test=test-init-value}
Wrap calling node: node-1, data: {test=test-init-value}
current Node: node-1
node 'node-1' took 1001 ms
After calling node: node-1, data: {test=test-init-value}, lastResult: {myData=node1-my-value, node1Key=node1-value}
Before calling edge: node-1
Wrap calling edge: node-1
source-node 'node-1' took 1 ms
After calling edge: node-1
Before calling node: node-2, data: {node1Key=node1-value, test=test-init-value, myData=node1-my-value}
Wrap calling node: node-2, data: {node1Key=node1-value, test=test-init-value, myData=node1-my-value}
current Node: node-2
node 'node-2' took 2000 ms
After calling node: node-2, data: {node1Key=node1-value, test=test-init-value, myData=node1-my-value}, lastResult: {myData=node2-my-value, node2Key=node2-value}
{node1Key=node1-value, test=test-init-value, node2Key=node2-value, myData=node2-my-value}