前面两篇文章,对于SpringBatch这个批处理框架做了一个大概的学习和了解,通过前两篇文章,你可以了解到SpringBatch是什么?应用场景有哪些?怎么去写一个SpringBatch的demo?以及SpringBatch的架构设计和核心组件的简单介绍。
今天这篇文章我们会找其中一些源码来做一下分析,让你对于SpringBatch更加了解,更好的去做技术选型和场景化方案落地。
今天来分析一个之前demo中用到的类JdbcPagingItemReader<T>
1、JdbcPagingItemReader类的继承层次:
可以看到,该类的顶层是ItemReader接口和ItemStream接口。
2、JdbcPagingItemReader的作用是什么呢?
3、JdbcPagingItemReader的属性有哪些?
private static final String START_AFTER_VALUE = "start.after";
public static final int VALUE_NOT_SET = -1;
// 数据源,在从数据库的数据源读取数据的时候,你可以在不同的reader中进行
// 设置的时候,设置不同的数据源,可以参考我之前的多数据源的demo
private DataSource dataSource;
// 如果需要使用分页查询的话,那么你可以使用该方式来做分页查询
private PagingQueryProvider queryProvider;
// 顾名思义,该属性是用来组装你的SQL Where的参数的
private Map<String, Object> parameterValues;
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
// 对于你Jdbc查询的返回结果集就是RowMapper
private RowMapper<T> rowMapper;
private String firstPageSql;
private String remainingPagesSql;
private Map<String, Object> startAfterValues;
private Map<String, Object> previousStartAfterValues;
private int fetchSize = VALUE_NOT_SET;
关于PagingQueryProvider接口,这里需要做一下说明,SpringBatch它根据不同的数据库类型,都封装了对应的PagingQueryProvider实现类,比如MySqlPagingQueryProvider、OraclePagingQueryProvider等,具体见下图:
如果你了解过阿里巴巴开源的DataX这个开源作品,那么你会感觉它在一定程度上的设计思想和SpringBatch是类似的,都是对接不同的数据源通过Reader,写入数据源叫Writer,只是DataX做到了更细粒度可控,能插能拔,你只需要对你需要的做一些组装就可以使用起来,而SpringBatch它是提供了基本上我们常使用的一些数据源的封装。
4、JdbcPagingItemReader也实现了InitializingBean接口的afterProperies方法
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
Assert.notNull(dataSource, "DataSource may not be null");
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
if (fetchSize != VALUE_NOT_SET) {
jdbcTemplate.setFetchSize(fetchSize);
}
jdbcTemplate.setMaxRows(getPageSize());
namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
Assert.notNull(queryProvider, "QueryProvider may not be null");
queryProvider.init(dataSource);
this.firstPageSql = queryProvider.generateFirstPageQuery(getPageSize());
this.remainingPagesSql = queryProvider.generateRemainingPagesQuery(getPageSize());
}
从这里我们可以看到,SpringBatch实质还是使用JdbcTemplate来进行的SQL查询,默认的pageSize为10,然后queryProvider调用了init方法,将对应的DataSource当作参数传入。
下面是DataSource当作init参数传入以后的代码逻辑,从下面代码我们可以看出几点:
public void init(DataSource dataSource) throws Exception {
Assert.notNull(dataSource, "A DataSource is required");
Assert.hasLength(selectClause, "selectClause must be specified");
Assert.hasLength(fromClause, "fromClause must be specified");
Assert.notEmpty(sortKeys, "sortKey must be specified");
StringBuilder sql = new StringBuilder(64);
sql.append("SELECT ").append(selectClause);
sql.append(" FROM ").append(fromClause);
if (whereClause != null) {
sql.append(" WHERE ").append(whereClause);
}
if(groupClause != null) {
sql.append(" GROUP BY ").append(groupClause);
}
List<String> namedParameters = new ArrayList<>();
parameterCount = JdbcParameterUtils.countParameterPlaceholders(sql.toString(), namedParameters);
if (namedParameters.size() > 0) {
if (parameterCount != namedParameters.size()) {
throw new InvalidDataAccessApiUsageException(
"You can't use both named parameters and classic \"?\" placeholders: " + sql);
}
usingNamedParameters = true;
}
}
此处,你也可以看出来SpringBatch的PagingQueryProvider是只支持单表查询的,如果你想存在一些join类型的查询,那么它是在这种情况下不支持的。
5、SortedKey是怎么样的一个结构?
public void setSortKeys(Map<String, Order> sortKeys) {
this.sortKeys = sortKeys;
}
可以看出来,SortedKey是一个Map对象,其中key就是你数据库表的唯一key的字段名称,value就是一个Order对象,目前该对象只有两个属性,升序或者降序,Order是一个枚举类型:
public enum Order {
ASCENDING, DESCENDING
}
其中,我没有具体分析怎么去拼接Where条件的参数,有兴趣的可以自己去分析一下。
今天主要分享了一下SpringBatch批处理的中从数据库数据源读取数据的方式PagingQueryProvider。对于开源的东西我们不说好坏,吸收他的设计思想,发现他的不足,如果有余力,可以自行研发。
如果你数据源有数据库、消息类、文件类、那么你可以选择SpringBatch,最好建议是每一个reader读取单表数据,然后在processor中进行多个结果集的处理,最后做一个目标数据源数据的insert。如果是database类型,希望你可以在SpringBatch使用Reader读取数据的时候可以提高性能,必须索引之类,不要全表扫描之类等等
当然对于数据的抽取、清洗和转换你业可以考虑其他的技术方案、比如kettle、DataX(商业版是DataWorks),还有大数据类型,当然你也需要考虑你的资源问题,比如时间、人力等等