前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊ShardingSphere是怎么进行sql重写的

聊聊ShardingSphere是怎么进行sql重写的

作者头像
code4it
发布于 2023-09-12 11:55:27
发布于 2023-09-12 11:55:27
36400
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下ShardingSphere进行sql重写的原理

prepareStatement

org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final class ShardingSphereConnection extends AbstractConnectionAdapter {

    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new ShardingSpherePreparedStatement(this, sql);
    }

    //......
}    

ShardingSphereConnection的prepareStatement创建的是ShardingSpherePreparedStatement

ShardingSpherePreparedStatement

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {
    
    @Getter
    private final ShardingSphereConnection connection;

    public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);
    }

    private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
                                            final String[] columns) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new EmptySQLException().toSQLException();
        }
        this.connection = connection;
        metaDataContexts = connection.getContextManager().getMetaDataContexts();
        SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
        hintValueContext = sqlParserRule.isSqlCommentParseEnabled() ? new HintValueContext() : SQLHintUtils.extractHint(sql).orElseGet(HintValueContext::new);
        this.sql = sqlParserRule.isSqlCommentParseEnabled() ? sql : SQLHintUtils.removeHint(sql);
        statements = new ArrayList<>();
        parameterSets = new ArrayList<>();
        SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(
                DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()));
        sqlStatement = sqlParserEngine.parse(this.sql, true);
        sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData(), sqlStatement, connection.getDatabaseName());
        parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
        statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
        executor = new DriverExecutor(connection);
        JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());
        batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getDatabaseName());
        kernelProcessor = new KernelProcessor();
        statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData());
        trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
        selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();
        statementManager = new StatementManager();
    }

    //......
}    

ShardingSpherePreparedStatement继承了AbstractPreparedStatementAdapter,其构造器主要是通过SQLParserEngine解析sql得到SQLStatement,创建DriverExecutor、BatchPreparedStatementExecutor、KernelProcessor、StatementManager;这里即使useServerPrepStmts=true,也不会触发mysql server的prepare操作

executeUpdate

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public int executeUpdate() throws SQLException {
        try {
            if (statementsCacheable && !statements.isEmpty()) {
                resetParameters();
                return statements.iterator().next().executeUpdate();
            }
            clearPrevious();
            QueryContext queryContext = createQueryContext();
            trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
            if (null != trafficInstanceId) {
                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);
                return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
            }
            executionContext = createExecutionContext(queryContext);
            if (hasRawExecutionRule()) {
                Collection<ExecuteResult> executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback());
                return accumulate(executeResults);
            }
            return isNeedImplicitCommitTransaction(connection, executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();
            // CHECKSTYLE:OFF
        } catch (final RuntimeException ex) {
            // CHECKSTYLE:ON
            handleExceptionInTransaction(connection, metaDataContexts);
            throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
        } finally {
            clearBatch();
        }
    }

    private void clearPrevious() {
        statements.clear();
        parameterSets.clear();
        generatedValues.clear();
    }

    private ExecutionContext createExecutionContext(final QueryContext queryContext) {
        ShardingSphereRuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();
        ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());
        SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());
        ExecutionContext result = kernelProcessor.generateExecutionContext(
                queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());
        findGeneratedKey(result).ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
        return result;
    }

这里executeUpdate会先执行clearPrevious方法,清空statements、parameterSets、generatedValues,然后createExecutionContext,这里有一步是kernelProcessor.generateExecutionContext

KernelProcessor

generateExecutionContext

shardingsphere-infra-context-5.4.0-sources.jar!/org/apache/shardingsphere/infra/connection/kernel/KernelProcessor.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public ExecutionContext generateExecutionContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,
                                                     final ConfigurationProperties props, final ConnectionContext connectionContext) {
        RouteContext routeContext = route(queryContext, database, globalRuleMetaData, props, connectionContext);
        SQLRewriteResult rewriteResult = rewrite(queryContext, database, globalRuleMetaData, props, routeContext, connectionContext);
        ExecutionContext result = createExecutionContext(queryContext, database, routeContext, rewriteResult);
        logSQL(queryContext, props, result);
        return result;
    }

KernelProcessor的generateExecutionContext方法先创建routeContext,然后执行rewrite,最后执行createExecutionContext

rewrite

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private SQLRewriteResult rewrite(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,
                                     final ConfigurationProperties props, final RouteContext routeContext, final ConnectionContext connectionContext) {
        SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, globalRuleMetaData, props);
        return sqlRewriteEntry.rewrite(queryContext.getSql(), queryContext.getParameters(), queryContext.getSqlStatementContext(), routeContext, connectionContext, queryContext.getHintValueContext());
    }

rewrite主要是通过SQLRewriteEntry的rewrite方法进行的

SQLRewriteEntry

shardingsphere-infra-rewrite-5.4.0-sources.jar!/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * Rewrite.
     * 
     * @param sql SQL
     * @param params SQL parameters
     * @param sqlStatementContext SQL statement context
     * @param routeContext route context
     * @param connectionContext connection context
     * @param hintValueContext hint value context
     * 
     * @return route unit and SQL rewrite result map
     */
    public SQLRewriteResult rewrite(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,
                                    final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {
        SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, params, sqlStatementContext, routeContext, connectionContext, hintValueContext);
        SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class);
        DatabaseType protocolType = database.getProtocolType();
        Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
        return routeContext.getRouteUnits().isEmpty()
                ? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext)
                : new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext);
    }

    private SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,
                                                      final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {
        SQLRewriteContext result = new SQLRewriteContext(database.getName(), database.getSchemas(), sqlStatementContext, sql, params, connectionContext, hintValueContext);
        decorate(decorators, result, routeContext, hintValueContext);
        result.generateSQLTokens();
        return result;
    }

    private void decorate(final Map<ShardingSphereRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext,
                          final RouteContext routeContext, final HintValueContext hintValueContext) {
        if (hintValueContext.isSkipSQLRewrite()) {
            return;
        }
        for (Entry<ShardingSphereRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
            entry.getValue().decorate(entry.getKey(), props, sqlRewriteContext, routeContext);
        }
    }

SQLRewriteEntry的rewrite方法,先通过createSQLRewriteContext来创建SQLRewriteContext,这里通过decorate方法遍历decorators,挨个执行SQLRewriteContextDecorator的decorate方法;最后通过GenericSQLRewriteEngine或者RouteSQLRewriteEngine进行rewrite

SQLRewriteContextDecorator

org/apache/shardingsphere/infra/rewrite/context/SQLRewriteContextDecorator.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@SingletonSPI
public interface SQLRewriteContextDecorator<T extends ShardingSphereRule> extends OrderedSPI<T> {
    
    /**
     * Decorate SQL rewrite context.
     *
     * @param rule rule
     * @param props ShardingSphere properties
     * @param sqlRewriteContext SQL rewrite context to be decorated
     * @param routeContext route context
     */
    void decorate(T rule, ConfigurationProperties props, SQLRewriteContext sqlRewriteContext, RouteContext routeContext);
}

SQLRewriteContextDecorator定义了decorate方法,它有诸如ShardingSQLRewriteContextDecorator、EncryptSQLRewriteContextDecorator的实现类

EncryptSQLRewriteContextDecorator

org/apache/shardingsphere/encrypt/rewrite/context/EncryptSQLRewriteContextDecorator.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * SQL rewrite context decorator for encrypt.
 */
public final class EncryptSQLRewriteContextDecorator implements SQLRewriteContextDecorator<EncryptRule> {
    
    @Override
    public void decorate(final EncryptRule encryptRule, final ConfigurationProperties props, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();
        if (!containsEncryptTable(encryptRule, sqlStatementContext)) {
            return;
        }
        Collection<EncryptCondition> encryptConditions = createEncryptConditions(encryptRule, sqlRewriteContext);
        if (!sqlRewriteContext.getParameters().isEmpty()) {
            Collection<ParameterRewriter> parameterRewriters = new EncryptParameterRewriterBuilder(encryptRule,
                    sqlRewriteContext.getDatabaseName(), sqlRewriteContext.getSchemas(), sqlStatementContext, encryptConditions).getParameterRewriters();
            rewriteParameters(sqlRewriteContext, parameterRewriters);
        }
        Collection<SQLTokenGenerator> sqlTokenGenerators = new EncryptTokenGenerateBuilder(encryptRule,
                sqlStatementContext, encryptConditions, sqlRewriteContext.getDatabaseName()).getSQLTokenGenerators();
        sqlRewriteContext.addSQLTokenGenerators(sqlTokenGenerators);
    }
    
    private Collection<EncryptCondition> createEncryptConditions(final EncryptRule encryptRule, final SQLRewriteContext sqlRewriteContext) {
        SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();
        if (!(sqlStatementContext instanceof WhereAvailable)) {
            return Collections.emptyList();
        }
        Collection<WhereSegment> whereSegments = ((WhereAvailable) sqlStatementContext).getWhereSegments();
        Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments();
        return new EncryptConditionEngine(encryptRule, sqlRewriteContext.getSchemas())
                .createEncryptConditions(whereSegments, columnSegments, sqlStatementContext, sqlRewriteContext.getDatabaseName());
    }
    
    private boolean containsEncryptTable(final EncryptRule encryptRule, final SQLStatementContext sqlStatementContext) {
        for (String each : sqlStatementContext.getTablesContext().getTableNames()) {
            if (encryptRule.findEncryptTable(each).isPresent()) {
                return true;
            }
        }
        return false;
    }
    
    private void rewriteParameters(final SQLRewriteContext sqlRewriteContext, final Collection<ParameterRewriter> parameterRewriters) {
        for (ParameterRewriter each : parameterRewriters) {
            each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
        }
    }
    
    @Override
    public int getOrder() {
        return EncryptOrder.ORDER;
    }
    
    @Override
    public Class<EncryptRule> getTypeClass() {
        return EncryptRule.class;
    }
}

rewriteParameters是通过ParameterRewriter进行rewrite,主要是修改ParameterBuilder;而具体sql语句的修改则通过sqlTokenGenerators进行

SQLToken

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@RequiredArgsConstructor
@Getter
public abstract class SQLToken implements Comparable<SQLToken> {
    
    private final int startIndex;
    
    @Override
    public final int compareTo(final SQLToken sqlToken) {
        return startIndex - sqlToken.startIndex;
    }
}

SQLToken它有诸如InsertValuesToken、SubstitutableColumnNameToken、InsertColumnsToken之类的实现类

RouteSQLRewriteEngine

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * Rewrite SQL and parameters.
     *
     * @param sqlRewriteContext SQL rewrite context
     * @param routeContext route context
     * @return SQL rewrite result
     */
    public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F);
        for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {
            Collection<RouteUnit> routeUnits = entry.getValue();
            if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {
                sqlRewriteUnits.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
            } else {
                addSQLRewriteUnits(sqlRewriteUnits, sqlRewriteContext, routeContext, routeUnits);
            }
        }
        return new RouteSQLRewriteResult(translate(sqlRewriteContext.getSqlStatementContext().getSqlStatement(), sqlRewriteUnits));
    }

    private void addSQLRewriteUnits(final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits, final SQLRewriteContext sqlRewriteContext,
                                    final RouteContext routeContext, final Collection<RouteUnit> routeUnits) {
        for (RouteUnit each : routeUnits) {
            sqlRewriteUnits.put(each, new SQLRewriteUnit(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each)));
        }
    }

    private Map<RouteUnit, SQLRewriteUnit> translate(final SQLStatement sqlStatement, final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits) {
        Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(sqlRewriteUnits.size(), 1F);
        for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteUnits.entrySet()) {
            DatabaseType storageType = storageTypes.get(entry.getKey().getDataSourceMapper().getActualName());
            String sql = translatorRule.translate(entry.getValue().getSql(), sqlStatement, protocolType, storageType);
            SQLRewriteUnit sqlRewriteUnit = new SQLRewriteUnit(sql, entry.getValue().getParameters());
            result.put(entry.getKey(), sqlRewriteUnit);
        }
        return result;
    }

addSQLRewriteUnits是往sqlRewriteUnits添加SQLRewriteUnit,最后translate方法构建SQLRewriteUnit;SQLRewriteUnit包含了更改之后的sql以及对应改动后的参数

useDriverToExecuteUpdate

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private int useDriverToExecuteUpdate() throws SQLException {
        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
        cacheStatements(executionGroupContext.getInputGroups());
        return executor.getRegularExecutor().executeUpdate(executionGroupContext,
                executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
    }

    private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
        return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getDatabaseName()));
    } 

    private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
        for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
            each.getInputs().forEach(eachInput -> {
                statements.add((PreparedStatement) eachInput.getStorageResource());
                parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());
            });
        }
        replay();
    }

    private void replay() throws SQLException {
        replaySetParameter();
        for (Statement each : statements) {
            getMethodInvocationRecorder().replay(each);
        }
    }

    private void replaySetParameter() throws SQLException {
        for (int i = 0; i < statements.size(); i++) {
            replaySetParameter(statements.get(i), parameterSets.get(i));
        }
    }

    protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> params) throws SQLException {
        setParameterMethodInvocations.clear();
        addParameters(params);
        for (PreparedStatementInvocationReplayer each : setParameterMethodInvocations) {
            each.replayOn(preparedStatement);
        }
    }

    private void addParameters(final List<Object> params) {
        int i = 0;
        for (Object each : params) {
            int index = ++i;
            setParameterMethodInvocations.add(preparedStatement -> preparedStatement.setObject(index, each));
        }
    }

useDriverToExecuteUpdate方法会执行createExecutionGroupContext(会执行prepare方法),cacheStatements这里主要是把eachInput.getStorageResource()真正的PrepareStatement赋值到ShardingSpherePreparedStatement的statements变量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()赋值到parameterSets,然后执行replay方法通过PreparedStatementInvocationReplayer把修改后的变量replay到真正的PrepareStatement 该方法委托给executor.getRegularExecutor().executeUpdate,最后一个参数为callback,即createExecuteUpdateCallback

DriverExecutionPrepareEngine.prepare

org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits,
                                                  final ExecutionGroupReportContext reportContext) throws SQLException {
        return prepare(routeContext, Collections.emptyMap(), executionUnits, reportContext);
    }

    public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Map<String, Integer> connectionOffsets, final Collection<ExecutionUnit> executionUnits,
                                                  final ExecutionGroupReportContext reportContext) throws SQLException {
        Collection<ExecutionGroup<T>> result = new LinkedList<>();
        for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) {
            String dataSourceName = entry.getKey();
            List<SQLUnit> sqlUnits = entry.getValue();
            List<List<SQLUnit>> sqlUnitGroups = group(sqlUnits);
            ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
            result.addAll(group(dataSourceName, connectionOffsets.getOrDefault(dataSourceName, 0), sqlUnitGroups, connectionMode));
        }
        return decorate(routeContext, result, reportContext);
    }

    protected List<ExecutionGroup<T>> group(final String dataSourceName, final int connectionOffset, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException {
        List<ExecutionGroup<T>> result = new LinkedList<>();
        List<C> connections = databaseConnectionManager.getConnections(dataSourceName, connectionOffset, sqlUnitGroups.size(), connectionMode);
        int count = 0;
        for (List<SQLUnit> each : sqlUnitGroups) {
            result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode));
        }
        return result;
    }

    private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final ConnectionMode connectionMode) throws SQLException {
        List<T> result = new LinkedList<>();
        for (SQLUnit each : sqlUnits) {
            result.add((T) sqlExecutionUnitBuilder.build(new ExecutionUnit(dataSourceName, each), statementManager, connection, connectionMode, option, databaseTypes.get(dataSourceName)));
        }
        return new ExecutionGroup<>(result);
    }

group方法调用遍历SQLUnit执行createExecutionGroup,而后者则执行sqlExecutionUnitBuilder.build;这里databaseConnectionManager.getConnections获取的connection是通过真正driver获取的connection(com.mysql.jdbc.Driver)

PreparedStatementExecutionUnitBuilder

org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager,
                                   final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {
        PreparedStatement preparedStatement = createPreparedStatement(
                executionUnit, statementManager, connection, connectionMode, option, databaseType);
        return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);
    }

    private PreparedStatement createPreparedStatement(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection,
                                                      final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {
        return (PreparedStatement) statementManager.createStorageResource(executionUnit, connection, connectionMode, option, databaseType);
    }

PreparedStatementExecutionUnitBuilder的build方法这里才真正创建PreparedStatement

StatementManager

org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option,
                                           final DatabaseType databaseType) throws SQLException {
        Statement result = cachedStatements.get(new CacheKey(executionUnit, connectionMode));
        if (null == result || result.isClosed() || result.getConnection().isClosed()) {
            String sql = executionUnit.getSqlUnit().getSql();
            if (option.isReturnGeneratedKeys()) {
                result = null == option.getColumns() || 0 == option.getColumns().length
                        ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                        : connection.prepareStatement(sql, option.getColumns());
            } else {
                result = connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());
            }
            cachedStatements.put(new CacheKey(executionUnit, connectionMode), result);
        }
        return result;
    }

createStorageResource则是通过connection.prepareStatement来创建真正的PrepareStatement,而此时传入的sql也是经过重写之后的sql

createExecuteUpdateCallback

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
        boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
        return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown) {
            
            @Override
            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                return ((PreparedStatement) statement).executeUpdate();
            }
            
            @Override
            protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
                return Optional.empty();
            }
        };
    }

createExecuteUpdateCallback创建的JDBCExecutorCallback,其executeSQL方法则是通过((PreparedStatement) statement).executeUpdate()来执行,即委托给了真正的PreparedStatement

小结

  • ShardingSphereConnection的prepareStatement创建的是ShardingSpherePreparedStatement,它在ShardingSpherePreparedStatement的executeUpdate的时候进行sql重写,然后prepare,最后执行的时候是通过JDBCExecutorCallback,其executeSQL方法则是通过((PreparedStatement) statement).executeUpdate()来执行,即委托给了真正的PreparedStatement
  • rewriteParameters是通过ParameterRewriter进行rewrite,主要是修改ParameterBuilder;而具体sql语句的修改则通过sqlTokenGenerators进行
  • PreparedStatementExecutionUnitBuilder的build方法这里才真正创建PreparedStatement:它通过StatementManager.createStorageResource则是通过connection.prepareStatement来创建真正的PrepareStatement,而此时传入的sql也是经过重写之后的sql
  • useDriverToExecuteUpdate方法会执行createExecutionGroupContext(会执行prepare方法),cacheStatements这里主要是把eachInput.getStorageResource()真正的PrepareStatement赋值到ShardingSpherePreparedStatement的statements变量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()赋值到parameterSets,然后执行replay方法通过PreparedStatementInvocationReplayer把修改后的变量replay到真正的PrepareStatement

ShardingSpherePreparedStatement实现了java.sql.PreparedStatement接口,其sql属性是用户传入的sql,即未经过重写的sql,而实际execute的时候,会触发sql重写(包括重写sql语句及参数),最后会通过connection.prepareStatement(传入重写之后的sql)来创建真正的PrepareStatement,然后有一步replay操作,把重写后的参数作用到真正的PrepareStatement,最后通过((PreparedStatement) statement).executeUpdate()来触发执行 至此我们可以得到sql重写的一个基本思路:通过实现java.sql.PreparedStatement接口伪装一个PreparedStatement类,其创建和set参数先内存缓存起来,之后在execute的时候进行sql重写,创建真正的PreparedStatement,replay参数,执行execute方法

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-05 09:09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
新iPhone专配灭霸紫,药丸屏变废为宝!6千起售,也能“向上捅破天”,库克最后还留下一个彩蛋
杨净 宇衡 羿阁 发自 凹非寺 量子位 | 公众号 QbitAI 顶着“十三香”压力,iPhone14系列终于来了! 结果这一次,不发mini改发沉寂多年的Plus版本,还将诟病已久的“叹号屏”爆改成“灵动岛”…… 直接让网友直呼:开冲!再度惊叹于苹果的创新力。 更关键的是,全系列几乎跟上一代售价一样。 只有在512G、1T这种大存储容量上,略涨了个300到500元不等,要知道还是在搭载新芯片A16的情况下。 要说,论刀法还是库克精准~去年体验过十三香的用户,瞬间觉得手里的iPhone不香了。 除此
量子位
2022/09/22
6280
新iPhone专配灭霸紫,药丸屏变废为宝!6千起售,也能“向上捅破天”,库克最后还留下一个彩蛋
首发双向卫星通信!苹果iPhone14系列升级A16处理器/药丸屏/4800万像素主摄!国内供应链厂商有哪些?
北京时间9月8日凌晨1点,苹果秋季新品发布会在线上召开,带来了全新的iPhone 14系列手机,以及Apple Watch系列手表、AirPods Pro 2耳机等新品。
芯智讯
2022/09/14
7560
首发双向卫星通信!苹果iPhone14系列升级A16处理器/药丸屏/4800万像素主摄!国内供应链厂商有哪些?
iPhone 14起价5999,160亿晶体管A16+「灵动岛」王炸!
---- 新智元报道   编辑:编辑部 【新智元导读】苹果发布会一句话总结:iPhone14 Pro 「灵动岛」惊艳到飞起!连莎士比亚都说十四行(手动狗头)。 一觉醒来... iPhone14 Pro 「灵动岛」太帅了!简直绝了! 莎士比亚都说「十四行」,看来果粉们手里的「十三香」已经不香了。 刚刚,苹果在线上举办了「超前瞻」(Far Out)发布会。 (凌晨两点半的时候) 看得出来,大家都没睡。 还真别说,苹果今年挤牙膏式设计还是深得人心。 关键是,厨子在教我们做事「药丸屏才叫设计,而不是
新智元
2022/09/22
5350
iPhone 14起价5999,160亿晶体管A16+「灵动岛」王炸!
搭载A16芯片,iPhone14Pro把刘海改成交互窗口!库克:这是最具创新性的专业产品
大数据文摘出品 作者:Caleb 2022年的“科技春晚”,比以往来得更早一些。 美国东部时间9月7日下午1点,苹果发布会如期召开。 与往年一样,在这场被命名为“Far Out”的发布会之前,网上对于今年苹果产品的趋势透露已经随处可见了。除了按照惯例的iPhone 14和AirPods的更新外,苹果或将砍掉iPhone mini,新增iPhone Plus,以及全新的Apple Watch Pro…… ‍ ‍实际情况如何,那就要从库克为这场发布会定下“手机,手表,耳机”的调开始说起:iPhone 14
大数据文摘
2022/09/08
7360
搭载A16芯片,iPhone14Pro把刘海改成交互窗口!库克:这是最具创新性的专业产品
iPhone 14系列发布:卫星功能上线,人们却盯着挖孔屏
机器之心报道 编辑:泽南、杜伟 苹果将「药丸」玩出了新花样。 「今天我介绍的产品 iPhone、AirPods 和 Apple Watch,将会成为人们生活的必需品,不论你身处何处,在什么时间。它们配合无间,同时每一件都是业界顶尖水平,」苹果 CEO 蒂姆 · 库克说道。 周四凌晨,今年科技领域最受关注的发布会在加州的苹果总部举行。今年的新一代 iPhone 价格上涨幅度不大,增加了不少关注安全性的功能,跑分水平提升有限,只有 Apple Watch 出了一款「超大」的新品。 和两天前的华为 mate50
机器之心
2022/09/22
7560
iPhone 14系列发布:卫星功能上线,人们却盯着挖孔屏
苹果全新iPhone首发3nm自研芯片,结果“华为发布会”冲上热搜第一…
哪怕是iPhone 15全系告别11年闪电接口改用USB-C、经典静音键从Pro系列消失,这些库克“违背祖宗的决定”,都没抢到更多热度。
量子位
2023/09/19
2150
苹果全新iPhone首发3nm自研芯片,结果“华为发布会”冲上热搜第一…
库克玩起性价比,iPhone 13同配置降价800块、Pro上高刷:这次苹果「真香了」?
机器之心报道 编辑:泽南、杜伟 你们都说十三香,今天它来了,你买不买? 虽然台积电的芯片开始涨价,iPhone 13 在国内的价格却变便宜了。 北京时间 2021 年 9 月 15 日凌晨 1 点,我们迎来了主题为「加州来电」的苹果线上秋季发布会。 会上,苹果发布了 iPhone 13 系列、新款 iPad 和 iPad mini 以及 Apple Watch Series 7。 新一代 iPhone 首次配备了 1TB 容量、加强长焦拍摄、提高充电速度和电池容量、缩小刘海尺寸,并搭载 A15 处理器,预计
机器之心
2023/03/29
5050
库克玩起性价比,iPhone 13同配置降价800块、Pro上高刷:这次苹果「真香了」?
iPhone 13发布,聊一聊这次苹果新品发布会
今天凌晨苹果刚刚开完了新品发布会,因为我是一个果粉,手上也有很多苹果设备,平时除了研究技术,还对各种数码产品感兴趣,所以对这次发布会也格外关注。
_Kaito
2021/10/08
5900
首款3nm芯片,支持硬件级光追,iPhone 15 Pro遥遥领先了吗?
华为 Mate60 系列的「抢先一步」,已经把遥遥领先玩成了梗,压力给到了库克这边。
机器之心
2023/09/19
3570
首款3nm芯片,支持硬件级光追,iPhone 15 Pro遥遥领先了吗?
iPhone14 刚发布就跌破发行价,到手仅需5599元!
都熬夜看发布会了吗? 1 科技界春晚 昨晚被誉为"科技界的春晚"的苹果秋季新品发布会如期而至,苹果推出三大全新产品:iPhone 14系列智能手机、Apple Watch智能穿戴手表、AirPods Pro 2无线蓝牙耳机。 当然了,我觉得大家最关注的产品仍然是 iPhone 14。 看完整个发布会,我感觉亮点基本上都集中在iPhone 14 Pro和 Pro Max 上了。  (iPhone 14 Pro Max与iPhone 14 Pro,图源Apple官网)  iPhone 14 Pro和iPhon
纯洁的微笑
2022/09/13
9340
iPhone14 刚发布就跌破发行价,到手仅需5599元!
苹果电脑全系换上自研芯片,除了不能打电话,比iPhone 12亮眼多了
贾浩楠 发自 凹非寺 量子位 报道 | 公众号 QbitAI 看到凌晨两点都不让人睡觉的苹果,忍不住要说一声: 「苹果够持久!」 今年的第三场新品发布会,苹果带来了预热已久的新Mac系列: 与「牙膏厂」英特尔彻底分手后,苹果一口气拿出了性能提升数倍、续航史上最长的Mac系列! 如此强力且持久,得益于苹果首款自研ARM架构的芯片:M1。 M1处理器 发布会上,库克宣布的第一款产品,就是M1芯片: M1芯片专门为Mac开发的,但不是一枚单独的CPU, 而是采用了5nm制程的SoC(系统级芯片),包括了4
量子位
2023/03/10
7700
苹果电脑全系换上自研芯片,除了不能打电话,比iPhone 12亮眼多了
一文看懂苹果2024秋季发布会:毫无创新的公式化2小时。
随着老库克上来一句Good Morning之后,发布会正式开始了。我看完了全程,看完的观后感是:苹果还是那个苹果,真的一点创新都没有,全程公式化的两小时发布会。
数字生命卡兹克
2025/04/14
900
一文看懂苹果2024秋季发布会:毫无创新的公式化2小时。
只要一万两千八,全新iPhone带回家 | 一文看尽苹果新品发布会
但即便如此,万亿市值的苹果依然展现出不少亮点——尤其是AI方面,新一代芯片A12,贯穿全场。
量子位
2018/09/29
9000
只要一万两千八,全新iPhone带回家 | 一文看尽苹果新品发布会
全球首款3nm芯片,苹果再次封神!全员上岛史诗级换C,主机游戏塞进iPhone,地表最强影像就差一个Vision Pro
被评为「史上最出色、最Pro iPhone」的iPhone 15 Pro一亮相,就破了多个纪录——
新智元
2023/09/19
4120
全球首款3nm芯片,苹果再次封神!全员上岛史诗级换C,主机游戏塞进iPhone,地表最强影像就差一个Vision Pro
苹果预售夜,官网崩了!iPhone 14 Pro被果粉抢空
---- 新智元报道   编辑:Aeneas 好困 【新智元导读】9月9日晚20点,iPhone 14全线新品开启预售,果粉们抢疯了。这次,Pro系列大卖,iPhone 14无人问津。 9月8日的科技春晚之后,同往年一样,今年iPhone14系列预购依然卖爆了。 昨晚8点,iPhone 14全线产品准时开启预购,随着大量果粉的涌入,不管是官网还是天猫旗舰店,都未能逃脱崩溃的命运。 几分钟后,访问终于恢复正常,一大波下单成功的截图如潮水般袭来。 而手慢了的朋友,就只能开启漫长,以及比漫长更漫长的等
新智元
2022/09/13
7750
苹果预售夜,官网崩了!iPhone 14 Pro被果粉抢空
iPhone 13便宜到上热搜!王守义诚不我欺
万万没想到,在一片“铁定涨价”的预测中,库克老贼反手就是一波“逆市”降价,大半夜的直接冲上微博热搜第一。
量子位
2021/09/29
6310
一文看尽苹果发布会:iPhone X背后黑科技全剖析
李根 若朴 假装发自 Steve Jobs Theater 量子位 出品 | 公众号 QbitAI 刚刚,苹果秋季新品发布会胜利闭幕。 这次新登场的苹果产品包括:Apple Watch Series
量子位
2018/03/27
1.6K0
一文看尽苹果发布会:iPhone X背后黑科技全剖析
华为VS苹果,你更pick谁?
北京时间9月13日凌晨,苹果新品发布会如期举行,本次发布会上最大的焦点为iPhone 15Pro系列,其中包括iPhone15、15Plus、15Pro、15Pro Max四款机型。将于9月15日接受预购,9月22日发售。iPhone15系列的发售也代表着在苹果“上班”10年的Lightning接口宣布退休。
数据猿
2023/09/15
2390
华为VS苹果,你更pick谁?
苹果2022秋季发布会, iPhone 14 Pro灵动岛功能发布,设计师要如何应对?
静电说:2022年苹果的秋季发布会如约而至,作为每年一度的必看节目,今年我看了40分钟就睡着了。emmmm,不过灵动岛这个功能还是很有意思的,一起来看看有哪些设计要点吧?
用户5009027
2022/10/27
5230
苹果2022秋季发布会, iPhone 14 Pro灵动岛功能发布,设计师要如何应对?
一大波iPhone14 Pro测评出炉!苹果「细节狂魔」人设不倒
---- 新智元报道   编辑:好困 Aeneas 【新智元导读】在拿到第一波14 Pro预售的日子里,小编总结了现有的测评报告,灵动岛、屏幕常亮、摄影功能都有惊喜,苹果依然是那个「细节控」。 今天,成功抢到第一波预售的朋友们,就能拿到全新的iPhone 14 Pro和Pro Max了! 据说,由于mini的销量不好,所以库克决定换个大屏的plus,重新割一波韭菜。 结果,销量更差了…… 本mini用户:想不到,你也有今天啊 那么,对于还没拿到手机的小伙伴们,可以一起来看看别人手里的14 Pro和
新智元
2022/09/20
9560
一大波iPhone14 Pro测评出炉!苹果「细节狂魔」人设不倒
推荐阅读
新iPhone专配灭霸紫,药丸屏变废为宝!6千起售,也能“向上捅破天”,库克最后还留下一个彩蛋
6280
首发双向卫星通信!苹果iPhone14系列升级A16处理器/药丸屏/4800万像素主摄!国内供应链厂商有哪些?
7560
iPhone 14起价5999,160亿晶体管A16+「灵动岛」王炸!
5350
搭载A16芯片,iPhone14Pro把刘海改成交互窗口!库克:这是最具创新性的专业产品
7360
iPhone 14系列发布:卫星功能上线,人们却盯着挖孔屏
7560
苹果全新iPhone首发3nm自研芯片,结果“华为发布会”冲上热搜第一…
2150
库克玩起性价比,iPhone 13同配置降价800块、Pro上高刷:这次苹果「真香了」?
5050
iPhone 13发布,聊一聊这次苹果新品发布会
5900
首款3nm芯片,支持硬件级光追,iPhone 15 Pro遥遥领先了吗?
3570
iPhone14 刚发布就跌破发行价,到手仅需5599元!
9340
苹果电脑全系换上自研芯片,除了不能打电话,比iPhone 12亮眼多了
7700
一文看懂苹果2024秋季发布会:毫无创新的公式化2小时。
900
只要一万两千八,全新iPhone带回家 | 一文看尽苹果新品发布会
9000
全球首款3nm芯片,苹果再次封神!全员上岛史诗级换C,主机游戏塞进iPhone,地表最强影像就差一个Vision Pro
4120
苹果预售夜,官网崩了!iPhone 14 Pro被果粉抢空
7750
iPhone 13便宜到上热搜!王守义诚不我欺
6310
一文看尽苹果发布会:iPhone X背后黑科技全剖析
1.6K0
华为VS苹果,你更pick谁?
2390
苹果2022秋季发布会, iPhone 14 Pro灵动岛功能发布,设计师要如何应对?
5230
一大波iPhone14 Pro测评出炉!苹果「细节狂魔」人设不倒
9560
相关推荐
新iPhone专配灭霸紫,药丸屏变废为宝!6千起售,也能“向上捅破天”,库克最后还留下一个彩蛋
更多 >
LV.0
南京市镁一刻网络科技有限公司
目录
  • prepareStatement
  • ShardingSpherePreparedStatement
  • executeUpdate
  • KernelProcessor
    • generateExecutionContext
    • rewrite
    • SQLRewriteEntry
    • SQLRewriteContextDecorator
    • EncryptSQLRewriteContextDecorator
    • SQLToken
    • RouteSQLRewriteEngine
  • useDriverToExecuteUpdate
    • DriverExecutionPrepareEngine.prepare
    • PreparedStatementExecutionUnitBuilder
    • StatementManager
    • createExecuteUpdateCallback
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档