首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Mysql中使用流式查询避免数据量过大导致OOM-后续

Mysql中使用流式查询避免数据量过大导致OOM-后续

作者头像
加多
发布于 2018-09-06 07:02:41
发布于 2018-09-06 07:02:41
4.7K00
代码可运行
举报
文章被收录于专栏:Java编程技术Java编程技术
运行总次数:0
代码可运行

一、前言

之前http://www.jianshu.com/p/0339c6fe8b61 介绍了MySQL中三种使用流式方法,看起来很优雅,实则优雅的同时还是有一些注意事项的,下面就针对流式查询时候的注意事项进行介绍。

二、 同一个连接在游标迭代数据过程中不能被复用

2.1 简单介绍

先贴下MySQL Connector/J 5.1 Developer Guide中原文:

There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown.

也就是说当通过流式查询获取一个ResultSet后,在你通过next迭代出所有元素之前或者调用close关闭它之前,你不能使用同一个数据库连接去发起另外一个查询,否者抛出异常(第一次调用的正常,第二次的抛出异常)。

Therefore, if using streaming results, process them as quickly as possible if you want to maintain concurrent access to the tables referenced by the statement producing the result set.

如果你想要保持访问表并发量,那么就要尽可能快的把流式Resultset内容处理完毕。

之所以有这个限制是因为非游标情况下我们在得到resultset后,mysqlclient已经把数据全部放到了resultset,所以这时候该数据库连接就空闲了,所以可以去执行其他查询,而流式查询时候返回给我们Resultset后,所有数据并不都在Resultset,当我们调用next时候需要使用数据库连接从Server获取数据,所以在整个数据访问完毕之前这个连接一直被占用,所以才有了同一个连接在游标迭代数据过程中不能被复用的注意事项。

2.2 一个例子

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void testOneConn() throws Exception {

        System.out.print("begn");
        String cmdSql = "select app_key,device_id,brand from test where app_key='111' AND app_version = '2.2.2' and package_name='com.taobao.taobao.test'";

        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;

        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {

            conn = ds.getConnection();

            stmt = conn.prepareStatement(cmdSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stmt.setFetchSize(Integer.MIN_VALUE);
                       //第一次查询(1)
            rs = stmt.executeQuery();
            final ResultSet tempRs = rs;
            Future<Integer> feture = executor.submit(new Callable<Integer>() {
                public Integer call() throws InterruptedException {
                    try {
                        while (tempRs.next()) {
                            try {
                                System.out.println("app_key:" + tempRs.getString(1) + "device_id:" + tempRs.getString(2)
                                        + "brand:" + tempRs.getString(3));
                            } catch (SQLException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                            
                        }
                    } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    return null;
                }
            });

            Thread.sleep(2000);

            try {
                //第二次查询(2)
                stmt.executeQuery();

            } catch (Exception e) {
                System.out.println("second search:" + e.getLocalizedMessage());

            }

            // 等待子线程执行完毕
            feture.get();

        } catch (Exception e) {
            System.out.println("first search:" + e.getLocalizedMessage());
        } finally {

            close(stmt, rs, conn);

        }
    }

执行上面代码:

app_key:111device_id:222brand:333

second search:Streaming result set com.mysql.jdbc.RowDataDynamic@3e0c5a62 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

app_key:111device_id:232332brand:45454

app_key:111device_id:54brand:eyu345

........

可知第二次查询时候抛出了异常,说是RowDataDynamic@3e0c5a62 数据集还是激活状态,当一个连接上已经有一个打开的流式Resultset时候不能再发起一个查询,并且在尝试更多查询前确保调用了close方法。

而第一次查询不收影响继续自己的迭代数据。

那么就来看下在第二次查询前调用close方法会有啥效果。

在 查询(2)前添加rs.close();然后执行,结果是首先子线程会不断输出迭代结果,然后主线程调用close,调用close后子线程不在输出结果,然后主线程的close方法也没返回,这是啥情况那?不急,看看close里面做了啥:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void close() throws SQLException {
        ...

    //获取链接的锁
        Object mutex = this;

        MySQLConnection conn = null;

        if (this.owner != null) {
            conn = this.owner.connection;

            if (conn != null) {
                mutex = conn.getConnectionMutex();
            }
        }

        synchronized (mutex) {
            // drain the rest of the records.
            while (next() != null) {
                hadMore = true;
                howMuchMore++;

                if (howMuchMore % 100 == 0) {
                    Thread.yield();
                }
            }

        ...
    }

可知在调用close时候,里面还是循环调用next尝试把剩余记录迭代出来丢弃掉。我们调用close之所以没返回,实际上是因为内部在丢弃数据中,其实文档里面说迭代数据完毕或者调用close后才能调用新的查询,其实调用close作用还是要把Resultset里面的数据迭代出来完。

那么还有一个问题,上面说同时子线程也不输出结果了,为啥那?那么我们在回顾下next方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public boolean next() throws SQLException {
        synchronized (checkClosed().getConnectionMutex()) {

            ....
        }
    }

protected final MySQLConnection checkClosed() throws SQLException {
    MySQLConnection c = this.connection;
    
    if (c == null) {
        throw SQLError.createSQLException(
                Messages
                        .getString("ResultSet.Operation_not_allowed_after_ResultSet_closed_144"), //$NON-NLS-1$
                SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
    }
    
    return c;
}

soga,原来调用next方面里面也是先获取链接的锁,但是这个锁现在被close方法锁持有,你可能说synchronized是可重入锁哇,为啥调用next进入不了那? 不错synchronized是可重入锁,但是调用close和调用next是不同线程哦。

三、MyBatisCursorItemReader是线程不安全的

之前文章介绍了使用MyBatisCursorItemReader可以由我们自己操作游标,使用时候在xml注入即可:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<bean id="myMyBatisCursorItemReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader">
    <property name="sqlSessionFactory" ref="sqlSessionFactory" />
    <property name="queryId"
        value="com.taobao.accs.mass.petadata.dal.sqlmap.AccsDeviceInfoDAOMapper.selectByExampleForPetaData" />
</bean>

当我们只有一个线程调用myMyBatisCursorItemReader进行查询操作时候,很优雅,没有问题,但是当多个线程都调用myMyBatisCursorItemReader进行open,read操作就有问题了,因为这货是线程不安全的。下面看下myMyBatisCursorItemReader代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MyBatisCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {

  private String queryId;

  private SqlSessionFactory sqlSessionFactory;
  private SqlSession sqlSession;

  private Map<String, Object> parameterValues;

  private Cursor<T> cursor;
  private Iterator<T> cursorIterator;
  
  ...
  @Override
  protected T doRead() throws Exception {
    T next = null;
    if (cursorIterator.hasNext()) {
      next = cursorIterator.next();
    }
    return next;
  }

  @Override
  protected void doOpen() throws Exception {
    Map<String, Object> parameters = new HashMap<String, Object>();
    if (parameterValues != null) {
      parameters.putAll(parameterValues);
    }

    sqlSession = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
    cursor = sqlSession.selectCursor(queryId, parameters);
    cursorIterator = cursor.iterator();
  }

  @Override
  protected void doClose() throws Exception {
    cursor.close();
    sqlSession.close();
    cursorIterator = null;
  }

  ...
}

哦,原来下面这些变量都不是线程安全的

private SqlSession sqlSession;

private Map<String, Object> parameterValues;

private Cursor<T> cursor;

private Iterator<T> cursorIterator;

那么我们把他改造为ThreadLocal如何, 其实还是有问题,为啥那,看父类:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> {

    private static final String READ_COUNT = "read.count";

    private static final String READ_COUNT_MAX = "read.count.max";

    private int currentItemCount = 0;

    private int maxItemCount = Integer.MAX_VALUE;

    private boolean saveState = true;

    
    protected void jumpToItem(int itemIndex) throws Exception {
        for (int i = 0; i < itemIndex; i++) {
            read();
        }
    }

    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException {
        if (currentItemCount >= maxItemCount) {
            return null;
        }
        currentItemCount++;
        T item = doRead();
        if(item instanceof ItemCountAware) {
            ((ItemCountAware) item).setItemCount(currentItemCount);
        }
        return item;
    }

    protected int getCurrentItemCount() {
        return currentItemCount;
    }

    
    public void setCurrentItemCount(int count) {
        this.currentItemCount = count;
    }


    public void setMaxItemCount(int count) {
        this.maxItemCount = count;
    }

    @Override
    public void close() throws ItemStreamException {
        super.close();
        currentItemCount = 0;
        try {
            doClose();
        }
        catch (Exception e) {
            throw new ItemStreamException("Error while closing item reader", e);
        }
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        super.open(executionContext);
        try {
            doOpen();
        }
        catch (Exception e) {
            throw new ItemStreamException("Failed to initialize the reader", e);
        }
        if (!isSaveState()) {
            return;
        }

        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
            maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
        }

        int itemCount = 0;
        if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
            itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
        }
        else if(currentItemCount > 0) {
            itemCount = currentItemCount;
        }

        if (itemCount > 0 && itemCount < maxItemCount) {
            try {
                jumpToItem(itemCount);
            }
            catch (Exception e) {
                throw new ItemStreamException("Could not move to stored position on restart", e);
            }
        }

        currentItemCount = itemCount;

    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        super.update(executionContext);
        if (saveState) {
            Assert.notNull(executionContext, "ExecutionContext must not be null");
            executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
            if (maxItemCount < Integer.MAX_VALUE) {
                executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
            }
        }

    }


    public void setSaveState(boolean saveState) {
        this.saveState = saveState;
    }

    /**
     * The flag that determines whether to save internal state for restarts.
     * @return true if the flag was set
     */
    public boolean isSaveState() {
        return saveState;
    }

}

因为里面还有个currentItemCount是线程不安全的,回头看,会发现这个父类对我们没有用,他的作用是限制迭代出来的记录数据,如果不需要这个限制可以不用,所以可以改造为线程安全的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MyBatisCursorItemReaderThreadSafe<T> implements InitializingBean {

    private String queryId;

    private SqlSessionFactory sqlSessionFactory;
    private ThreadLocal<SqlSession> sqlSession = new ThreadLocal<SqlSession>();

    private ThreadLocal<Map<String, Object>> parameterValues = new ThreadLocal<Map<String, Object>>();

    private ThreadLocal<Cursor<T>> cursor = new ThreadLocal<Cursor<T>>();
    private ThreadLocal<Iterator<T>> cursorIterator = new ThreadLocal<Iterator<T>>();

    public MyBatisCursorItemReaderThreadSafe() {
    }

    public T doRead() throws Exception {
        T next = null;
        if (cursorIterator.get().hasNext()) {
            next = cursorIterator.get().next();
        }
        return next;
    }

    public void doOpen() throws Exception {
        Map<String, Object> parameters = new HashMap<String, Object>();
        if (parameterValues != null) {
            parameters.putAll(parameterValues.get());
        }
        SqlSession sqlSessionTmp = null;
        Cursor<T> cursorTemp = null;
        Iterator<T> cursorIteratorTmp = null;
        sqlSessionTmp = sqlSessionFactory.openSession(ExecutorType.SIMPLE);
        cursorTemp = sqlSessionTmp.selectCursor(queryId, parameters);
        cursorIteratorTmp = cursorTemp.iterator();

        // 成功后在设置
        cursor.set(cursorTemp);
        cursorIterator.set(cursorIteratorTmp);
        this.sqlSession.set(sqlSessionTmp);

    }

    public void doClose() throws Exception {

        Cursor<T> cursorTemp = cursor.get();
        if (null != cursorTemp) {
            cursorTemp.close();
            cursor.set(null);
        }
        sqlSession.get().close();

        Iterator<T> cursorIteratorTmp = cursorIterator.get();
        if (null != cursorIteratorTmp) {
            cursorIterator.set(null);

        }

        if (null != parameterValues) {
            parameterValues.set(null);
        }
    }

    
    public void setParameterValues(Map<String, Object> parameterValues) {
        this.parameterValues.set(parameterValues);
    }

}

当然还有更简单的办法,那就是使用原型模式,每次getBean时候会重新创建一个对象。或者每次使用时候new一个。

四 、参考

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.08.11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
MyBatis中使用流式查询避免数据量过大导致OOM
其中fetchSize="-2147483648",Integer.MIN_VALUE=-2147483648
加多
2018/09/06
8.2K0
MyBatis中使用流式查询避免数据量过大导致OOM
Mysql中使用流式查询避免数据量过大导致OOM
java 中MySQL JDBC 封装了流式查询操作,通过设置几个参数,就可以避免一次返回数据过大导致 OOM。
加多
2018/09/06
1.6K0
Mysql中使用流式查询避免数据量过大导致OOM
MyBatis中使用流式查询避免数据量过大导致OOM
看下日志,是因为一次查询数据量过大导致JVM内存溢出了,虽然可以配置JVM大小,但是指标不治本,还是需要优化代码。网上查看大家都是流式查询,这里记录下解决的过程。
终码一生
2022/04/15
1.4K0
Mybatis学习
使用jdbc进行数据库操作: 首先加载驱动driverManager,然后通过DriverManager获取数据库连接,创建statement,然后执行sql获取结果集,处理结果集,然后关闭连接。
路行的亚洲
2021/02/03
2780
Mybatis学习
强大:MyBatis 流式查询
流式查询指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用。
肉眼品世界
2021/01/25
1.1K0
大数据量查询容易OOM?试试MySQL流式查询
程序访问 MySQL 数据库时,当查询出来的数据量特别大时,数据库驱动把加载到的数据全部加载到内存里,就有可能会导致内存溢出(OOM)。
陶陶技术笔记
2021/01/12
2.5K0
大数据量查询容易OOM?试试MySQL流式查询
JavaWeb——MyBatis框架之执行过程原理与解析(通过自定义MyBatis查询所有操作的实现来观察整个过程)
通过上一博文,我们了解了MyBatis的入门,知道了怎么搭建环境及最基本的使用,那么,本次我们结合上一博文的案例实战,进行更深入的分析MyBatis的执行过程,MyBatis使用代理dao方式进行增删改查时做了哪些事呢?其实就是两件:
Winter_world
2020/09/25
6760
JavaWeb——MyBatis框架之执行过程原理与解析(通过自定义MyBatis查询所有操作的实现来观察整个过程)
Spring Batch(6)——数据库批数据读写
前序文章陆续介绍了批处理的基本概念,Job使用、Step控制、Item的结构以及扁平文件的读写。本文将接着前面的内容说明数据库如何进行批处理读写。
随风溜达的向日葵
2019/07/11
4.6K0
Spring Batch(6)——数据库批数据读写
Mybatis源码阅读(二)
本文主要介绍Java中,不使用XML和使用XML构建SqlSessionFactory,通过SqlSessionFactory 中获取SqlSession的方法,使用SqlsessionManager管理Sqlsession复用等等..以及相关的示例代码
Java宝典
2020/11/30
2940
mybatis 核心原理 线程隔离
建议同时学习@Transaction, spring对事务的管理 spring @Transactional原理
平凡的学生族
2020/01/02
9280
Mybatis 源码分析(四)之 Mybatis 的执行流程梳理
前面了解到Mybatis的执行流程,首先读取我们的mybatis-config.xml配置文件,然后构建Configuration类,这个类会像上下文信息一样会传来传去,以便我们获取其中的信息。
zoro
2019/04/11
6390
Spring batch教程 之 配置Step「建议收藏」
正如在Batch Domain Language中叙述的,Step是一个独立封装域对象,包含了所有定义和控制实际处理信息批任务的序列。这是一个比较抽象的描述,因为任意一个Step的内容都是开发者自己编写的Job。一个Step的简单或复杂取决于开发者的意愿。一个简单的Step也许是从本地文件读取数据存入数据库,写很少或基本无需写代码。一个复杂的Step也许有复杂的业务规则(取决于所实现的方式),并作为整个个流程的一部分。
全栈程序员站长
2022/09/02
4.3K0
MySQL中流式查询使用
MySQL 是目前使用比较广泛的关系型数据库,而从数据库里面根据条件查询数据到内存的情况想必大家在日常项目实践中都有使用。
加多
2018/09/06
1.6K0
MySQL中流式查询使用
MyBatis学习笔记(一) --- MyBatis入门
MyBatis 本是apache的一个开源项目iBatis, 2010年这个项目由apache software foundation 迁移到了google code,并且改名为MyBatis 。2013年11月迁移到Github。 
挽风
2021/04/13
1.4K0
MyBatis学习笔记(一) --- MyBatis入门
快速学习Mybatis-自定义 Mybatis 框架
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
cwl_java
2019/12/11
3810
Spring Batch分析(一)
前面两篇文章,对于SpringBatch这个批处理框架做了一个大概的学习和了解,通过前两篇文章,你可以了解到SpringBatch是什么?应用场景有哪些?怎么去写一个SpringBatch的demo?以及SpringBatch的架构设计和核心组件的简单介绍。
xdd
2022/07/12
1.9K0
Spring Batch分析(一)
MyBatis从入门到精通(七)—源码剖析之Configuration、SqlSession、Executor、StatementHandler细节
MyBatis在初始化的时候,会将MyBatis的配置信息全部加载到内存中,使用org.apache.ibatis.session.Configuration 实例来维护。 下⾯进⼊对配置⽂件解析部分:
共饮一杯无
2022/11/28
1.1K0
一文搞懂Mybatis执行原理
MyBatis是一个Dao层映射框架,底层还是用的JDBC来访问数据库,在学习MyBatis之前有必要先回顾一下JDBC的执行过程:
闫同学
2023/11/06
5890
自定义mybatis解析
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
多凡
2019/11/01
6870
手写Mybatis源码(原来真的很简单!!!)
主要分两部分,项目使用端:平常写代码所说的后台服务;持久层框架:即项目使用端引入的jar包
Java微观世界
2025/01/21
1620
手写Mybatis源码(原来真的很简单!!!)
相关推荐
MyBatis中使用流式查询避免数据量过大导致OOM
更多 >
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档