数据源是 CSV 格式的文件。
单个文件有好几百兆,大约 200 万行数据。现在要解决的问题是:把 CSV 中的数据读出来,组合数据,生成 SQL 文件。
以前用单线程跑,花了一下午(好几个小时)才完成;改用多线程之后,200 万条数据(包括过滤)大约 2-3 分钟就能跑完。
这个场景在平常开发中也是经常要用到的。发出来,希望大家能够指导学习~
优化版地址:http://my.oschina.net/u/1017195/blog/195508
package test.com.linapex.room;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.linapex.common.util.FileUtils;
import com.linapex.common.util.ZhengzeValidate;
/**
 * Command-line tool that reads {@link #csvFile} line by line, filters the rows and turns every
 * accepted row into one INSERT statement (see {@link #sqlStrTemplate}) that is buffered in a
 * {@code WriteSqlHandle} and written to {@link #sqlFile} by tasks running on a fixed thread pool.
 * Rejected rows are reported through {@link #writeLog(String, Object...)}.
 *
 * <p>The CSV is read and parsed on the calling thread only; the pool is used solely for
 * {@code WriteSqlHandle.save()} calls.
 */
public class TBuilderRoomSqlFileTool
{
    /** Number of buffered statements after which a save task is submitted. */
    final static int DATACACHENUM = 10000;
    /** Number of save tasks submitted since the last wait; only touched by the reading thread. */
    static int currThreadCount = 0;
    /** Pool size and, at the same time, the number of submitted tasks that triggers a wait. */
    static int maxThreadCount = 10;
    static File roomFilterLogFile = new File("roomFilter.log");
    static File sqlFile = new File("roomSql.sql");
    static File csvFile = new File("D:\\baiduyundownload\\asd\\2000W\\1-200W.csv");
    final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";

    /** Highest column index read from a row is 31, so a usable row needs at least 32 columns. */
    private static final int MIN_COLUMN_COUNT = 32;

    /**
     * Opens {@link #sqlFile} for writing as UTF-8, discarding any previous content.
     *
     * <p>The file is opened in truncate mode instead of "delete, re-create, append": if the
     * delete failed, the old approach silently appended to the stale file.
     *
     * @return a buffered writer positioned at the start of an empty SQL file
     * @throws Exception if the file cannot be opened for writing
     */
    public static BufferedWriter initSQLWrite() throws Exception
    {
        return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, false), "UTF-8"));
    }

    /**
     * Reads {@link #csvFile} line by line and passes every line, together with its 1-based line
     * number, to the callback.
     *
     * <p>NOTE(review): {@link FileReader} decodes with the platform default charset; confirm this
     * matches the encoding of the CSV export.
     *
     * @param callBack receiver of each line
     * @throws Exception if the file cannot be opened or read
     */
    public static void loadCSV(CallBack2 callBack) throws Exception
    {
        BufferedReader reader = null;
        try
        {
            reader = new BufferedReader(new FileReader(csvFile));
            int lineNo = 0;
            String line;
            while ((line = reader.readLine()) != null)
            {
                lineNo++;
                callBack.call(lineNo, line);
            }
        } finally
        {
            // The reader is still null when the file could not be opened; closing it
            // unconditionally would replace the real error with a NullPointerException.
            if (reader != null)
            {
                reader.close();
            }
        }
    }

    /**
     * Entry point: converts the CSV into the SQL file and prints the elapsed time in milliseconds.
     *
     * @param args unused
     * @throws Exception if reading the CSV or writing the SQL file fails
     */
    public static void main(String[] args) throws Exception
    {
        final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount);
        final List<Future<?>> threadResultList = new ArrayList<Future<?>>();
        final WriteSqlHandle writeSqlFile = new WriteSqlHandle(initSQLWrite(), DATACACHENUM);
        long begin = System.currentTimeMillis();
        try
        {
            loadCSV(new CallBack2()
            {
                @Override
                public void call(int num, String str)
                {
                    String[] strs = str.split(",");
                    // Reject short rows up front instead of relying on an
                    // ArrayIndexOutOfBoundsException further down.
                    if (strs.length < MIN_COLUMN_COUNT)
                    {
                        writeLog("此条数据不录入::0", Arrays.toString(strs));
                        return;
                    }
                    String name = strs[0].trim();
                    if (!ZhengzeValidate.isChina(name))
                    {
                        writeLog("此条数据不录入::0", Arrays.toString(strs));
                        return;
                    }
                    try
                    {
                        String card = strs[4];
                        String gender = strs[5];
                        String birthday = strs[6];
                        String address = strs[7];
                        String zip = strs[8];
                        String mobile = strs[20];
                        String email = strs[22];
                        String version = strs[31];
                        // Build the statement; every field is escaped because it ends up inside
                        // a single-quoted SQL string literal.
                        final String tempSql = tm(sqlStrTemplate, escapeSql(name), escapeSql(card),
                                escapeSql(gender), escapeSql(birthday), escapeSql(address), escapeSql(zip),
                                escapeSql(mobile), escapeSql(email), escapeSql(version));
                        // add() reports true once the buffer has reached the configured size.
                        // NOTE(review): it keeps reporting true until a save task has actually
                        // emptied the buffer, so several (mostly empty) saves may be submitted.
                        if (writeSqlFile.add(tempSql))
                        {
                            currThreadCount++;
                            // Too many tasks submitted: collect them before submitting more.
                            if (currThreadCount >= maxThreadCount)
                            {
                                System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
                                awaitPendingSaves(threadResultList);
                                System.out.println(String.format("重新开始提交线程 当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
                            }
                            Future<?> future = threadPool.submit(new Runnable()
                            {
                                @Override
                                public void run()
                                {
                                    try
                                    {
                                        writeSqlFile.save();
                                    } catch (Exception e)
                                    {
                                        e.printStackTrace();
                                    }
                                }
                            });
                            threadResultList.add(future);
                        }
                    } catch (Exception e)
                    {
                        writeLog("录入错误的数据::0", Arrays.toString(strs));
                        writeLog("错误的原因::0", e.getMessage());
                    }
                }
            });
            // flush() closes the writer, so every save task that is still queued or running
            // must have finished before it is called.
            awaitPendingSaves(threadResultList);
            writeSqlFile.flush();
        } finally
        {
            // Pool threads are non-daemon; without this a failure above would keep the JVM alive.
            threadPool.shutdown();
        }
        long end = System.currentTimeMillis() - begin;
        System.out.println(String.format("任务完成时间:%s", end));
    }

    /**
     * Blocks until every future in {@code pending} has completed, then empties the list and
     * resets {@link #currThreadCount}. Failures of individual tasks are printed and skipped.
     */
    private static void awaitPendingSaves(List<Future<?>> pending)
    {
        for (Future<?> fs : pending)
        {
            try
            {
                fs.get();
                currThreadCount--;
                System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount));
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        pending.clear();
        currThreadCount = 0;
    }

    /**
     * Escapes a value for use inside a single-quoted SQL string literal by doubling backslashes
     * and single quotes.
     *
     * <p>NOTE(review): assumes the target is MySQL with backslash escapes enabled (the template
     * uses backtick-quoted identifiers) - confirm against the server's sql_mode.
     */
    private static String escapeSql(String value)
    {
        return value.replace("\\", "\\\\").replace("'", "''");
    }

    /**
     * Formats {@code str} with {@link #tm(String, Object...)}, appends CRLF and hands the line to
     * {@code FileUtils.doWriteFile} for {@link #roomFilterLogFile}.
     *
     * @param str    message template containing {@code :N} placeholders
     * @param values placeholder values
     */
    public static void writeLog(String str, Object... values)
    {
        FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false);
    }

    /**
     * Fills the placeholders {@code :0}, {@code :1}, ... of a template with the string form of
     * the corresponding value.
     *
     * <p>The template is scanned once from left to right, so text inserted for one placeholder is
     * never re-interpreted as another placeholder. After a colon the longest run of digits that
     * still denotes a valid index is taken as the placeholder number. Every occurrence of a
     * placeholder is substituted. A placeholder whose value is {@code null} (or that has no
     * value) is left in the output unchanged.
     *
     * @param strSource template, may be {@code null}
     * @param values    values addressed by position
     * @return the filled-in template, or {@code null} if {@code strSource} is {@code null}
     */
    public static String tm(String strSource, Object... values)
    {
        if (strSource == null)
        {
            return null;
        }
        if (values == null || values.length == 0)
        {
            return strSource;
        }
        final int length = strSource.length();
        StringBuilder builder = new StringBuilder(length + 16 * values.length);
        int pos = 0;
        while (pos < length)
        {
            char ch = strSource.charAt(pos);
            if (ch == ':')
            {
                int end = pos + 1;
                int index = -1;
                while (end < length)
                {
                    char digit = strSource.charAt(end);
                    if (digit < '0' || digit > '9')
                    {
                        break;
                    }
                    int candidate = (index < 0 ? 0 : index * 10) + (digit - '0');
                    if (candidate >= values.length)
                    {
                        break; // a longer number would address a value that does not exist
                    }
                    index = candidate;
                    end++;
                }
                if (index >= 0 && values[index] != null)
                {
                    builder.append(String.valueOf(values[index]));
                    pos = end;
                    continue;
                }
            }
            builder.append(ch);
            pos++;
        }
        return builder.toString();
    }
}
/**
 * Collects SQL statements in an in-memory list and writes them, one per line terminated by
 * CRLF, to a {@link BufferedWriter}.
 *
 * <p>{@link #add(String)}, {@link #save()} and {@link #flush()} all run under the same write
 * lock, and the lock is always released in a {@code finally} block, so a failed write cannot
 * leave other threads blocked forever.
 */
class WriteSqlHandle
{
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    WriteLock writeLock = readWriteLock.writeLock();
    /** Statements waiting to be written; guarded by {@link #writeLock}. */
    List<String> cacheList;
    BufferedWriter bw;
    /** Buffer size at which {@link #add(String)} starts returning {@code true}. */
    int dataCacheNum;

    /**
     * Creates a handle without a threshold: {@code dataCacheNum} stays 0, so every call to
     * {@link #add(String)} returns {@code true}.
     *
     * @param bw destination writer; closed by {@link #flush()}
     */
    public WriteSqlHandle(BufferedWriter bw)
    {
        this.bw = bw;
        cacheList = new ArrayList<String>();
    }

    /**
     * @param bw           destination writer; closed by {@link #flush()}
     * @param dataCacheNum number of buffered statements at which {@link #add(String)} reports
     *                     that the buffer should be saved
     */
    public WriteSqlHandle(BufferedWriter bw, int dataCacheNum)
    {
        this.bw = bw;
        this.dataCacheNum = dataCacheNum;
        cacheList = new ArrayList<String>(dataCacheNum);
    }

    /**
     * Appends a statement to the buffer.
     *
     * @param sqlStr statement to buffer
     * @return {@code true} if the buffer now holds at least {@code dataCacheNum} statements
     */
    public boolean add(String sqlStr)
    {
        writeLock.lock();
        try
        {
            cacheList.add(sqlStr);
            // Read the size while still holding the lock; reading it after unlock() raced with
            // a concurrent save() clearing the list.
            return cacheList.size() >= dataCacheNum;
        } finally
        {
            writeLock.unlock();
        }
    }

    /**
     * Writes all buffered statements to the writer and empties the buffer. If writing fails the
     * buffer is left untouched and the exception is propagated.
     *
     * @throws Exception if writing fails
     */
    public void save() throws Exception
    {
        writeLock.lock();
        try
        {
            long begin = System.currentTimeMillis();
            System.out.println(String.format("%s,准备消费 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));
            writeCache();
            long end = System.currentTimeMillis() - begin;
            System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), end, cacheList.size()));
            cacheList.clear();
        } finally
        {
            writeLock.unlock();
        }
    }

    /**
     * Writes the remaining buffered statements, then flushes and closes the writer. The writer
     * is closed even if writing fails. The handle must not be used afterwards.
     *
     * @throws Exception if writing, flushing or closing fails
     */
    public void flush() throws Exception
    {
        writeLock.lock();
        try
        {
            try
            {
                System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));
                writeCache();
                System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));
                cacheList.clear();
            } finally
            {
                closeWrite();
            }
        } finally
        {
            writeLock.unlock();
        }
    }

    /** Writes every buffered statement followed by CRLF; the caller must hold the lock. */
    private void writeCache() throws Exception
    {
        for (String str : cacheList)
        {
            bw.write(str + "\r\n");
        }
    }

    /** Flushes the writer and closes it, closing even when the flush fails. */
    private void closeWrite() throws Exception
    {
        try
        {
            bw.flush();
        } finally
        {
            bw.close();
        }
    }
}
/**
 * Receiver for the lines delivered while a text file is being read.
 */
interface CallBack2
{
    /**
     * Handles one line.
     *
     * @param num number passed along with the line by the reader
     * @param str content of the line
     */
    void call(int num, String str);
}
需要测试代码的朋友,请做如下修改:
1、FileUtils.doWriteFile 改成 System.out 输出
2、将数据过滤去掉即可。
下面是输出的日志。经过优化,运行时间从 2-3 分钟缩短到了 1 分钟多一点:
pool-1-thread-8,准备消费 需要保存数据的集合长度:0
pool-1-thread-8,消费完成,耗费时间:0 ms,消费数据长度:0
已回调线程数:8
已回调线程数:9
pool-1-thread-9,准备消费 需要保存数据的集合长度:0
pool-1-thread-9,消费完成,耗费时间:0 ms,消费数据长度:0
已回调线程数:10
重新开始提交线程 当前线程数:0 允许最大线程数:10 等待线程完成回调.
pool-1-thread-10,准备消费 需要保存数据的集合长度:4
pool-1-thread-10,消费完成,耗费时间:0 ms,消费数据长度:4
flush线程:main, 需要保存数据的集合长度:9102
flush线程:main, 消费完成,消费数据长度:9102
任务完成时间:104797 ms
不允许转载~~~只能看,不能摸/偷笑
(adsbygoogle = window.adsbygoogle || []).push({});