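A MyBatis plugin is essentially an interceptor mechanism built on Java dynamic proxies and the chain-of-responsibility pattern. An Interceptor intercepts and augments method calls on MyBatis's four core components (Executor, StatementHandler, ParameterHandler, ResultSetHandler). To implement one, declare the interception targets with the @Intercepts and @Signature annotations and implement the intercept method of the Interceptor interface.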
Key implementation points:
Plugin.wrap() creates the proxy objects, which together form a nested proxy chain
MappedStatement exposes metadata such as the type and ID of the SQL being executed
BoundSql holds the original SQL and its parameter mappings
@Intercepts({
    @Signature(type = Executor.class, method = "query",
        args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
})
public class VectorSearchInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        // Interception logic
        MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
        Object parameter = invocation.getArgs()[1];
        // Parse and enhance the original SQL (enhancedQuery is a placeholder for that logic)
        return enhancedQuery(invocation, ms, parameter);
    }
}
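To make the nested proxy chain concrete, here is a minimal sketch that uses only the JDK. It does not use MyBatis itself: QueryRunner is a hypothetical stand-in for a component such as Executor, and wrap() only imitates what Plugin.wrap() does in spirit. The trace shows that the proxy created last sits outermost and therefore runs first.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a MyBatis component such as Executor.
interface QueryRunner {
    String query(String sql);
}

final class ProxyChainSketch {
    // Records the order in which the interceptors fire.
    static final List<String> TRACE = new ArrayList<>();

    // Wraps the target in a JDK dynamic proxy, similar in spirit to Plugin.wrap().
    static QueryRunner wrap(QueryRunner target, String name) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(target, args); // toString, hashCode, equals
            }
            TRACE.add(name + ":before");
            Object result = method.invoke(target, args); // plays the role of invocation.proceed()
            TRACE.add(name + ":after");
            return result;
        };
        return (QueryRunner) Proxy.newProxyInstance(
                QueryRunner.class.getClassLoader(),
                new Class<?>[] {QueryRunner.class},
                handler);
    }

    // Builds real -> first -> second, so "second" is the outermost proxy.
    static QueryRunner buildChain() {
        QueryRunner real = sql -> "rows for: " + sql;
        QueryRunner inner = wrap(real, "first");
        return wrap(inner, "second");
    }
}
```

Calling ProxyChainSketch.buildChain().query("SELECT 1") passes through "second" and then "first" before reaching the real object, and unwinds in the opposite order.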
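Milvus is a distributed database designed specifically for vector similarity search. Its architecture centers on the following key components:
1. Data model and storage engine
2. Indexing and search algorithms
3. Java SDK architecture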

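Architecture design goals:
Core architecture components:
1. SQL parsing and enhancement layer
Example vector function: vector_search(feature_column, query_vector, top_k)
2. Dual-engine query coordinator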

3. Data mapper and result merger
Key implementation points:
Dynamic SQL rewriting strategy
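import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
public class VectorSQLRewriter {
    private static final Pattern VECTOR_SEARCH = Pattern.compile("vector_search\\s*\\([^)]*\\)", Pattern.CASE_INSENSITIVE);
    public String rewrite(String originalSQL, List<Long> vectorIds) {
        // Replace the vector_search() call with a standard IN predicate
        // Example: WHERE vector_search(features, ?, 10) AND category = 'A'
        // becomes: WHERE id IN (?,?,...) AND category = 'A'
        // Minimal sketch: assumes the key column is named id and vectorIds is not empty
        String placeholders = String.join(",", Collections.nCopies(vectorIds.size(), "?"));
        return VECTOR_SEARCH.matcher(originalSQL).replaceAll("id IN (" + placeholders + ")");
    }
}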
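Hybrid query optimizer
Performance considerations and optimization strategies
This design keeps MyBatis development simple while taking advantage of Milvus's high-performance vector search. It brings similarity search to applications built on a traditional relational database, moving them from plain "data querying" toward "intelligent retrieval".
Before diving into the code, we need to lay out the complete plugin architecture. It is built around the core interceptor, VectorSearchInterceptor, and several collaborating classes that together cover the whole flow: SQL interception, vector search, and result mapping.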

VectorSearchInterceptor is the plugin's entry point. It intercepts MyBatis's query execution flow.
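import java.util.List;
import java.util.Map;
import io.milvus.grpc.SearchResults;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Intercepts({
    @Signature(type = Executor.class, method = "query",
        args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
    @Signature(type = Executor.class, method = "query",
        args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class})
})
@Component
@Slf4j
public class VectorSearchInterceptor implements Interceptor {
    @Autowired
    private SqlParser sqlParser;
    @Autowired
    private MilvusService milvusService;
    @Autowired
    private ResultMerger resultMerger;

    private final ThreadLocal<Boolean> inProcessing = ThreadLocal.withInitial(() -> false);

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        // Guard against recursive interception
        if (inProcessing.get()) {
            return invocation.proceed();
        }
        try {
            inProcessing.set(true);
            Object[] args = invocation.getArgs();
            MappedStatement ms = (MappedStatement) args[0];
            Object parameter = args[1];
            // Get the original SQL; the six-argument query() overload already carries a BoundSql
            BoundSql boundSql = args.length == 6 ? (BoundSql) args[5] : ms.getBoundSql(parameter);
            String originalSql = boundSql.getSql();
            // Parse the SQL and check whether it contains vector search functions
            ParsedSql parsedSql = sqlParser.parse(originalSql);
            if (!parsedSql.hasVectorFunctions()) {
                return invocation.proceed();
            }
            log.debug("Detected vector search SQL: {}", originalSql);
            return processVectorSearch(invocation, ms, parameter, boundSql, parsedSql);
        } finally {
            inProcessing.remove();
        }
    }

    // executeVectorSearch, extractIds, createNewBoundSql and copyMappedStatement
    // are helper methods that are not shown here.
    @SuppressWarnings("unchecked")
    private Object processVectorSearch(Invocation invocation, MappedStatement ms, Object parameter,
                                       BoundSql boundSql, ParsedSql parsedSql) throws Exception {
        // Extract the vector function descriptors
        List<VectorFunction> vectorFunctions = parsedSql.getVectorFunctions();
        // Run the vector search and collect the matching IDs
        List<SearchResults> vectorSearchResults = executeVectorSearch(vectorFunctions, parameter, boundSql);
        List<Long> vectorIds = extractIds(vectorSearchResults);
        // Rewrite the SQL, replacing the vector function with an IN query
        String rewrittenSql = sqlParser.rewriteSql(parsedSql, vectorIds);
        // Create a new BoundSql for the relational query
        BoundSql newBoundSql = createNewBoundSql(ms, boundSql, rewrittenSql, parameter);
        // Replace the invocation arguments
        Object[] args = invocation.getArgs();
        args[0] = copyMappedStatement(ms, newBoundSql);
        if (args.length == 6) {
            args[5] = newBoundSql; // otherwise the six-argument overload would still run the old SQL
        }
        // Execute the rewritten SQL (assumes the statement maps its rows to Map)
        List<Map<String, Object>> relationalResults = (List<Map<String, Object>>) invocation.proceed();
        // Merge the vector search results with the relational results
        return resultMerger.mergeResults(vectorSearchResults, relationalResults);
    }
}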
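The ThreadLocal flag used above is a re-entrancy guard: if the plugin itself triggers another query on the same thread, that inner query should bypass the vector logic. The following JDK-only sketch isolates the pattern. ReentrancyGuard and its run method are illustrative names, not part of the plugin.

```java
import java.util.function.Supplier;

final class ReentrancyGuard {
    private final ThreadLocal<Boolean> inProcessing = ThreadLocal.withInitial(() -> false);

    // Runs 'enhanced' on the first entry for this thread and 'plain' on nested entries.
    <T> T run(Supplier<T> enhanced, Supplier<T> plain) {
        if (inProcessing.get()) {
            return plain.get();
        }
        inProcessing.set(true);
        try {
            return enhanced.get();
        } finally {
            // remove() rather than set(false), so pooled threads do not keep stale state
            inProcessing.remove();
        }
    }
}
```

A nested call made from inside the enhanced branch takes the plain branch, while a later top-level call on the same thread is enhanced again because the flag is cleared in the finally block.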
SqlParser parses the original SQL, identifies the vector search functions in it, and rewrites the SQL once the vector search results are available.
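import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.ExpressionVisitorAdapter;
import net.sf.jsqlparser.expression.Function;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectBody;
import org.springframework.stereotype.Component;

// Written against the JSqlParser 4.x API (getSelectBody, getParameters().getExpressions()).
@Component
public class SqlParser {
    private final CCJSqlParserManager parser = new CCJSqlParserManager();

    public ParsedSql parse(String sql) throws JSQLParserException {
        Statement statement = parser.parse(new StringReader(sql));
        if (!(statement instanceof Select)) {
            throw new IllegalArgumentException("Vector search is only supported in SELECT statements");
        }
        Select select = (Select) statement;
        ParsedSql parsedSql = new ParsedSql();
        parsedSql.setOriginalSql(sql);
        parsedSql.setSelect(select);
        // Extract the vector functions
        List<VectorFunction> vectorFunctions = extractVectorFunctions(select);
        parsedSql.setVectorFunctions(vectorFunctions);
        return parsedSql;
    }

    private List<VectorFunction> extractVectorFunctions(Select select) {
        List<VectorFunction> functions = new ArrayList<>();
        SelectBody selectBody = select.getSelectBody();
        if (selectBody instanceof PlainSelect) {
            PlainSelect plainSelect = (PlainSelect) selectBody;
            // A query without a WHERE clause has nothing to inspect
            if (plainSelect.getWhere() != null) {
                WhereClauseVisitor visitor = new WhereClauseVisitor();
                plainSelect.getWhere().accept(visitor);
                functions = visitor.getVectorFunctions();
            }
        }
        return functions;
    }

    public String rewriteSql(ParsedSql parsedSql, List<Long> vectorIds) {
        Select select = parsedSql.getSelect();
        SqlRewriteVisitor rewriteVisitor = new SqlRewriteVisitor(vectorIds);
        select.accept(rewriteVisitor);
        return select.toString();
    }

    // Custom visitor that finds vector functions in the WHERE clause
    private static class WhereClauseVisitor extends ExpressionVisitorAdapter {
        private final List<VectorFunction> vectorFunctions = new ArrayList<>();

        List<VectorFunction> getVectorFunctions() {
            return vectorFunctions;
        }

        @Override
        public void visit(Function function) {
            if ("vector_search".equalsIgnoreCase(function.getName())) {
                vectorFunctions.add(parseVectorFunction(function));
            } else {
                super.visit(function); // keep descending into other functions' arguments
            }
        }

        // extractColumnName, extractParameterName, extractTopK and extractMetricType
        // are helper methods that are not shown here.
        private VectorFunction parseVectorFunction(Function function) {
            VectorFunction vf = new VectorFunction();
            List<Expression> parameters = function.getParameters().getExpressions();
            // Arguments: column_name, query_vector, top_k, [metric_type]
            if (parameters.size() >= 3) {
                vf.setColumnName(extractColumnName(parameters.get(0)));
                vf.setParameterName(extractParameterName(parameters.get(1)));
                vf.setTopK(extractTopK(parameters.get(2)));
                if (parameters.size() > 3) {
                    vf.setMetricType(extractMetricType(parameters.get(3)));
                }
            }
            return vf;
        }
    }
}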
MilvusService wraps the interaction with the Milvus Java SDK. It builds the search parameters and executes the vector query.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.SearchResults;
import io.milvus.param.R;
import io.milvus.param.dml.SearchParam;
import io.milvus.response.SearchResultsWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MilvusService {
    @Value("${milvus.collection.name:default_collection}")
    private String collectionName;
    @Autowired
    private MilvusServiceClient milvusClient;

    public List<Long> search(SearchParam searchParam) {
        try {
            R<SearchResults> response = milvusClient.search(searchParam);
            if (response.getStatus() != R.Status.Success.getCode()) {
                throw new RuntimeException("Milvus search failed: " + response.getMessage());
            }
            SearchResults results = response.getData();
            return extractEntityIds(results);
        } catch (Exception e) {
            log.error("Milvus vector search error", e);
            throw new RuntimeException("Vector search execution failed", e);
        }
    }

    // buildFilterExpression and buildSearchParams are helper methods that are not shown here.
    public SearchParam buildSearchParam(VectorFunction vectorFunction,
                                        Object parameterObject,
                                        Map<String, Object> additionalParams) {
        List<?> queryVectors = resolveQueryVector(vectorFunction, parameterObject, additionalParams);
        String expr = buildFilterExpression(additionalParams);
        List<String> outputFields = Arrays.asList("id", vectorFunction.getColumnName());
        return SearchParam.newBuilder()
            .withCollectionName(collectionName)
            .withMetricType(vectorFunction.getMetricType())
            .withTopK(vectorFunction.getTopK())
            .withVectors(queryVectors)
            .withVectorFieldName(vectorFunction.getColumnName())
            .withExpr(expr)
            .withOutFields(outputFields)
            .withParams(buildSearchParams(vectorFunction))
            .build();
    }

    private List<?> resolveQueryVector(VectorFunction vectorFunction,
                                       Object parameterObject,
                                       Map<String, Object> additionalParams) {
        // Resolve the query vector from the MyBatis parameters
        // Supported parameter types: Map, entity objects, @Param-annotated parameters, etc.
        String paramName = vectorFunction.getParameterName();
        Object vectorValue = ParameterResolver.resolve(parameterObject, paramName);
        if (vectorValue == null) {
            throw new IllegalArgumentException("Vector parameter not found: " + paramName);
        }
        // withVectors expects a list of vectors, so the single query vector is wrapped
        return Collections.singletonList(ParameterResolver.convertToVectorList(vectorValue));
    }

    private List<Long> extractEntityIds(SearchResults results) {
        List<Long> ids = new ArrayList<>();
        SearchResultsWrapper wrapper = new SearchResultsWrapper(results.getResults());
        // Index 0 refers to the first (and only) query vector
        for (SearchResultsWrapper.IDScore idScore : wrapper.getIDScore(0)) {
            ids.add(idScore.getLongID());
        }
        return ids;
    }
}
ResultMerger merges the vector search results with the relational query results by ID, so that the two result sets stay consistent.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import io.milvus.grpc.SearchResults;
import io.milvus.response.SearchResultsWrapper;
import org.springframework.stereotype.Service;

@Service
public class ResultMerger {
    public List<Map<String, Object>> mergeResults(List<SearchResults> vectorResults,
                                                  List<Map<String, Object>> relationalResults) {
        if (vectorResults == null || vectorResults.isEmpty()) {
            return relationalResults;
        }
        // Build a map from entity ID to the data returned by the vector search
        Map<Long, Map<String, Object>> vectorResultMap = buildVectorResultMap(vectorResults);
        // Join the two result sets by ID
        List<Map<String, Object>> mergedResults = new ArrayList<>();
        for (Map<String, Object> relationalRow : relationalResults) {
            // JDBC drivers may return Integer, Long or BigInteger for the key column
            Long entityId = ((Number) relationalRow.get("id")).longValue();
            Map<String, Object> vectorData = vectorResultMap.get(entityId);
            if (vectorData != null) {
                Map<String, Object> mergedRow = new LinkedHashMap<>(relationalRow);
                // Relational values take precedence; vector data only fills in missing keys
                vectorData.forEach(mergedRow::putIfAbsent);
                mergedResults.add(mergedRow);
            }
        }
        // Sort by vector similarity
        return sortBySimilarity(mergedResults);
    }

    private Map<Long, Map<String, Object>> buildVectorResultMap(List<SearchResults> vectorResults) {
        Map<Long, Map<String, Object>> resultMap = new HashMap<>();
        for (SearchResults results : vectorResults) {
            SearchResultsWrapper wrapper = new SearchResultsWrapper(results.getResults());
            // Index 0 refers to the first (and only) query vector
            for (SearchResultsWrapper.IDScore idScore : wrapper.getIDScore(0)) {
                Map<String, Object> vectorData = new HashMap<>();
                vectorData.put("id", idScore.getLongID());
                // Convert distance to a similarity score; 1 - distance only makes sense
                // for metrics whose distance is normalized to [0, 1]
                vectorData.put("similarity_score", 1.0 - idScore.getScore());
                resultMap.put(idScore.getLongID(), vectorData);
            }
        }
        return resultMap;
    }

    private List<Map<String, Object>> sortBySimilarity(List<Map<String, Object>> results) {
        // Descending order; Number is used so that Float and Double scores both work
        return results.stream()
            .sorted(Comparator.comparingDouble(
                (Map<String, Object> row) ->
                    ((Number) row.getOrDefault("similarity_score", 0.0)).doubleValue())
                .reversed())
            .collect(Collectors.toList());
    }
}
ParameterResolver handles the different MyBatis parameter types so that the vector parameter is extracted correctly.
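import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ParameterResolver {
    public static Object resolve(Object parameterObject, String paramName) {
        if (parameterObject == null) {
            return null;
        }
        if (parameterObject instanceof Map) {
            // @Param-annotated or Map parameters
            return ((Map<?, ?>) parameterObject).get(paramName);
        } else if (isPrimitiveOrWrapper(parameterObject.getClass())) {
            // Primitive or wrapper parameters
            return parameterObject;
        } else {
            // Entity object parameters
            return resolveFromEntity(parameterObject, paramName);
        }
    }

    private static Object resolveFromEntity(Object entity, String fieldName) {
        try {
            Field field = entity.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(entity);
        } catch (Exception e) {
            // Fall back to the getter method
            String getterName = "get" + capitalize(fieldName);
            try {
                Method getter = entity.getClass().getMethod(getterName);
                return getter.invoke(entity);
            } catch (Exception ex) {
                throw new IllegalArgumentException("Cannot resolve parameter: " + fieldName, ex);
            }
        }
    }

    // Public so that MilvusService can reuse it
    public static List<?> convertToVectorList(Object vectorValue) {
        if (vectorValue instanceof List) {
            return (List<?>) vectorValue;
        } else if (vectorValue instanceof float[]) {
            // Arrays.stream has no float[] overload, so box the values in a loop
            List<Float> boxed = new ArrayList<>();
            for (float v : (float[]) vectorValue) {
                boxed.add(v);
            }
            return boxed;
        } else if (vectorValue instanceof double[]) {
            // Narrowed to Float, which is the element type of a float vector field
            List<Float> boxed = new ArrayList<>();
            for (double v : (double[]) vectorValue) {
                boxed.add((float) v);
            }
            return boxed;
        } else {
            throw new IllegalArgumentException("Unsupported vector data type: " + vectorValue.getClass());
        }
    }

    private static boolean isPrimitiveOrWrapper(Class<?> type) {
        return type.isPrimitive() || Number.class.isAssignableFrom(type)
            || type == Boolean.class || type == Character.class;
    }

    private static String capitalize(String name) {
        return Character.toUpperCase(name.charAt(0)) + name.substring(1);
    }
}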
Finally, we need to register the plugin in the MyBatis configuration and define the related beans.
@Configuration
@AutoConfigureAfter(MybatisAutoConfiguration.class)
public class VectorSearchConfig {
    // VectorSearchInterceptor is already declared as a @Component, so it is injected below
    // instead of being defined a second time as a @Bean.

    @Bean
    @ConditionalOnMissingBean
    public MilvusServiceClient milvusServiceClient(
            @Value("${milvus.host:localhost}") String host,
            @Value("${milvus.port:19530}") int port) {
        ConnectParam connectParam = ConnectParam.newBuilder()
            .withHost(host)
            .withPort(port)
            .build();
        return new MilvusServiceClient(connectParam);
    }

    // Register the interceptor in the MyBatis configuration.
    // Note: mybatis-spring-boot-starter normally also registers Interceptor beans on its own;
    // keep only one registration path, or the interceptor ends up in the chain twice.
    @Bean
    public ConfigurationCustomizer configurationCustomizer(VectorSearchInterceptor interceptor) {
        return configuration -> configuration.addInterceptor(interceptor);
    }
}
The plugin's overall execution flow can be shown as a sequence diagram:
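The same flow can also be traced in plain code. The sketch below is a JDK-only simulation under stated assumptions: vectorSearch and relationalQuery are in-memory stand-ins for Milvus and the relational database, the table contents are invented, and the key column is assumed to be named id. It only illustrates the order of the steps: vector search, SQL rewrite, relational query, merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HybridFlowSketch {
    // Step 1: stand-in for the Milvus search; returns IDs ordered by similarity.
    static List<Long> vectorSearch(int topK) {
        List<Long> ranked = Arrays.asList(42L, 7L, 19L, 3L);
        return ranked.subList(0, Math.min(topK, ranked.size()));
    }

    // Step 2: replace vector_search(...) with an IN list, one placeholder per ID.
    static String rewrite(String sql, List<Long> ids) {
        String in = "id IN (" + String.join(",", Collections.nCopies(ids.size(), "?")) + ")";
        return sql.replaceAll("(?i)vector_search\\s*\\([^)]*\\)", in);
    }

    // Step 3: stand-in for the relational query "id IN (...) AND category = 'A'".
    static List<Map<String, Object>> relationalQuery(List<Long> ids) {
        long[] tableIds = {3L, 7L, 19L, 42L};
        String[] categories = {"A", "A", "B", "A"};
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int i = 0; i < tableIds.length; i++) {
            if (ids.contains(tableIds[i]) && "A".equals(categories[i])) {
                Map<String, Object> row = new LinkedHashMap<>();
                row.put("id", tableIds[i]);
                row.put("category", categories[i]);
                rows.add(row);
            }
        }
        return rows;
    }

    // Step 4: keep the relational rows, but order them by their vector rank.
    static List<Map<String, Object>> merge(List<Long> rankedIds, List<Map<String, Object>> rows) {
        List<Map<String, Object>> merged = new ArrayList<>(rows);
        merged.sort(Comparator.comparingInt(row -> rankedIds.indexOf(row.get("id"))));
        return merged;
    }
}
```

With topK = 3 the simulated vector search returns IDs 42, 7 and 19. The scalar filter then drops 19, and the merge step restores the similarity order, so the final rows are 42 followed by 7.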

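With the implementation above, we have a working MyBatis plugin that integrates SQL queries with Milvus vector search. It preserves the native MyBatis development experience while adding vector search, which brings AI-native capabilities to traditional applications.
On top of this basic implementation, we add a dynamic SQL extension mechanism so that vector search works together with MyBatis's dynamic SQL tags. The extension supports the usual <if> and <choose> tags and also applies optimization strategies designed specifically for vector search.
Dynamic vector parameter resolver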
@Component
public class DynamicVectorParameterResolver {
    // getSqlSourceRoot, optimizeConditions and the processChooseNode, processForEachNode
    // and processMixedNode methods are helpers that are not shown here.
    public List<VectorSearchCondition> resolveDynamicConditions(
            MappedStatement ms, Object parameter, BoundSql boundSql) {
        List<VectorSearchCondition> conditions = new ArrayList<>();
        SqlNode rootSqlNode = getSqlSourceRoot(ms);
        // Depth-first traversal of the dynamic SQL node tree
        traverseSqlNode(rootSqlNode, parameter, conditions, new HashMap<>());
        return optimizeConditions(conditions);
    }

    private void traverseSqlNode(SqlNode node, Object parameter,
                                 List<VectorSearchCondition> conditions,
                                 Map<String, Object> context) {
        if (node instanceof IfSqlNode) {
            processIfNode((IfSqlNode) node, parameter, conditions, context);
        } else if (node instanceof ChooseSqlNode) {
            processChooseNode((ChooseSqlNode) node, parameter, conditions, context);
        } else if (node instanceof ForEachSqlNode) {
            processForEachNode((ForEachSqlNode) node, parameter, conditions, context);
        } else if (node instanceof MixedSqlNode) {
            processMixedNode((MixedSqlNode) node, parameter, conditions, context);
        }
    }

    private void processIfNode(IfSqlNode ifNode, Object parameter,
                               List<VectorSearchCondition> conditions,
                               Map<String, Object> context) {
        try {
            OgnlClassResolver resolver = new OgnlClassResolver();
            OgnlContext ognlContext = (OgnlContext) Ognl.createDefaultContext(parameter, resolver);
            // getTest() and getContents() are assumed accessors: MyBatis's IfSqlNode keeps these
            // fields private, so in practice they have to be read via reflection or a wrapper
            Boolean testResult = (Boolean) Ognl.getValue(ifNode.getTest(), ognlContext, parameter);
            if (Boolean.TRUE.equals(testResult)) {
                traverseSqlNode(ifNode.getContents(), parameter, conditions, context);
            }
        } catch (OgnlException e) {
            throw new RuntimeException("Failed to evaluate dynamic SQL condition", e);
        }
    }
}
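Condition optimizer
The core of the dynamic SQL extension is condition optimization: based on the mix of query conditions, the system chooses an execution path automatically: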
public class VectorConditionOptimizer {
    // adjustPlanByStatistics is a helper method that is not shown here.
    public ExecutionPlan optimizeExecution(List<VectorSearchCondition> conditions,
                                           QueryStatistics statistics) {
        ExecutionPlan plan = new ExecutionPlan();
        // Analyze how the condition types are distributed
        ConditionAnalysis analysis = analyzeConditions(conditions);
        if (analysis.getVectorRatio() > 0.7) {
            // Vector conditions dominate: run the vector search first
            plan.setPrimaryExecutor(ExecutorType.VECTOR);
            plan.setSecondaryExecutor(ExecutorType.RELATIONAL);
        } else if (analysis.getScalarRatio() > 0.7) {
            // Scalar conditions dominate: run the relational query first
            plan.setPrimaryExecutor(ExecutorType.RELATIONAL);
            plan.setSecondaryExecutor(ExecutorType.VECTOR);
        } else {
            // Mixed conditions: run both in parallel
            plan.setPrimaryExecutor(ExecutorType.PARALLEL);
        }
        // Adjust the plan using historical statistics
        if (statistics != null) {
            adjustPlanByStatistics(plan, statistics);
        }
        return plan;
    }

    private ConditionAnalysis analyzeConditions(List<VectorSearchCondition> conditions) {
        ConditionAnalysis analysis = new ConditionAnalysis();
        for (VectorSearchCondition condition : conditions) {
            if (condition.isVectorCondition()) {
                analysis.incrementVectorCount();
            } else if (condition.isScalarCondition()) {
                analysis.incrementScalarCount();
            }
            // Analyze the selectivity of the condition
            analysis.analyzeSelectivity(condition);
        }
        return analysis;
    }
}
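ConditionAnalysis is not shown in the text. A minimal sketch of how its ratios could be computed follows. The class name, the Path enum, and the choice to treat an empty condition list as a ratio of 0 are assumptions; the 0.7 threshold is taken from the optimizer above.

```java
final class ConditionAnalysisSketch {
    enum Path { VECTOR_FIRST, RELATIONAL_FIRST, PARALLEL }

    private int vectorCount;
    private int scalarCount;

    void incrementVectorCount() { vectorCount++; }
    void incrementScalarCount() { scalarCount++; }

    // Share of vector conditions; 0.0 when there are no conditions at all.
    double getVectorRatio() {
        int total = vectorCount + scalarCount;
        return total == 0 ? 0.0 : (double) vectorCount / total;
    }

    // Share of scalar conditions; 0.0 when there are no conditions at all.
    double getScalarRatio() {
        int total = vectorCount + scalarCount;
        return total == 0 ? 0.0 : (double) scalarCount / total;
    }

    // Same 0.7 thresholds as in optimizeExecution.
    Path choosePath() {
        if (getVectorRatio() > 0.7) {
            return Path.VECTOR_FIRST;
        } else if (getScalarRatio() > 0.7) {
            return Path.RELATIONAL_FIRST;
        }
        return Path.PARALLEL;
    }
}
```

For example, three vector conditions and one scalar condition give a vector ratio of 0.75, which selects the vector-first path, while an even split falls through to parallel execution.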
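Modern vector search workloads often need to handle several kinds of vector data in the same system. The plugin therefore supports a routing mechanism across multiple vector indexes.
Index metadata management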
@Service
@RequiredArgsConstructor
public class VectorIndexRegistry {
    private final Map<String, IndexMetadata> indexMetadataMap = new ConcurrentHashMap<>();
    private final IndexRouter indexRouter;
    private final MilvusServiceClient milvusClient;

    // startMetadataRefreshTask and buildIndexMetadata are helper methods that are not shown here.
    @PostConstruct
    public void initialize() {
        refreshIndexMetadata();
        startMetadataRefreshTask();
    }

    public String routeToIndex(VectorSearchContext context) {
        return indexRouter.route(context);
    }

    public IndexMetadata getIndexMetadata(String indexName) {
        return indexMetadataMap.get(indexName);
    }

    private void refreshIndexMetadata() {
        // The exact request and response types for listing collections depend on the
        // Milvus Java SDK version in use; adjust this call accordingly
        ListCollectionsParam param = ListCollectionsParam.newBuilder().build();
        R<List<String>> response = milvusClient.listCollections(param);
        if (response.getStatus() == R.Status.Success.getCode()) {
            for (String collectionName : response.getData()) {
                loadCollectionMetadata(collectionName);
            }
        }
    }

    private void loadCollectionMetadata(String collectionName) {
        DescribeCollectionParam param = DescribeCollectionParam.newBuilder()
            .withCollectionName(collectionName)
            .build();
        R<DescribeCollectionResponse> response = milvusClient.describeCollection(param);
        if (response.getStatus() == R.Status.Success.getCode()) {
            IndexMetadata metadata = buildIndexMetadata(response.getData());
            indexMetadataMap.put(collectionName, metadata);
        }
    }
}
Rule-based routing
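As a minimal sketch of rule-based routing: rules are checked in registration order, the first match wins, and a default index catches everything else. The class and field names, the rule conditions, and the index names in the usage note are all hypothetical. The text does not show IndexRouter's real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RuleBasedIndexRouter {
    // Hypothetical, simplified routing context.
    static final class Context {
        final int dimension;
        final String metricType;

        Context(int dimension, String metricType) {
            this.dimension = dimension;
            this.metricType = metricType;
        }
    }

    private static final class Rule {
        final Predicate<Context> condition;
        final String indexName;

        Rule(Predicate<Context> condition, String indexName) {
            this.condition = condition;
            this.indexName = indexName;
        }
    }

    private final List<Rule> rules = new ArrayList<>();
    private final String defaultIndex;

    RuleBasedIndexRouter(String defaultIndex) {
        this.defaultIndex = defaultIndex;
    }

    // Rules are evaluated in the order they were added.
    RuleBasedIndexRouter addRule(Predicate<Context> condition, String indexName) {
        rules.add(new Rule(condition, indexName));
        return this;
    }

    // Returns the index of the first matching rule, or the default index.
    String route(Context context) {
        for (Rule rule : rules) {
            if (rule.condition.test(context)) {
                return rule.indexName;
            }
        }
        return defaultIndex;
    }
}
```

A router built with a rule for 768-dimensional COSINE vectors and another for 512-dimensional vectors sends matching requests to those indexes and everything else to the default.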

Dynamic index selection algorithm
public class AdaptiveIndexSelector {
    // calculateMetricScore, calculatePerformanceScore and calculateLoadScore
    // are helper methods that are not shown here.
    public SelectedIndex selectOptimalIndex(VectorSearchRequest request,
                                            List<IndexMetadata> candidates) {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("No candidate indexes available");
        }
        List<IndexScore> scores = candidates.stream()
            .map(index -> calculateIndexScore(index, request))
            .sorted(Comparator.comparing(IndexScore::getScore).reversed())
            .collect(Collectors.toList());
        return scores.get(0).getIndex();
    }

    private IndexScore calculateIndexScore(IndexMetadata index, VectorSearchRequest request) {
        double score = 0.0;
        // Dimension match (weight: 0.3)
        score += 0.3 * calculateDimensionScore(index, request);
        // Metric type match (weight: 0.25)
        score += 0.25 * calculateMetricScore(index, request);
        // Predicted performance (weight: 0.25)
        score += 0.25 * calculatePerformanceScore(index, request);
        // Load balancing (weight: 0.2)
        score += 0.2 * calculateLoadScore(index);
        return new IndexScore(index, score);
    }

    private double calculateDimensionScore(IndexMetadata index, VectorSearchRequest request) {
        int indexDim = index.getDimension();
        int requestDim = request.getVectorDimension();
        if (indexDim == requestDim) {
            return 1.0;
        } else {
            return 1.0 - (Math.abs(indexDim - requestDim) / (double) Math.max(indexDim, requestDim));
        }
    }
}
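A worked example makes the weighting easier to check. The sketch below reuses the dimension formula and the four weights from the selector above. The input sub-scores in the usage note are made-up values. Note also that a dimension mismatch is scored softly here, whereas in practice a vector whose dimension differs from the collection's cannot be searched at all, so a real selector would likely treat it as a hard filter.

```java
final class IndexScoreExample {
    // 1.0 for an exact match, otherwise 1 - |a - b| / max(a, b).
    static double dimensionScore(int indexDim, int requestDim) {
        if (indexDim == requestDim) {
            return 1.0;
        }
        return 1.0 - Math.abs(indexDim - requestDim) / (double) Math.max(indexDim, requestDim);
    }

    // Weighted sum with the weights from the text: 0.3, 0.25, 0.25, 0.2.
    // The weights add up to 1, so sub-scores in [0, 1] give a total in [0, 1].
    static double totalScore(double dimension, double metric, double performance, double load) {
        return 0.3 * dimension + 0.25 * metric + 0.25 * performance + 0.2 * load;
    }
}
```

For an index with 1024 dimensions and a 768-dimensional request, the dimension score is 1 - 256/1024 = 0.75. Combined with sub-scores of 1.0, 0.8 and 0.5, the total is 0.225 + 0.25 + 0.2 + 0.1 = 0.775.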
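In a distributed environment, robust error handling is key to reliability. We implement a layered error handling strategy.
Error classification and handling strategy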
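public enum ErrorCategory {
    NETWORK_ERROR(1, "Network communication error", RetryStrategy.EXPONENTIAL_BACKOFF),
    TIMEOUT_ERROR(2, "Request timeout", RetryStrategy.FIXED_DELAY),
    RATE_LIMIT_ERROR(3, "Rate limit exceeded", RetryStrategy.EXPONENTIAL_BACKOFF),
    DATA_ERROR(4, "Invalid data format", RetryStrategy.NO_RETRY),
    SERVER_ERROR(5, "Internal server error", RetryStrategy.FIXED_DELAY),
    UNKNOWN_ERROR(6, "Unknown error", RetryStrategy.NO_RETRY);

    private final int code;
    private final String description;
    private final RetryStrategy defaultStrategy;

    ErrorCategory(int code, String description, RetryStrategy defaultStrategy) {
        this.code = code;
        this.description = description;
        this.defaultStrategy = defaultStrategy;
    }

    public RetryStrategy getDefaultStrategy() {
        return defaultStrategy;
    }
}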
@Component
@RequiredArgsConstructor
public class ErrorHandler {
    private final Map<ErrorCategory, ErrorHandlerStrategy> strategies;
    private final CircuitBreaker circuitBreaker;

    // getMaxRetries, shouldRetry and waitBeforeRetry are helper methods that are not shown here.
    public <T> T executeWithRetry(Supplier<T> operation, String operationName) {
        int attempt = 0;
        Exception lastException = null;
        while (attempt < getMaxRetries()) {
            // Check the circuit breaker outside the try block, so that an open breaker
            // fails fast instead of being counted as a failure and retried
            if (!circuitBreaker.allowRequest()) {
                throw new CircuitBreakerOpenException("Circuit breaker is open");
            }
            try {
                T result = operation.get();
                circuitBreaker.recordSuccess();
                return result;
            } catch (Exception e) {
                lastException = e;
                attempt++;
                // Record every failure, including the one that ends the retry loop
                circuitBreaker.recordFailure();
                ErrorCategory category = classifyError(e);
                if (!shouldRetry(category, attempt)) {
                    break;
                }
                waitBeforeRetry(category, attempt);
            }
        }
        throw new OperationFailedException(
            operationName + " failed after " + attempt + " attempt(s)", lastException);
    }

    private ErrorCategory classifyError(Exception e) {
        if (e instanceof ConnectException || e instanceof SocketTimeoutException) {
            return ErrorCategory.NETWORK_ERROR;
        } else if (e instanceof TimeoutException) {
            return ErrorCategory.TIMEOUT_ERROR;
        } else if (e instanceof RpcException) {
            // RpcException stands for whichever exception type carries the gRPC status
            // (for example io.grpc.StatusRuntimeException)
            RpcException rpcException = (RpcException) e;
            if (rpcException.getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) {
                return ErrorCategory.RATE_LIMIT_ERROR;
            }
        }
        return ErrorCategory.UNKNOWN_ERROR;
    }
}
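The retry strategies named in ErrorCategory are not defined in the text. The sketch below shows one plausible way to turn a strategy and an attempt number into a wait time. The base delay, the cap, and the convention of returning -1 for "do not retry" are assumptions made for illustration.

```java
final class RetryDelays {
    enum RetryStrategy { EXPONENTIAL_BACKOFF, FIXED_DELAY, NO_RETRY }

    // Returns the wait before the next attempt in milliseconds, or -1 if no retry should happen.
    // 'attempt' is 1 for the first retry.
    static long delayMillis(RetryStrategy strategy, int attempt, long baseMillis, long maxMillis) {
        switch (strategy) {
            case FIXED_DELAY:
                return Math.min(baseMillis, maxMillis);
            case EXPONENTIAL_BACKOFF:
                // base * 2^(attempt - 1), with the shift bounded to avoid overflow
                int shift = Math.min(Math.max(attempt - 1, 0), 20);
                return Math.min(baseMillis << shift, maxMillis);
            default:
                return -1;
        }
    }
}
```

With a base of 100 ms and a cap of 5000 ms, exponential backoff waits 100, 200, 400 ms and so on until it reaches the cap. Production code usually also adds random jitter so that many clients do not retry in lockstep.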
Circuit breaker state management
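A circuit breaker is usually modeled as a three-state machine: CLOSED (requests flow), OPEN (requests are rejected), and HALF_OPEN (probe requests are allowed after a cool-down). The sketch below is a simplified, assumed implementation of the CircuitBreaker used by ErrorHandler. Unlike the calls in ErrorHandler, it takes the current time as a parameter so that its behavior is deterministic, and in the half-open state it lets every request through rather than a single probe.

```java
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    // OPEN turns into HALF_OPEN once the cool-down has elapsed.
    synchronized boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAt >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state != State.OPEN;
    }

    // Any success closes the breaker and resets the failure counter.
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    // A failed probe, or too many consecutive failures, opens the breaker.
    synchronized void recordFailure(long nowMillis) {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = nowMillis;
        }
    }

    synchronized State state() {
        return state;
    }
}
```

With a threshold of 2 and a cool-down of 1000 ms, two consecutive failures open the breaker. A request 1000 ms later is allowed as a probe, and its outcome decides whether the breaker closes again or reopens.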

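In hybrid query scenarios, keeping vector search and relational queries transactionally consistent is very challenging. We use a multi-phase coordination mechanism to achieve eventual consistency.
Hybrid transaction coordinator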
@Component
@RequiredArgsConstructor
public class HybridTransactionCoordinator {
    private final VectorTransactionManager vectorTM;
    private final RelationalTransactionManager relationalTM;
    private final TransactionLogStore logStore;

    // generateTransactionId, preExecuteCheck, shouldCommit, rollbackTransaction,
    // handleTransactionException and handleCommitFailure are helpers that are not shown here.
    public <T> T executeInHybridTransaction(TransactionCallback<T> callback) {
        String transactionId = generateTransactionId();
        TransactionContext context = new TransactionContext(transactionId);
        try {
            // Phase 1: pre-execution checks
            preExecuteCheck(context);
            // Phase 2: begin both transactions in parallel
            CompletableFuture<VectorTransaction> vectorFuture =
                CompletableFuture.supplyAsync(() -> vectorTM.beginTransaction(context));
            CompletableFuture<RelationalTransaction> relationalFuture =
                CompletableFuture.supplyAsync(() -> relationalTM.beginTransaction(context));
            CompletableFuture.allOf(vectorFuture, relationalFuture).join();
            // Phase 3: run the business logic
            T result = callback.doInTransaction(context);
            // Phase 4: two-phase commit
            if (shouldCommit(context)) {
                commitTransaction(vectorFuture.get(), relationalFuture.get(), context);
            } else {
                rollbackTransaction(vectorFuture.get(), relationalFuture.get(), context);
            }
            return result;
        } catch (Exception e) {
            // Phase 5: exception handling and recovery
            handleTransactionException(context, e);
            throw new TransactionException("Hybrid transaction failed", e);
        }
    }

    private void commitTransaction(VectorTransaction vt,
                                   RelationalTransaction rt,
                                   TransactionContext context) {
        // Phase 4.1: prepare
        vectorTM.prepareCommit(vt);
        relationalTM.prepareCommit(rt);
        // Phase 4.2: commit
        try {
            vectorTM.commit(vt);
            relationalTM.commit(rt);
            logStore.logTransactionSuccess(context.getTransactionId());
        } catch (Exception e) {
            // Phase 4.3: handle a failed commit (the vector side may already be committed)
            handleCommitFailure(vt, rt, context, e);
        }
    }
}
Consistency state tracking
@Slf4j
@RequiredArgsConstructor
public class ConsistencyTracker {
    private final Map<String, QueryConsistency> consistencyMap = new ConcurrentHashMap<>();
    private final ConsistencyVerifier verifier;

    // needsImmediateVerification, triggerConsistencyRepair, scheduleConsistencyRepair
    // and scheduleAsyncVerification are helper methods that are not shown here.
    public void trackQueryExecution(String queryId, List<ExecutionStep> steps) {
        QueryConsistency consistency = new QueryConsistency(queryId);
        for (ExecutionStep step : steps) {
            consistency.addStep(step);
            // Verify consistency as each step is recorded
            if (needsImmediateVerification(step)) {
                VerificationResult result = verifier.verifyStepConsistency(step);
                if (!result.isConsistent()) {
                    triggerConsistencyRepair(consistency, step, result);
                }
            }
        }
        consistencyMap.put(queryId, consistency);
    }

    public void verifyFinalConsistency(String queryId) {
        QueryConsistency consistency = consistencyMap.get(queryId);
        if (consistency == null) {
            return;
        }
        CompletableFuture<VerificationResult> vectorFuture =
            CompletableFuture.supplyAsync(() -> verifier.verifyVectorResults(consistency));
        CompletableFuture<VerificationResult> relationalFuture =
            CompletableFuture.supplyAsync(() -> verifier.verifyRelationalResults(consistency));
        try {
            CompletableFuture.allOf(vectorFuture, relationalFuture).get(30, TimeUnit.SECONDS);
            VerificationResult vectorResult = vectorFuture.get();
            VerificationResult relationalResult = relationalFuture.get();
            if (!vectorResult.isConsistent() || !relationalResult.isConsistent()) {
                scheduleConsistencyRepair(consistency, vectorResult, relationalResult);
            }
        } catch (TimeoutException e) {
            log.warn("Consistency verification timed out, falling back to async verification: {}", queryId);
            scheduleAsyncVerification(queryId);
        } catch (Exception e) {
            log.error("Consistency verification failed: {}", queryId, e);
        }
    }
}
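To keep the system stable over the long run, we implement performance monitoring together with an adaptive optimization mechanism.
Runtime metrics collection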
@Component
@RequiredArgsConstructor
public class PerformanceMonitor {
    private final MetricsRegistry registry;
    private final List<PerformanceAlert> alerts;
    private final AdaptiveOptimizer adaptiveOptimizer;

    @EventListener
    public void onQueryExecution(QueryExecutionEvent event) {
        // Collect execution time metrics
        registry.timer("query.execution.time")
            .record(event.getExecutionTime(), TimeUnit.MILLISECONDS);
        // Collect resource usage metrics
        registry.gauge("query.memory.usage", event.getMemoryUsage());
        registry.gauge("query.cpu.usage", event.getCpuUsage());
        // Check for performance anomalies
        checkPerformanceAnomalies(event);
    }

    // handlePerformanceAlert is a helper method that is not shown here.
    private void checkPerformanceAnomalies(QueryExecutionEvent event) {
        for (PerformanceAlert alert : alerts) {
            if (alert.shouldTrigger(event)) {
                handlePerformanceAlert(alert, event);
                // Trigger adaptive optimization
                if (alert.requiresOptimization()) {
                    adaptiveOptimizer.optimize(alert.getOptimizationContext());
                }
            }
        }
    }
}

@Component
@RequiredArgsConstructor
public class AdaptiveOptimizer {
    // LearningOptimizer is the (not shown) component that derives a plan from historical data
    private final LearningOptimizer learningOptimizer;

    public void optimize(OptimizationContext context) {
        // Generate a plan learned from historical data
        OptimizationPlan plan = learningOptimizer.generatePlan(context);
        // Apply the optimization actions
        executeOptimizationPlan(plan);
        // Validate the effect of the optimization
        validateOptimizationEffect(plan);
    }

    // rebuildIndex, adjustCacheStrategy, updateQueryRewriteRules, reallocateResources
    // and validateOptimizationEffect are helper methods that are not shown here.
    private void executeOptimizationPlan(OptimizationPlan plan) {
        for (OptimizationAction action : plan.getActions()) {
            switch (action.getType()) {
                case INDEX_REBUILD:
                    rebuildIndex(action.getParameters());
                    break;
                case CACHE_ADJUSTMENT:
                    adjustCacheStrategy(action.getParameters());
                    break;
                case QUERY_REWRITE:
                    updateQueryRewriteRules(action.getParameters());
                    break;
                case RESOURCE_REALLOCATION:
                    reallocateResources(action.getParameters());
                    break;
                default:
                    break;
            }
        }
    }
}
Adaptive optimization decision flow
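One way to picture the decision flow is as a small set of threshold rules that map observed metrics to the four action types used by AdaptiveOptimizer. The sketch below is purely illustrative: the metric names and every threshold are assumptions, and the text describes a learning-based optimizer rather than fixed rules.

```java
import java.util.ArrayList;
import java.util.List;

final class OptimizationDecisionSketch {
    enum ActionType { INDEX_REBUILD, CACHE_ADJUSTMENT, QUERY_REWRITE, RESOURCE_REALLOCATION }

    // Maps observed metrics to actions; all thresholds are hypothetical.
    static List<ActionType> decide(double p95LatencyMs, double cacheHitRatio,
                                   double recall, double cpuUsage) {
        List<ActionType> actions = new ArrayList<>();
        if (recall < 0.9) {
            actions.add(ActionType.INDEX_REBUILD);          // search quality has degraded
        }
        if (cacheHitRatio < 0.5) {
            actions.add(ActionType.CACHE_ADJUSTMENT);       // cache is not effective
        }
        if (p95LatencyMs > 200 && cpuUsage < 0.7) {
            actions.add(ActionType.QUERY_REWRITE);          // slow although CPU is not saturated
        }
        if (cpuUsage >= 0.85) {
            actions.add(ActionType.RESOURCE_REALLOCATION);  // CPU is saturated
        }
        return actions;
    }
}
```

For instance, a p95 latency of 250 ms with a cache hit ratio of 0.4, healthy recall and moderate CPU usage yields a cache adjustment followed by a query rewrite.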

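With these features, the system becomes more extensible and is designed to meet production requirements for reliability, performance, and consistency, which lets it adapt to complex business scenarios.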