前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >超大csv解析攻略

超大csv解析攻略

作者头像
林老师带你学编程
发布2019-09-18 10:56:39
1.7K0
发布2019-09-18 10:56:39
举报
文章被收录于专栏:强仔仔

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/linzhiqiang0316/article/details/100864935

前段时间遇到这样一个需求,解析csv文件数据,将数据封装批量插入数据库中。

咋一看确实没什么问题,但是看到文件大小的时候,差点没吐一口老血,文件大小2.1g!!!如果用传统的方式直接将csv文件流按行解析,然后封装成po对象,结果很明显,优雅的OOM掉了。

所以为了能够成功解析这个超大文件,博主查阅了大量的博客,终于攻克这个问题了。因为这个坑相对比较大,所以这边给大家分享一下,博主的心路历程,希望大家以后可以不掉到这个坑里面。

方案研究:

万能的钱

其实基于这种超大文件解析,有很多方案,最实在的办法就是加钱,把自己服务器内存怼上去,但是很可惜,我们公司没钱,所以只能从代码层面解决了。

现有工具

一开始博主也是想着,有没有现成的工具可以直接拿来使用,后来很遗憾的发现没有这样的工具,所以只能自己来开发这个工具了。

当然有可能是有这样的工具,但是博主没有发现,如果大家有更好的方案可以在文章下方留言哦。

核心问题点

解析超大csv文件且不会内存溢出,最常见的方案就是按行解析。这样的好处就是不仅可以快速解析,而且不会有内存溢出的风险。

传统流解析

那我们该如何实现按行解析的功能呢?之前博主想过直接用字符流,然后readLine()方法进行解析,但是如果你只解析前半截还好,如果你想解析最后一行,那就必须将前面的所有数据也加载内存中,所以这种按行读取也没有解决根本问题。

随机读写

那我们应该怎么办呢?大家不要着急,我们可以使用RandomAccessFile工具类来实现真正的按行解析。通过RandomAccessFile工具,我们可以跳到任意位置进行解析,但是这边大家需要注意的是,RandomAccessFile工具的下标单位是字节,所以没有readLine()这边简便的方案,所以是否解析到行数据,需要我们自己去判断。

善用工具

因为是csv文件解析,这边我用的是CsvParser工具来进行csv解析(CsvParser据官网介绍,它的解析速度在同类工具中,也是数一数二的存在)。

方案设计

那原理介绍完毕之后,我们该如何来设计这个流程呢?因为就算可以按行解析,但是数据一多也会出现问题,这边博主想到两种方案,下面给大家详细介绍一下。

休眠模式解析

从上面流程图可以看出来,第一种解析方案主要通过Thread.sleep(),线程休眠的方式实现批量按行解析的。

这种方案的好处是不需要借助第三方工具就可以完成,实现起来简单省事。

但是缺点也异常的明显,这种方案虽然可以在线程休眠期间,通过jvm垃圾回收来保障内存不会OOM,但是这种方式不稳定性因素太多,还是有可能发生OOM的风险。而且因为是通过线程休眠,所以单次执行的时间会非常久,有可能会导致线程崩溃等不可控问题发生。

MQ异步解析

通过MQ异步解析方案流程如上所示,这种方案的好处非常明显, 每次消费消息只解析一部分的数据,如果消费完毕之后,发现不是最后一条数据,则接着发送MQ消息,等待下次解析。通过这种异步方式,我们完全不用担心会出现上述的内存OOM等问题,但是这种方案技术实现比较困难,没有线程休眠的方式简便。

代码展示:

说了这么多,我们来具体看看代码的实现吧,毕竟理论再完善,如果没有代码也是扯淡。核心代码如下所示:

代码语言:javascript
复制
  /**
     * csv文件解析(文件部分解析)
     *
     * @param sourcePath
     * @param charset
     * @param queryRows
     * @param position
     * @param isFirst
     * @throws IOException
     */
    public static CsvDateDto readFileForPart(String sourcePath, String charset, long position, long queryRows, boolean isFirst) throws Exception {
        CsvDateDto csvDateDto = new CsvDateDto();
        InputStream input = null;
        BufferedInputStream bufferedInputStream = null;
        BufferedReader reader = null;
        InputStreamReader inputStreamReader = null;
        // 全局csv数据
        List<String[]> globalRows = new ArrayList<>();
        try {
            //源文件
            File files = new File(sourcePath);
            //得到映射读文件的通道
            FileChannel channels = new RandomAccessFile(files, "r").getChannel();
            // 声明读源文件对象
            MappedByteBuffer mappedByteBuffer = null;
            // 文件总大小
            long size = files.length();
            // 需要获取的行数
            queryRows = position + queryRows;
            if (queryRows > size) {
                throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION;
            } else if (queryRows <= 0) {
                throw CsvFileException.READ_FILE_SIZE_EXCEED_EXCEPTION;
            } else {
                size = queryRows;
            }
            // 每次循环读取的固定个数
            long pageSize = getPageSize(position, size);
            //初始读、写大小
            long readSize = pageSize;
            // 最后一次读取位置
            long lastPosition = 0;
            boolean lastPositionFlag = false;
            // 换行的次数,用来过滤头节点
            long count = 0;
            long brCount = 0;
            // 文件的position开始位置(从第二行开始)
            long startPosition = 0;
            // 临时文件字节数组
            byte[] tempReadDataForByte = null;
            while (position < size) {
                input = null;
                count++;
                //每次读源文件都重新构造对象
                mappedByteBuffer = channels.map(FileChannel.MapMode.READ_ONLY, position, readSize);
                // 文件字节数组
                byte[] readDataForByte = new byte[(int) readSize];
                // 换行位置标志
                boolean lastBrFlag = false;
                // 标志的位置
                int lastBrIndex = 0;
                for (int i = 0; i < readSize; i++) {
                    //从源文件读取字节
                    readDataForByte[i] = mappedByteBuffer.get(i);
                    // 最后一次循环
                    if ((position + readSize) == size) {
                        lastPositionFlag = true;
                    }
                    // byte的换行符号
                    if (readDataForByte[i] == 10) {
                        lastBrIndex = i;
                        lastBrFlag = true;
                        if (startPosition == 0) {
                            // 将index坐标赋值给startPosition
                            startPosition = lastBrIndex + 1;
                        }
                    }
                }
                if (startPosition != 0 && brCount == 0) {
                    brCount = count;
                }
                // 如果count=1,代表找到首行位置已经确定
                if (isFirst && count == brCount && startPosition != 0) {
                    readSize = lastBrIndex + 1;
                    if (readSize > startPosition) {
                        int newSize = (int) (lastBrIndex - startPosition);
                        tempReadDataForByte = new byte[newSize];
                        int j = 0;
                        for (int i = (int) startPosition; i < lastBrIndex; i++) {
                            tempReadDataForByte[j] = readDataForByte[i];
                            j++;
                        }
                        input = new ByteArrayInputStream(tempReadDataForByte);
                    }
                    if (input == null) {
                        //累加每次读写的字节
                        position += readSize;
                    }
                } else {
                    // 读取到是数据不是最后一行,需要对byte进行过滤
                    if (lastBrFlag && readSize != lastBrIndex) {
                        readSize = lastBrIndex + 1;
                        tempReadDataForByte = new byte[(int) lastBrIndex];
                        for (int i = 0; i < lastBrIndex; i++) {
                            tempReadDataForByte[i] = readDataForByte[i];
                        }
                        input = new ByteArrayInputStream(tempReadDataForByte);
                    } else {
                        // 如果lastBrFlag=true,说明本次读取到换行
                        if (lastBrFlag) {
                            input = new ByteArrayInputStream(readDataForByte);
                        }
                    }
                }
                if (lastBrFlag && input != null) {
                    // bufferedInputStream读取数据
                    bufferedInputStream = new BufferedInputStream(input);
                    // 封装为字符流
                    inputStreamReader = new InputStreamReader(bufferedInputStream, Charset.forName(charset));
                    // 封装为字符缓存流
                    reader = new BufferedReader(inputStreamReader, 1 * 1024 * 1024);
                    // 从reader中获取解析的记录
                    List<String[]> rows = getRowsData(reader, false).getRows();
                    globalRows.addAll(rows);
                    // 清空集合,防止OOM
                    rows.clear();
                    //累加每次读写的字节
                    position += readSize;
                }
                // 最后一次循环标志为true
                if (lastPositionFlag) {
                    lastPosition = position;
                    break;
                }
                //获取下一页size
                readSize = getNextPageSize(size, position);
            }
            // 是否是最后一次调度数据
            if (lastPosition == files.length()) {
                csvDateDto.setLast(true);
            } else {
                csvDateDto.setLast(false);
            }
            csvDateDto.setLastPosition(lastPosition);
            csvDateDto.setGlobalRows(globalRows);
            return csvDateDto;
        } catch (IOException e) {
            logger.error("readFile--IO转化异常,错误信息为:{}", ExceptionUtil.formatException(e));
            throw FileParseException.READ_FILE_EXCEPTION;
        } finally {
            // 释放流资源
            if (input != null) {
                input.close();
            }
            if (bufferedInputStream != null) {
                bufferedInputStream.close();
            }
            if (reader != null) {
                reader.close();
            }
            if (inputStreamReader != null) {
                inputStreamReader.close();
            }
        }
    }

数据测试:

代码语言:javascript
复制
CsvDateDto csvDateDto = CsvFileUtil.readFileForPart("E:/home/data/test-2.csv", "utf-8", 0, 1024, false);
System.out.println("下一行开始坐标:"+csvDateDto.getLastPosition());
List<String[]> rows =  csvDateDto.getGlobalRows();
for (String[] row : rows) {
    System.out.println("解析数据:"+Arrays.toString(row));
}

测试结果:

代码语言:javascript
复制
下一行开始坐标:765
解析数据:[1436, 27, 33, 173, 3354.03, 14/3/2018 15:10:50, 5/6/2018 13:40:37, 14/3/2018, 199, us, null, 3354.03, 96100, 454, 165.96, 368.82, 0, 165.96, 368.82, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3354.03, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]
解析数据:[1440, 27, 33, 203, 3887.90, 14/3/2018 16:15:38, 13/7/2018 19:33:19, 13/3/2018, 253, us, null, 3887.90, 71271, 367, 130.82, 379.77, 0, 130.82, 379.77, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0, 3887.90, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, null, null]

可以看到我们解析了两行的数据,第三行的下一条开始坐标为765。

总结:

博主还是比较推荐采用MQ异步的方案,毕竟系统安全稳定比什么都重要。

大家以为这样就结束了吗?不不不!!!不管是采用MQ异步,还是线程休眠的方式都有一个很大的缺陷,那就是解析完csv时间会很久。如果系统对这个时效性要求比较高,那这两种方案都会被pass掉,那我们要如何进行改造呢?哈哈哈,这个坑就由聪明的童鞋们来思考喽~

今天的内容就讲到这边了,谢谢大家的阅读。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019年09月15日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档