平时用于从生产环境hbase到导出数据到测试环境。
导入数据:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
@SuppressWarnings("deprecation")
public class HbaseImport {
public static void main(String args[]) throws Exception{
Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
List<Map<String,String>> datas = getDatas("d:\\hbaseData\\datas.txt",connection);
wirteHbase(connection,"EVENT_LOG_LBS",datas);
}
public static List<Map<String,String>> getDatas(String filePath,HConnection connection) throws IOException{
List<Map<String,String>> datas = new ArrayList<Map<String,String>>();
File file = new File(filePath);
BufferedReader br = new BufferedReader(new FileReader(file));
String tr = null;
while(((tr = br.readLine()) != null)){
String subData = tr.substring(1);
Map<String,String> data = new HashMap<String,String>();
String[] ss = subData.split("\\|");
for(String s : ss){
String[] tds = s.split("=");
String v = "";
if(tds.length == 2){
v = tds[1];
}
data.put(tds[0], v);
}
datas.add(data);
}
br.close();
return datas;
}
public static void wirteHbase(HConnection connection,String tableName,List<Map<String,String>>datas) throws IOException{
HTableInterface t = connection.getTable(tableName);
for(Map<String,String> map : datas){
Set<String> ks = map.keySet();
Put put = new Put(Bytes.toBytes(map.get("rowkey")));
for(String key : ks){
put.add(Bytes.toBytes("f1"),Bytes.toBytes(key),Bytes.toBytes(map.get(key)));
}
t.put(put);
}
}
}
导出数据:
package hbase;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@SuppressWarnings("deprecation")
public class HbaseExport {
public static Date getPassSevenDays(int day){
Calendar calendar = Calendar.getInstance();
int year = calendar.get(Calendar.YEAR);
int dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
int j = 0;
for(int i = 0;i < day; i++){
calendar.set(Calendar.DAY_OF_YEAR, dayOfYear - j);
if(calendar.get(Calendar.YEAR) < year){
//跨年了
j = 1;
//更新 标记年
year = year + 1;
//重置日历
calendar.set(year, Calendar.DECEMBER,31);
//重新获取dayOfYear
dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
}else{
j = j + 1;
}
}
return calendar.getTime();
}
public static Scan setScanCondition(Scan scan) throws IOException{
Date newDay = new Date();
Date otherDays = getPassSevenDays(7);
scan.setTimeRange(otherDays.getTime(),newDay.getTime());
scan.addColumn("f1".getBytes(), "LS_certifier_no".getBytes());
scan.addColumn("f1".getBytes(), "LS_location".getBytes());
scan.addColumn("f1".getBytes(), "LS_phone_no".getBytes());
scan.addColumn("f1".getBytes(), "LS_longitude".getBytes());
scan.addColumn("f1".getBytes(), "LS_latitude".getBytes());
scan.addColumn("f1".getBytes(), "date".getBytes());
scan.addColumn("f1".getBytes(), "time".getBytes());
scan.addColumn("f1".getBytes(), "hourOfDay".getBytes());
return scan;
}
public static void main(String args[]) throws IOException{
Configuration conf = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(conf);
String tableName = "EVENT_LOG_LBS_HIS";
HTableInterface table = connection.getTable(tableName);
Scan scan = new Scan();
setScanCondition(scan);
ResultScanner rs = table.getScanner(scan);
for(Result r : rs){
List<String> lines = new ArrayList<String>();
StringBuilder sb = new StringBuilder();
sb.append(" rowkey=" + Bytes.toString(r.getRow()));
for(Cell cell : r.rawCells()){
String name = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(name + "=" + value);
sb.append("|" + name + "=" + value);
}
lines.add(sb.toString());
System.out.println("--------------------------");
writeFile(lines,"/home/hdfs/datas");
}
}
public static void writeFile(List<String> lines,String filePath) throws FileNotFoundException{
File file = new File(filePath);
PrintWriter pw = new PrintWriter(new FileOutputStream(file,true));
for(String line : lines){
pw.append(line);
pw.append("\n");
}
pw.flush();
pw.close();
}
}