A Custom MyBatis Plugin That Integrates SQL Queries with the Milvus Java SDK

javpower
Published 2025-11-17 19:25:13

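Concepts and Architecture

A Deep Dive into the MyBatis Plugin Mechanism

A MyBatis plugin is essentially an interceptor mechanism built on Java dynamic proxies and the chain-of-responsibility pattern. An Interceptor intercepts and augments method calls on MyBatis's four core components (Executor, StatementHandler, ParameterHandler, ResultSetHandler). To implement one, declare the interception targets with the @Intercepts and @Signature annotations and implement the intercept method of the Interceptor interface.

Key implementation points:

  • Interception timing: custom logic can run before SQL execution (parameter handling), during execution (statement execution), or after execution (result-set mapping)
  • Proxy chain construction: Plugin.wrap() creates a proxy object, and multiple plugins form a chain of nested proxies
  • Metadata access: MappedStatement exposes metadata such as the SQL command type and the statement ID
  • Parameter modification: the SQL and parameter mappings carried by BoundSql can be changed, typically by building a new BoundSql or by reflection, because BoundSql has no setter for its SQL text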
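import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;

@Intercepts({
    @Signature(type = Executor.class, method = "query",
               args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
})
public class VectorSearchInterceptor implements Interceptor {
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        // Interception logic
        MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
        Object parameter = invocation.getArgs()[1];
        // Parse and augment the original SQL. enhancedQuery stands for the
        // plugin's own logic and is not shown here.
        return enhancedQuery(invocation, ms, parameter);
    }
}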
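
The nested proxy chain mentioned in the key points can be illustrated with plain JDK dynamic proxies. The following is a minimal sketch, not MyBatis code: QueryRunner and ProxyChainSketch are hypothetical names, and wrap() only imitates the spirit of Plugin.wrap() by recording which wrapper ran before delegating.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a MyBatis component such as Executor.
interface QueryRunner {
    String run(String sql);
}

final class ProxyChainSketch {

    // Wraps the target in a JDK dynamic proxy that records the wrapper's name
    // and then delegates to the wrapped object.
    static QueryRunner wrap(QueryRunner target, String name, List<String> trace) {
        InvocationHandler handler = (proxy, method, args) -> {
            trace.add(name);
            return method.invoke(target, args);
        };
        return (QueryRunner) Proxy.newProxyInstance(
                QueryRunner.class.getClassLoader(),
                new Class<?>[] {QueryRunner.class},
                handler);
    }

    // Builds a two-level chain and returns the order in which the wrappers ran.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        QueryRunner core = sql -> "rows for: " + sql;
        // "first" is applied first, so it ends up innermost.
        QueryRunner chained = wrap(wrap(core, "first", trace), "second", trace);
        chained.run("SELECT 1");
        return trace;
    }
}
```

In this sketch the wrapper applied last sits outermost and therefore runs first, which is worth keeping in mind when several plugins target the same method.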
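
Milvus Vector Database Internals

Milvus is a distributed database designed for vector similarity search. Its architecture centers on the following components:

1. Data model and storage engine

  • Collection: comparable to a table in a relational database; contains vector fields and scalar fields
  • Partition: supports physical partitioning of the data to improve query parallelism
  • Segment: the smallest unit of data persistence; supports automatic merging and compaction

2. Indexes and search algorithms

  • Approximate nearest neighbor search (ANNS): supports index types such as HNSW, IVF_FLAT, and IVF_PQ
  • Metrics: supports several similarity measures, including L2 distance, inner product, and Jaccard (for binary vectors)
  • Hybrid queries: supports combining vector similarity search with scalar attribute filtering

3. Java SDK architecture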

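Integrated Architecture: Combining SQL and Vector Search

Design goals:

  • Keep the native MyBatis development experience, with no changes to existing DAO code
  • Combine traditional relational queries and vector similarity search transparently
  • Support switching query modes dynamically (pure SQL / hybrid / pure vector search)

Core components:

1. SQL parsing and augmentation layer

  • Parses the original SQL into an abstract syntax tree (AST) with JSqlParser
  • Recognizes the custom vector search function, for example vector_search(feature_column, query_vector, top_k)
  • Converts the vector function into the parameters used to build a Milvus search request

2. Dual-engine query coordinator

3. Data mapping and result merger

  • Automatically joins vector search results with relational query results
  • Handles standard SQL features such as pagination and sorting in one place
  • Maintains the query context to keep the two sides consistent

Key implementation points:

Dynamic SQL rewriting strategy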

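public class VectorSQLRewriter {
    public String rewrite(String originalSQL, List<Long> vectorIds) {
        // Replace the vector_search() call with a standard IN predicate.
        // Example: WHERE vector_search(features, ?, 10) AND category = 'A'
        // becomes: WHERE id IN (?,?,...) AND category = 'A'
        String placeholders = String.join(",", Collections.nCopies(vectorIds.size(), "?"));
        // An empty ID list must match nothing; "IN ()" is not valid SQL.
        String predicate = vectorIds.isEmpty() ? "1 = 0" : "id IN (" + placeholders + ")";
        // Regex stand-in for the AST-based rewrite; assumes no nested parentheses in the call.
        return originalSQL.replaceAll("(?i)vector_search\\s*\\([^)]*\\)", predicate);
    }
}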

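Hybrid query optimizer

  • Automatically chooses the best execution path based on the query conditions
  • Supports secondary filtering of vector search results (scalar conditions)
  • Caches query results

Performance considerations and optimization strategies

  • Parallel execution of vector search and relational queries
  • Connection pool management and resource reuse
  • Support for, and optimization of, batch operations

This architecture keeps MyBatis development simple while making use of Milvus's high-performance vector search. It brings similarity retrieval to applications built on a traditional relational database and moves them from plain "data querying" toward "intelligent retrieval".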

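Implementation

Core Plugin Architecture

Before diving into the code, we need to lay out the complete plugin architecture. It is built around the core interceptor, VectorSearchInterceptor, with several collaborating classes that carry a query from SQL interception through vector search to result mapping.

Core interceptor implementation

VectorSearchInterceptor is the plugin's entry point. It intercepts MyBatis's query execution flow.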

@Intercepts({
    @Signature(type = Executor.class, method = "query",
               args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
    @Signature(type = Executor.class, method = "query",
               args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class})
})
@Component
@Slf4j
public class VectorSearchInterceptor implements Interceptor {

    @Autowired
    private SqlParser sqlParser;

    @Autowired
    private MilvusService milvusService;

    @Autowired
    private ResultMerger resultMerger;

    // Per-thread flag marking an interception that is already in progress.
    private final ThreadLocal<Boolean> inProcessing = ThreadLocal.withInitial(() -> false);

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        // Guard against recursive interception.
        if (inProcessing.get()) {
            return invocation.proceed();
        }

        try {
            inProcessing.set(true);
            MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
            Object parameter = invocation.getArgs()[1];

            // Get the original SQL.
            BoundSql boundSql = ms.getBoundSql(parameter);
            String originalSql = boundSql.getSql();

            // Parse the SQL and check whether it contains a vector search function.
            ParsedSql parsedSql = sqlParser.parse(originalSql);
            if (!parsedSql.hasVectorFunctions()) {
                return invocation.proceed();
            }

            log.debug("Detected vector search SQL: {}", originalSql);
            return processVectorSearch(invocation, ms, parameter, boundSql, parsedSql);

        } finally {
            // remove() instead of set(false) so pooled threads keep no stale entry.
            inProcessing.remove();
        }
    }

    @SuppressWarnings("unchecked")
    private Object processVectorSearch(Invocation invocation, MappedStatement ms,
                                     Object parameter, BoundSql boundSql,
                                     ParsedSql parsedSql) throws Exception {
        // Extract the vector function descriptors.
        List<VectorFunction> vectorFunctions = parsedSql.getVectorFunctions();

        // Run the vector search. executeVectorSearch and collectIds are helpers that are
        // not shown; they are assumed to return the raw Milvus results and their IDs.
        List<SearchResults> vectorSearchResults = executeVectorSearch(vectorFunctions, parameter, boundSql);
        List<Long> vectorIds = collectIds(vectorSearchResults);

        // Rewrite the SQL, replacing the vector function with an IN query.
        String rewrittenSql = sqlParser.rewriteSql(parsedSql, vectorIds);

        // Create a new BoundSql for the relational query.
        BoundSql newBoundSql = createNewBoundSql(ms, boundSql, rewrittenSql, parameter);

        // Swap the invocation arguments.
        Object[] args = invocation.getArgs();
        args[0] = copyMappedStatement(ms, newBoundSql);
        if (args.length == 6) {
            // The six-argument query() receives the BoundSql explicitly.
            args[5] = newBoundSql;
        }

        // Execute the rewritten SQL. The cast assumes the statement maps each row to a Map.
        List<Map<String, Object>> relationalResults = (List<Map<String, Object>>) invocation.proceed();

        // Merge the vector search results with the relational results.
        return resultMerger.mergeResults(vectorSearchResults, relationalResults);
    }
}
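
The recursion guard used by the interceptor can be isolated into a small helper. This is a sketch under stated assumptions: ReentrancyGuard is a hypothetical class, not part of MyBatis, and it only shows how a ThreadLocal flag lets the outer call take the guarded path while any nested call on the same thread passes straight through.

```java
import java.util.function.Supplier;

final class ReentrancyGuard {

    private final ThreadLocal<Boolean> active = ThreadLocal.withInitial(() -> Boolean.FALSE);

    // Runs `guarded` unless the current thread is already inside a guarded call,
    // in which case `passthrough` runs instead.
    <T> T run(Supplier<T> guarded, Supplier<T> passthrough) {
        if (active.get()) {
            return passthrough.get();
        }
        active.set(Boolean.TRUE);
        try {
            return guarded.get();
        } finally {
            // Clear the entry so a pooled thread does not carry the flag over.
            active.remove();
        }
    }
}
```

Because the flag is per thread, concurrent queries on other threads are unaffected, and clearing it in finally keeps the guard usable after an exception.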
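
SQL parsing and vector function detection

SqlParser parses the original SQL, detects the vector search functions in it, and rewrites the SQL once the vector search results are available.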

@Component
public class SqlParser {

    private final CCJSqlParserManager parser = new CCJSqlParserManager();

    public ParsedSql parse(String sql) throws JSQLParserException {
        Statement statement = parser.parse(new StringReader(sql));

        ParsedSql parsedSql = new ParsedSql();
        parsedSql.setOriginalSql(sql);

        if (!(statement instanceof Select)) {
            // Vector search is only supported in SELECT statements. Every query passes
            // through this parser, so other statements are reported as "no vector
            // functions" instead of being rejected with an exception.
            parsedSql.setVectorFunctions(new ArrayList<>());
            return parsedSql;
        }

        Select select = (Select) statement;
        parsedSql.setSelect(select);

        // Extract the vector functions.
        List<VectorFunction> vectorFunctions = extractVectorFunctions(select);
        parsedSql.setVectorFunctions(vectorFunctions);

        return parsedSql;
    }

    private List<VectorFunction> extractVectorFunctions(Select select) {
        List<VectorFunction> functions = new ArrayList<>();
        // getSelectBody() is the JSqlParser 4.x API.
        SelectBody selectBody = select.getSelectBody();

        if (selectBody instanceof PlainSelect) {
            PlainSelect plainSelect = (PlainSelect) selectBody;
            // A query without a WHERE clause has nothing to visit.
            if (plainSelect.getWhere() != null) {
                WhereClauseVisitor visitor = new WhereClauseVisitor();
                plainSelect.getWhere().accept(visitor);
                functions = visitor.getVectorFunctions();
            }
        }

        return functions;
    }

    public String rewriteSql(ParsedSql parsedSql, List<Long> vectorIds) {
        Select select = parsedSql.getSelect();
        SqlRewriteVisitor rewriteVisitor = new SqlRewriteVisitor(vectorIds);
        select.accept(rewriteVisitor);
        return select.toString();
    }

    // Custom visitor that finds vector functions in the WHERE clause.
    private static class WhereClauseVisitor extends ExpressionVisitorAdapter {
        private final List<VectorFunction> vectorFunctions = new ArrayList<>();

        List<VectorFunction> getVectorFunctions() {
            return vectorFunctions;
        }

        @Override
        public void visit(Function function) {
            if ("vector_search".equalsIgnoreCase(function.getName())) {
                VectorFunction vf = parseVectorFunction(function);
                vectorFunctions.add(vf);
            }
        }

        private VectorFunction parseVectorFunction(Function function) {
            VectorFunction vf = new VectorFunction();
            if (function.getParameters() == null) {
                return vf;
            }
            List<Expression> parameters = function.getParameters().getExpressions();

            // Arguments: column_name, query_vector, top_k, [metric_type].
            // The extract* helpers are not shown in this listing.
            if (parameters.size() >= 3) {
                vf.setColumnName(extractColumnName(parameters.get(0)));
                vf.setParameterName(extractParameterName(parameters.get(1)));
                vf.setTopK(extractTopK(parameters.get(2)));

                if (parameters.size() > 3) {
                    vf.setMetricType(extractMetricType(parameters.get(3)));
                }
            }

            return vf;
        }
    }
}
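
To make the argument layout of vector_search concrete without pulling in JSqlParser, the sketch below extracts the column, top_k, and optional metric with a regular expression. It is only an illustration: VectorCall is a hypothetical class, the regex is a stand-in for the AST visitor above, and it assumes the query vector has already become a `?` placeholder in the bound SQL.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class VectorCall {

    // vector_search(<column>, ?, <top_k>[, '<metric>'])
    private static final Pattern CALL = Pattern.compile(
            "vector_search\\s*\\(\\s*([A-Za-z_][\\w.]*)\\s*,\\s*\\?\\s*,\\s*(\\d+)\\s*"
                    + "(?:,\\s*'([A-Za-z0-9_]+)'\\s*)?\\)",
            Pattern.CASE_INSENSITIVE);

    final String column;
    final int topK;
    final String metric; // null when the optional fourth argument is absent

    private VectorCall(String column, int topK, String metric) {
        this.column = column;
        this.topK = topK;
        this.metric = metric;
    }

    // Returns the first vector_search call found in the SQL, or null if there is none.
    static VectorCall find(String sql) {
        Matcher m = CALL.matcher(sql);
        if (!m.find()) {
            return null;
        }
        return new VectorCall(m.group(1), Integer.parseInt(m.group(2)), m.group(3));
    }
}
```

A regex like this breaks on nested expressions or string literals that happen to contain the function name, which is the reason the plugin works on the parsed AST instead.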

Milvus service integration layer

MilvusService wraps the interaction with the Milvus Java SDK. It builds the search parameters and executes the vector query.

import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.SearchResults;
import io.milvus.param.R;
import io.milvus.param.dml.SearchParam;
import io.milvus.response.SearchResultsWrapper;

@Service
@Slf4j
public class MilvusService {

    @Value("${milvus.collection.name:default_collection}")
    private String collectionName;

    @Autowired
    private MilvusServiceClient milvusClient;

    public List<Long> search(SearchParam searchParam) {
        try {
            R<SearchResults> response = milvusClient.search(searchParam);

            if (response.getStatus() != R.Status.Success.getCode()) {
                throw new RuntimeException("Milvus search failed: " + response.getMessage());
            }

            SearchResults results = response.getData();
            return extractEntityIds(results);

        } catch (Exception e) {
            log.error("Milvus vector search error", e);
            throw new RuntimeException("Vector search execution failed", e);
        }
    }

    public SearchParam buildSearchParam(VectorFunction vectorFunction,
                                      Object parameterObject,
                                      Map<String, Object> additionalParams) {

        List<?> queryVectors = resolveQueryVector(vectorFunction, parameterObject, additionalParams);

        // buildFilterExpression and buildSearchParams are helpers not shown in this listing.
        String expr = buildFilterExpression(additionalParams);

        List<String> outputFields = Arrays.asList("id", vectorFunction.getColumnName());

        SearchParam searchParam = SearchParam.newBuilder()
            .withCollectionName(collectionName)
            .withMetricType(vectorFunction.getMetricType())
            .withTopK(vectorFunction.getTopK())
            .withVectors(queryVectors)
            .withVectorFieldName(vectorFunction.getColumnName())
            .withExpr(expr)
            .withOutFields(outputFields)
            .withParams(buildSearchParams(vectorFunction))
            .build();

        return searchParam;
    }

    private List<?> resolveQueryVector(VectorFunction vectorFunction,
                                     Object parameterObject,
                                     Map<String, Object> additionalParams) {
        // Resolve the query vector from the MyBatis parameters.
        // Supported parameter types: Map, entity objects, @Param-annotated parameters.
        String paramName = vectorFunction.getParameterName();
        Object vectorValue = ParameterResolver.resolve(parameterObject, paramName);

        if (vectorValue == null) {
            throw new IllegalArgumentException("Vector parameter not found: " + paramName);
        }

        // withVectors expects a list of query vectors, so the single vector is wrapped.
        // convertToVectorList is the conversion helper listed with ParameterResolver.
        return Collections.singletonList(ParameterResolver.convertToVectorList(vectorValue));
    }

    private List<Long> extractEntityIds(SearchResults results) {
        List<Long> ids = new ArrayList<>();
        // SearchResults is not iterable; the SDK's SearchResultsWrapper exposes the hits.
        SearchResultsWrapper wrapper = new SearchResultsWrapper(results.getResults());
        // Index 0 refers to the first (here the only) query vector.
        for (SearchResultsWrapper.IDScore idScore : wrapper.getIDScore(0)) {
            ids.add(idScore.getLongID());
        }
        return ids;
    }
}

Result merging and mapping

ResultMerger merges the vector search results with the relational query results by ID and keeps the two sides consistent.

@Service
public class ResultMerger {

    public List<Map<String, Object>> mergeResults(List<SearchResults> vectorResults,
                                                List<Map<String, Object>> relationalResults) {
        if (vectorResults == null || vectorResults.isEmpty()) {
            return relationalResults;
        }

        // Build a map from ID to the data returned by the vector search.
        Map<Long, Map<String, Object>> vectorResultMap = buildVectorResultMap(vectorResults);

        // Join the two result sets by ID.
        List<Map<String, Object>> mergedResults = new ArrayList<>();
        for (Map<String, Object> relationalRow : relationalResults) {
            // JDBC drivers may return Integer, Long, or BigInteger for the key column.
            Object rawId = relationalRow.get("id");
            if (!(rawId instanceof Number)) {
                continue;
            }
            Map<String, Object> vectorData = vectorResultMap.get(((Number) rawId).longValue());

            if (vectorData != null) {
                Map<String, Object> mergedRow = new LinkedHashMap<>();
                // Relational values take precedence; vector data only fills in missing keys.
                mergedRow.putAll(relationalRow);
                vectorData.forEach(mergedRow::putIfAbsent);
                mergedResults.add(mergedRow);
            }
        }

        // Sort by vector similarity.
        return sortBySimilarity(mergedResults);
    }

    private Map<Long, Map<String, Object>> buildVectorResultMap(List<SearchResults> vectorResults) {
        Map<Long, Map<String, Object>> resultMap = new HashMap<>();

        for (SearchResults results : vectorResults) {
            // SearchResults is not iterable; SearchResultsWrapper exposes IDs and scores.
            SearchResultsWrapper wrapper = new SearchResultsWrapper(results.getResults());

            for (SearchResultsWrapper.IDScore idScore : wrapper.getIDScore(0)) {
                Map<String, Object> vectorData = new HashMap<>();
                vectorData.put("id", idScore.getLongID());
                // Converted as in the original (1 - score), stored as Double so the sort
                // below can cast it. Whether this is a similarity depends on the metric:
                // for L2 a smaller raw score is closer, for IP a larger one is.
                vectorData.put("similarity_score", 1.0 - idScore.getScore());
                resultMap.put(idScore.getLongID(), vectorData);
            }
        }

        return resultMap;
    }

    private List<Map<String, Object>> sortBySimilarity(List<Map<String, Object>> results) {
        return results.stream()
            .sorted((r1, r2) -> {
                Double score1 = (Double) r1.getOrDefault("similarity_score", 0.0);
                Double score2 = (Double) r2.getOrDefault("similarity_score", 0.0);
                return score2.compareTo(score1); // descending
            })
            .collect(Collectors.toList());
    }
}
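
An alternative to re-sorting after the join is to walk the vector hits in the order Milvus returned them and look up each relational row by ID. The sketch below shows that variant with plain collections; RankedMergeSketch and the "score" key are illustrative names, and the parallel ids/scores lists are an assumed input shape rather than the SDK's own types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RankedMergeSketch {

    // ids and scores are parallel lists in the order returned by the vector search.
    static List<Map<String, Object>> merge(List<Long> ids, List<Float> scores,
                                           List<Map<String, Object>> rows) {
        // Index the relational rows by ID, accepting any numeric key type.
        Map<Long, Map<String, Object>> rowsById = new HashMap<>();
        for (Map<String, Object> row : rows) {
            Object id = row.get("id");
            if (id instanceof Number) {
                rowsById.put(((Number) id).longValue(), row);
            }
        }

        List<Map<String, Object>> merged = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            Map<String, Object> row = rowsById.get(ids.get(i));
            if (row == null) {
                continue; // the row was filtered out by the scalar conditions
            }
            Map<String, Object> out = new LinkedHashMap<>(row);
            out.put("score", scores.get(i));
            merged.add(out);
        }
        return merged;
    }
}
```

This keeps the ranking independent of how the score is interpreted, since the search engine's own ordering is preserved.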

Parameter resolution and type handling

ParameterResolver handles the various MyBatis parameter types so that the vector parameter is extracted correctly.

public class ParameterResolver {

    public static Object resolve(Object parameterObject, String paramName) {
        if (parameterObject == null) {
            return null;
        }

        if (parameterObject instanceof Map) {
            // @Param-annotated or Map parameters
            return ((Map<?, ?>) parameterObject).get(paramName);
        } else if (isPrimitiveOrWrapper(parameterObject.getClass())) {
            // Single primitive-like parameter
            return parameterObject;
        } else {
            // Entity object parameter
            return resolveFromEntity(parameterObject, paramName);
        }
    }

    private static boolean isPrimitiveOrWrapper(Class<?> type) {
        return type.isPrimitive() || Number.class.isAssignableFrom(type)
            || type == Boolean.class || type == Character.class;
    }

    private static String capitalize(String name) {
        return Character.toUpperCase(name.charAt(0)) + name.substring(1);
    }

    private static Object resolveFromEntity(Object entity, String fieldName) {
        try {
            Field field = entity.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(entity);
        } catch (Exception e) {
            // Fall back to the getter method.
            String getterName = "get" + capitalize(fieldName);
            try {
                Method getter = entity.getClass().getMethod(getterName);
                return getter.invoke(entity);
            } catch (Exception ex) {
                throw new IllegalArgumentException("Cannot resolve parameter: " + fieldName, ex);
            }
        }
    }

    // Public so that MilvusService can call it.
    public static List<?> convertToVectorList(Object vectorValue) {
        if (vectorValue instanceof List) {
            return (List<?>) vectorValue;
        } else if (vectorValue instanceof float[]) {
            // Arrays.stream has no float[] overload, so copy element by element.
            List<Float> vector = new ArrayList<>();
            for (float f : (float[]) vectorValue) {
                vector.add(f);
            }
            return vector;
        } else if (vectorValue instanceof double[]) {
            // Narrow to Float, the element type of a Milvus float vector.
            List<Float> vector = new ArrayList<>();
            for (double d : (double[]) vectorValue) {
                vector.add((float) d);
            }
            return vector;
        } else {
            throw new IllegalArgumentException("Unsupported vector data type: " + vectorValue.getClass());
        }
    }
}

Configuration and registration

Finally, the plugin has to be registered with MyBatis and the related beans configured.

@Configuration
@AutoConfigureAfter(MybatisAutoConfiguration.class)
public class VectorSearchConfig {

    // If VectorSearchInterceptor is also annotated with @Component, keep only one of
    // the two bean definitions.
    @Bean
    public VectorSearchInterceptor vectorSearchInterceptor() {
        return new VectorSearchInterceptor();
    }

    @Bean
    @ConditionalOnMissingBean
    public MilvusServiceClient milvusServiceClient(
            @Value("${milvus.host:localhost}") String host,
            @Value("${milvus.port:19530}") int port) {

        ConnectParam connectParam = ConnectParam.newBuilder()
            .withHost(host)
            .withPort(port)
            .build();

        return new MilvusServiceClient(connectParam);
    }

    // Registering the interceptor with MyBatis: mybatis-spring-boot-starter picks up
    // every Interceptor bean automatically, so the interceptor bean above is enough.
    // Adding it again through a ConfigurationCustomizer
    // (configuration.addInterceptor(...)) would install it twice; use that route only
    // when the interceptor is not exposed as a bean.
}
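
Execution flow in detail

The plugin's execution flow runs as follows: the interceptor captures the Executor.query call, SqlParser detects the vector_search function, MilvusService runs the vector search and returns the matching IDs, SqlParser rewrites the SQL into an IN query, MyBatis executes the rewritten statement, and ResultMerger merges both result sets and orders them by similarity.

With this implementation we have a working MyBatis plugin that integrates SQL queries with Milvus vector search. It keeps the native MyBatis development experience and adds vector search to existing applications.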

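Advanced Features

Dynamic SQL Extension Mechanism

On top of the basic implementation, we introduce a dynamic SQL extension mechanism so that vector search works together with MyBatis's dynamic SQL tags. The extension supports the usual tags such as <if> and <choose>, and adds optimization strategies designed specifically for vector search.

Dynamic vector parameter resolver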

@Component
public class DynamicVectorParameterResolver {

    public List<VectorSearchCondition> resolveDynamicConditions(
            MappedStatement ms, Object parameter, BoundSql boundSql) {

        List<VectorSearchCondition> conditions = new ArrayList<>();
        // getSqlSourceRoot, optimizeConditions, and the other process* methods are
        // helpers not shown in this listing.
        SqlNode rootSqlNode = getSqlSourceRoot(ms);

        // Depth-first traversal of the dynamic SQL node tree.
        traverseSqlNode(rootSqlNode, parameter, conditions, new HashMap<>());

        return optimizeConditions(conditions);
    }

    private void traverseSqlNode(SqlNode node, Object parameter,
                               List<VectorSearchCondition> conditions,
                               Map<String, Object> context) {

        if (node instanceof IfSqlNode) {
            processIfNode((IfSqlNode) node, parameter, conditions, context);
        } else if (node instanceof ChooseSqlNode) {
            processChooseNode((ChooseSqlNode) node, parameter, conditions, context);
        } else if (node instanceof ForEachSqlNode) {
            processForEachNode((ForEachSqlNode) node, parameter, conditions, context);
        } else if (node instanceof MixedSqlNode) {
            processMixedNode((MixedSqlNode) node, parameter, conditions, context);
        }
    }

    private void processIfNode(IfSqlNode ifNode, Object parameter,
                             List<VectorSearchCondition> conditions,
                             Map<String, Object> context) {

        try {
            OgnlClassResolver resolver = new OgnlClassResolver();
            OgnlContext ognlContext = (OgnlContext) Ognl.createDefaultContext(parameter, resolver);

            // Note: MyBatis's IfSqlNode keeps `test` and `contents` in private fields
            // without public getters, so getTest()/getContents() here stand for
            // reflective access to those fields.
            Boolean testResult = (Boolean) Ognl.getValue(ifNode.getTest(), ognlContext, parameter);
            if (Boolean.TRUE.equals(testResult)) {
                traverseSqlNode(ifNode.getContents(), parameter, conditions, context);
            }
        } catch (OgnlException e) {
            throw new RuntimeException("Failed to evaluate dynamic SQL condition", e);
        }
    }
}

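Condition optimizer

The core of the dynamic SQL extension is condition optimization: the system chooses an execution path based on the mix of query conditions: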

public class VectorConditionOptimizer {

    public ExecutionPlan optimizeExecution(List<VectorSearchCondition> conditions,
                                         QueryStatistics statistics) {

        ExecutionPlan plan = new ExecutionPlan();

        // Analyze the distribution of condition types.
        ConditionAnalysis analysis = analyzeConditions(conditions);

        if (analysis.getVectorRatio() > 0.7) {
            // Vector conditions dominate: run the vector search first.
            plan.setPrimaryExecutor(ExecutorType.VECTOR);
            plan.setSecondaryExecutor(ExecutorType.RELATIONAL);
        } else if (analysis.getScalarRatio() > 0.7) {
            // Scalar conditions dominate: run the relational query first.
            plan.setPrimaryExecutor(ExecutorType.RELATIONAL);
            plan.setSecondaryExecutor(ExecutorType.VECTOR);
        } else {
            // Mixed conditions: execute both in parallel.
            plan.setPrimaryExecutor(ExecutorType.PARALLEL);
        }

        // Adjust the strategy based on historical statistics.
        if (statistics != null) {
            adjustPlanByStatistics(plan, statistics);
        }

        return plan;
    }

    private ConditionAnalysis analyzeConditions(List<VectorSearchCondition> conditions) {
        ConditionAnalysis analysis = new ConditionAnalysis();

        for (VectorSearchCondition condition : conditions) {
            if (condition.isVectorCondition()) {
                analysis.incrementVectorCount();
            } else if (condition.isScalarCondition()) {
                analysis.incrementScalarCount();
            }

            // Analyze the selectivity of the condition.
            analysis.analyzeSelectivity(condition);
        }

        return analysis;
    }
}
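
The 0.7 thresholds above reduce to a small piece of arithmetic on the two condition counts. The following sketch works through it in isolation; PlanChoiceSketch and its Plan enum are hypothetical names, and treating an empty condition list as a plain relational query is an assumption, since the original does not say how that case is handled.

```java
final class PlanChoiceSketch {

    enum Plan { VECTOR_FIRST, RELATIONAL_FIRST, PARALLEL }

    // Chooses a plan from the number of vector and scalar conditions.
    static Plan choose(int vectorCount, int scalarCount) {
        int total = vectorCount + scalarCount;
        if (total == 0) {
            // Assumption: with no conditions there is nothing to route to Milvus.
            return Plan.RELATIONAL_FIRST;
        }
        double vectorRatio = vectorCount / (double) total;
        double scalarRatio = scalarCount / (double) total;
        if (vectorRatio > 0.7) {
            return Plan.VECTOR_FIRST;
        }
        if (scalarRatio > 0.7) {
            return Plan.RELATIONAL_FIRST;
        }
        return Plan.PARALLEL;
    }
}
```

For example, three vector conditions and one scalar condition give a vector ratio of 0.75, so the vector search runs first, while one of each gives 0.5 on both sides and falls through to the parallel plan.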
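
Routing Across Multiple Vector Indexes

Modern vector search workloads often handle several kinds of vector data in the same system, so the plugin supports routing across multiple vector indexes.

Index metadata management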

@Service
@RequiredArgsConstructor
public class VectorIndexRegistry {

    private final Map<String, IndexMetadata> indexMetadataMap = new ConcurrentHashMap<>();
    private final IndexRouter indexRouter;
    // Not declared in the original listing; injected through the Lombok-generated constructor.
    private final MilvusServiceClient milvusClient;

    @PostConstruct
    public void initialize() {
        refreshIndexMetadata();
        startMetadataRefreshTask();
    }

    public String routeToIndex(VectorSearchContext context) {
        return indexRouter.route(context);
    }

    public IndexMetadata getIndexMetadata(String indexName) {
        return indexMetadataMap.get(indexName);
    }

    private void refreshIndexMetadata() {
        // MilvusServiceClient lists collections through showCollections().
        ShowCollectionsParam param = ShowCollectionsParam.newBuilder().build();
        R<ShowCollectionsResponse> response = milvusClient.showCollections(param);

        if (response.getStatus() == R.Status.Success.getCode()) {
            for (String collectionName : response.getData().getCollectionNamesList()) {
                loadCollectionMetadata(collectionName);
            }
        }
    }

    private void loadCollectionMetadata(String collectionName) {
        DescribeCollectionParam param = DescribeCollectionParam.newBuilder()
            .withCollectionName(collectionName)
            .build();

        R<DescribeCollectionResponse> response = milvusClient.describeCollection(param);
        if (response.getStatus() == R.Status.Success.getCode()) {
            // buildIndexMetadata and startMetadataRefreshTask are helpers not shown here.
            IndexMetadata metadata = buildIndexMetadata(response.getData());
            indexMetadataMap.put(collectionName, metadata);
        }
    }
}

Rule-based routing

Dynamic index selection algorithm

public class AdaptiveIndexSelector {

    public SelectedIndex selectOptimalIndex(VectorSearchRequest request,
                                          List<IndexMetadata> candidates) {

        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("No candidate index available");
        }

        List<IndexScore> scores = candidates.stream()
            .map(index -> calculateIndexScore(index, request))
            .sorted(Comparator.comparing(IndexScore::getScore).reversed())
            .collect(Collectors.toList());

        return scores.get(0).getIndex();
    }

    private IndexScore calculateIndexScore(IndexMetadata index, VectorSearchRequest request) {
        double score = 0.0;

        // Dimension match (weight: 0.3)
        score += 0.3 * calculateDimensionScore(index, request);

        // Metric type match (weight: 0.25)
        score += 0.25 * calculateMetricScore(index, request);

        // Predicted performance (weight: 0.25)
        score += 0.25 * calculatePerformanceScore(index, request);

        // Load balancing (weight: 0.2)
        score += 0.2 * calculateLoadScore(index);

        return new IndexScore(index, score);
    }

    private double calculateDimensionScore(IndexMetadata index, VectorSearchRequest request) {
        int indexDim = index.getDimension();
        int requestDim = request.getVectorDimension();

        if (indexDim == requestDim) {
            return 1.0;
        } else {
            return 1.0 - (Math.abs(indexDim - requestDim) / (double) Math.max(indexDim, requestDim));
        }
    }
}
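
The scoring formula above is a weighted sum with weights 0.3, 0.25, 0.25, and 0.2. The sketch below reproduces just the arithmetic so the numbers can be checked by hand; IndexScoreSketch is a hypothetical helper, and the metric, performance, and load inputs are taken as given values in [0, 1] because the original does not show how they are computed.

```java
final class IndexScoreSketch {

    // 1.0 for an exact match, otherwise 1 - |a - b| / max(a, b).
    static double dimensionScore(int indexDim, int requestDim) {
        if (indexDim == requestDim) {
            return 1.0;
        }
        return 1.0 - Math.abs(indexDim - requestDim) / (double) Math.max(indexDim, requestDim);
    }

    // Weighted sum using the weights from the listing above.
    static double total(double dimension, double metric, double performance, double load) {
        return 0.3 * dimension + 0.25 * metric + 0.25 * performance + 0.2 * load;
    }
}
```

For a 768-dimensional query against a 1024-dimensional index, the dimension score is 1 - 256/1024 = 0.75. Since Milvus rejects a query vector whose dimension differs from the field's, a selector built on this score would probably still need to discard such candidates outright rather than merely rank them lower.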
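
Distributed Error Handling and Retries

In a distributed environment, robust error handling is key to reliability. We implemented a layered error-handling strategy.

Error categories and handling strategies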

public enum ErrorCategory {
    NETWORK_ERROR(1, "Network communication error", RetryStrategy.EXPONENTIAL_BACKOFF),
    TIMEOUT_ERROR(2, "Request timeout", RetryStrategy.FIXED_DELAY),
    RATE_LIMIT_ERROR(3, "Rate limit exceeded", RetryStrategy.EXPONENTIAL_BACKOFF),
    DATA_ERROR(4, "Malformed data", RetryStrategy.NO_RETRY),
    SERVER_ERROR(5, "Internal server error", RetryStrategy.FIXED_DELAY),
    UNKNOWN_ERROR(6, "Unknown error", RetryStrategy.NO_RETRY);

    private final int code;
    private final String description;
    private final RetryStrategy defaultStrategy;

    ErrorCategory(int code, String description, RetryStrategy defaultStrategy) {
        this.code = code;
        this.description = description;
        this.defaultStrategy = defaultStrategy;
    }
}

@Component
@RequiredArgsConstructor
public class ErrorHandler {

    private final Map<ErrorCategory, ErrorHandlerStrategy> strategies;
    private final CircuitBreaker circuitBreaker;

    public <T> T executeWithRetry(Supplier<T> operation, String operationName) {
        int attempt = 0;
        Exception lastException = null;

        while (attempt < getMaxRetries()) {
            // Check the circuit breaker outside the try block so that a rejected call
            // is neither retried nor counted as a failure.
            if (!circuitBreaker.allowRequest()) {
                throw new CircuitBreakerOpenException("Circuit breaker is open");
            }

            try {
                T result = operation.get();
                circuitBreaker.recordSuccess();
                return result;

            } catch (Exception e) {
                lastException = e;
                attempt++;
                // Record every failure, including the one that ends the loop.
                circuitBreaker.recordFailure();

                ErrorCategory category = classifyError(e);
                if (!shouldRetry(category, attempt)) {
                    break;
                }

                waitBeforeRetry(category, attempt);
            }
        }

        throw new OperationFailedException(
            "Operation " + operationName + " failed after " + attempt + " attempt(s)", lastException);
    }

    private ErrorCategory classifyError(Exception e) {
        if (e instanceof ConnectException || e instanceof SocketTimeoutException) {
            return ErrorCategory.NETWORK_ERROR;
        } else if (e instanceof TimeoutException) {
            return ErrorCategory.TIMEOUT_ERROR;
        } else if (e instanceof RpcException) {
            RpcException rpcException = (RpcException) e;
            if (rpcException.getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) {
                return ErrorCategory.RATE_LIMIT_ERROR;
            }
        }

        return ErrorCategory.UNKNOWN_ERROR;
    }
}
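
The EXPONENTIAL_BACKOFF strategy is referenced above but its waiting logic is not shown. A minimal sketch of one common form, doubling the delay per attempt up to a cap, might look like this. BackoffRetrySketch, its parameters, and the injected sleeper are all assumptions made for illustration; the sleeper exists only so the delays can be observed without actually sleeping.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

final class BackoffRetrySketch {

    // Delay before retry number `attempt` (1-based): baseMs * 2^(attempt - 1),
    // capped at maxDelayMs.
    static long delayMs(long baseMs, int attempt, long maxDelayMs) {
        long delay = baseMs;
        for (int i = 1; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    // Calls the operation up to maxAttempts times, handing each delay to `sleeper`
    // between attempts. Throws once all attempts have failed.
    static <T> T call(Supplier<T> operation, int maxAttempts,
                      long baseMs, long maxDelayMs, LongConsumer sleeper) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleeper.accept(delayMs(baseMs, attempt, maxDelayMs));
                }
            }
        }
        throw new IllegalStateException("failed after " + maxAttempts + " attempts", last);
    }
}
```

In production code the sleeper would typically wrap Thread.sleep, and many implementations add random jitter to the delay so that clients do not retry in lockstep.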

Circuit breaker state management
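
The retry loop relies on a circuit breaker that offers allowRequest(), recordSuccess(), and recordFailure(). One possible shape for such a breaker, using the usual CLOSED, OPEN, and HALF_OPEN states, is sketched below. CircuitBreakerSketch, its failure threshold, and its open interval are assumptions for illustration rather than the author's implementation; the clock is injected so the transitions can be exercised deterministically.

```java
import java.util.function.LongSupplier;

final class CircuitBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock;

    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    CircuitBreakerSketch(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    // Rejects calls while OPEN; after openMillis the breaker moves to HALF_OPEN
    // and lets probe requests through.
    synchronized boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state != State.OPEN;
    }

    // Any success closes the breaker and resets the failure count.
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    // Opens the breaker when a probe fails or the threshold is reached.
    synchronized void recordFailure() {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    synchronized State state() {
        return state;
    }
}
```

A real deployment would pass System::currentTimeMillis as the clock, and might limit HALF_OPEN to a single in-flight probe.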

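Distributed Transaction Consistency

In hybrid query scenarios, keeping vector search and relational queries transactionally consistent is very challenging. We use a multi-phase coordination mechanism to achieve eventual consistency.

Hybrid transaction coordinator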

@Component
@RequiredArgsConstructor
public class HybridTransactionCoordinator {

    private final VectorTransactionManager vectorTM;
    private final RelationalTransactionManager relationalTM;
    private final TransactionLogStore logStore;

    public <T> T executeInHybridTransaction(TransactionCallback<T> callback) {
        String transactionId = generateTransactionId();
        TransactionContext context = new TransactionContext(transactionId);

        try {
            // Phase 1: pre-execution checks
            preExecuteCheck(context);

            // Phase 2: begin both transactions in parallel
            CompletableFuture<VectorTransaction> vectorFuture =
                CompletableFuture.supplyAsync(() -> vectorTM.beginTransaction(context));

            CompletableFuture<RelationalTransaction> relationalFuture =
                CompletableFuture.supplyAsync(() -> relationalTM.beginTransaction(context));

            CompletableFuture.allOf(vectorFuture, relationalFuture).join();

            // Phase 3: run the business logic
            T result = callback.doInTransaction(context);

            // Phase 4: two-phase commit
            if (shouldCommit(context)) {
                commitTransaction(vectorFuture.get(), relationalFuture.get(), context);
            } else {
                rollbackTransaction(vectorFuture.get(), relationalFuture.get(), context);
            }

            return result;

        } catch (Exception e) {
            // Phase 5: exception handling and recovery
            handleTransactionException(context, e);
            throw new TransactionException("Hybrid transaction failed", e);
        }
    }

    private void commitTransaction(VectorTransaction vt,
                                 RelationalTransaction rt,
                                 TransactionContext context) {

        // Phase 4.1: prepare
        vectorTM.prepareCommit(vt);
        relationalTM.prepareCommit(rt);

        // Phase 4.2: commit
        try {
            vectorTM.commit(vt);
            relationalTM.commit(rt);
            logStore.logTransactionSuccess(context.getTransactionId());
        } catch (Exception e) {
            // Phase 4.3: handle a failed commit
            handleCommitFailure(vt, rt, context, e);
        }
    }
}

Consistency state tracking

@Slf4j
@RequiredArgsConstructor
public class ConsistencyTracker {

    private final Map<String, QueryConsistency> consistencyMap = new ConcurrentHashMap<>();
    private final ConsistencyVerifier verifier;

    public void trackQueryExecution(String queryId, List<ExecutionStep> steps) {
        QueryConsistency consistency = new QueryConsistency(queryId);

        for (ExecutionStep step : steps) {
            consistency.addStep(step);

            // Verify consistency immediately where required.
            if (needsImmediateVerification(step)) {
                VerificationResult result = verifier.verifyStepConsistency(step);
                if (!result.isConsistent()) {
                    triggerConsistencyRepair(consistency, step, result);
                }
            }
        }

        consistencyMap.put(queryId, consistency);
    }

    public void verifyFinalConsistency(String queryId) {
        QueryConsistency consistency = consistencyMap.get(queryId);
        if (consistency == null) return;

        CompletableFuture<VerificationResult> vectorFuture =
            CompletableFuture.supplyAsync(() ->
                verifier.verifyVectorResults(consistency));

        CompletableFuture<VerificationResult> relationalFuture =
            CompletableFuture.supplyAsync(() ->
                verifier.verifyRelationalResults(consistency));

        try {
            CompletableFuture.allOf(vectorFuture, relationalFuture).get(30, TimeUnit.SECONDS);

            VerificationResult vectorResult = vectorFuture.get();
            VerificationResult relationalResult = relationalFuture.get();

            if (!vectorResult.isConsistent() || !relationalResult.isConsistent()) {
                scheduleConsistencyRepair(consistency, vectorResult, relationalResult);
            }

        } catch (TimeoutException e) {
            log.warn("Consistency verification timed out, deferring to async processing: {}", queryId);
            scheduleAsyncVerification(queryId);
        } catch (Exception e) {
            log.error("Consistency verification error: {}", queryId, e);
        }
    }
}
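
The pattern in verifyFinalConsistency, two checks started in parallel and awaited under one deadline, can be tried out on its own with plain suppliers. The sketch below is an illustration under assumptions: ParallelVerifySketch and its Outcome enum are hypothetical, and the two Boolean suppliers stand in for the verifier calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class ParallelVerifySketch {

    enum Outcome { CONSISTENT, INCONSISTENT, TIMED_OUT, FAILED }

    // Runs both checks concurrently and waits at most timeoutMs for the pair.
    static Outcome verify(Supplier<Boolean> vectorCheck,
                          Supplier<Boolean> relationalCheck,
                          long timeoutMs) {
        CompletableFuture<Boolean> vector = CompletableFuture.supplyAsync(vectorCheck);
        CompletableFuture<Boolean> relational = CompletableFuture.supplyAsync(relationalCheck);
        try {
            CompletableFuture.allOf(vector, relational).get(timeoutMs, TimeUnit.MILLISECONDS);
            // Both futures are complete here, so join() returns immediately.
            return (vector.join() && relational.join()) ? Outcome.CONSISTENT : Outcome.INCONSISTENT;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.FAILED;
        } catch (ExecutionException e) {
            // One of the checks threw an exception.
            return Outcome.FAILED;
        }
    }
}
```

Note that a timeout does not cancel the running checks; they keep executing on the common pool unless they are cancelled or cooperate with interruption.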
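
Performance Monitoring and Adaptive Optimization

To keep the system running stably over the long term, we implemented comprehensive performance monitoring and adaptive optimization.

Runtime metrics collection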

@Component
@RequiredArgsConstructor
public class PerformanceMonitor {

    private final MetricsRegistry registry;
    private final List<PerformanceAlert> alerts;
    // Not declared in the original listing; injected through the Lombok-generated constructor.
    private final AdaptiveOptimizer adaptiveOptimizer;

    @EventListener
    public void onQueryExecution(QueryExecutionEvent event) {
        // Collect execution time metrics.
        registry.timer("query.execution.time")
               .record(event.getExecutionTime(), TimeUnit.MILLISECONDS);

        // Collect resource usage metrics.
        registry.gauge("query.memory.usage", event.getMemoryUsage());
        registry.gauge("query.cpu.usage", event.getCpuUsage());

        // Check for performance anomalies.
        checkPerformanceAnomalies(event);
    }

    private void checkPerformanceAnomalies(QueryExecutionEvent event) {
        for (PerformanceAlert alert : alerts) {
            if (alert.shouldTrigger(event)) {
                handlePerformanceAlert(alert, event);

                // Trigger adaptive optimization.
                if (alert.requiresOptimization()) {
                    adaptiveOptimizer.optimize(alert.getOptimizationContext());
                }
            }
        }
    }
}

@Component
public class AdaptiveOptimizer {

    public void optimize(OptimizationContext context) {
        // Learn from historical data. learningOptimizer is a collaborator that the
        // original listing uses without declaring.
        OptimizationPlan plan = learningOptimizer.generatePlan(context);

        // Apply the optimization actions.
        executeOptimizationPlan(plan);

        // Validate the effect of the optimization.
        validateOptimizationEffect(plan);
    }

    private void executeOptimizationPlan(OptimizationPlan plan) {
        for (OptimizationAction action : plan.getActions()) {
            switch (action.getType()) {
                case INDEX_REBUILD:
                    rebuildIndex(action.getParameters());
                    break;
                case CACHE_ADJUSTMENT:
                    adjustCacheStrategy(action.getParameters());
                    break;
                case QUERY_REWRITE:
                    updateQueryRewriteRules(action.getParameters());
                    break;
                case RESOURCE_REALLOCATION:
                    reallocateResources(action.getParameters());
                    break;
            }
        }
    }
}

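Adaptive optimization decision flow

With these features the system becomes more extensible and is designed to meet production requirements for reliability, performance, and consistency, which lets it adapt to complex business scenarios.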

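Originally published on 2025-10-11 on the WeChat official account Coder建设 and shared through the Tencent Cloud self-media sync exposure program. In case of infringement, contact cloudcommunity@tencent.com for removal.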
