前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现

discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现

作者头像
用户1410343
发布2018-03-27 10:52:28
8620
发布2018-03-27 10:52:28
举报
文章被收录于专栏:about云

about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下hive以及hbase是如何入库以及代码实现。 首先我们将hbase与hive整合,详细参考 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 整合完毕,我们就可以通过mapreduce把数据导入hbase,当然在导入hbase的同时,hive数据同时也可以查询出结果。 那么我们是如何导入hbase的,思路前面已经介绍,这里采用的是hbase put。以后的版本中,我们将采用多种方法来实现此功能包括hive分区、hbase后面如果遇到问题,我们可能还会重构。 开发环境介绍: 1.Eclipse 2.Hadoop2.2 3.hbase-0.98.3-hadoop2 思路: 在导入hbase的过程中,我们直接使用了mapreduce中的map函数,reduce在这里对我们没有太大的用处,我们这里借助的是mapreduce的分布式,提高查询效率。 mapreduce中map函数主要实现了哪些功能 1.清洗数据 通过

  1. public static void StringResolves(String line, Context context)

函数实现 2.数据的导入 通过public static void addData(String rowKey, String tableName, String[] column1, String[] value1, Context context) 函数实现

下面贴上代码: HbaseMain.java代码

代码语言:javascript
复制
package www.aboutyun.com;

import java.io.IOException;



import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class HbaseMain {



       static final String INPUT_PATH = "hdfs://master:8020/test.txt";

       static final String OUT_PATH = "hdfs://master:8020/Output";



       public static void main(String[] args) throws IOException,

                       InterruptedException, ClassNotFoundException {



               // 主类

               Configuration conf = new Configuration();

               Job job = Job.getInstance(conf, HbaseMain.class.getSimpleName());

               job.setJarByClass(HbaseMain.class);

               // 寻找输入

               FileInputFormat.setInputPaths(job, INPUT_PATH);

               // 1.2对输入数据进行格式化处理的类

               job.setInputFormatClass(TextInputFormat.class);

               job.setMapperClass(HbaseMap.class);

               // 1.2指定map输出类型<key,value>类型

               job.setMapOutputKeyClass(Text.class);

               job.setMapOutputValueClass(LongWritable.class);

               job.setNumReduceTasks(0);

               // 指定输出路径

               FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

 

               job.waitForCompletion(true);



       }

}


HbaseMap.java代码

代码语言:javascript
复制
package www.aboutyun.com;



import java.io.IOException;

import java.text.DateFormat;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Locale;

import java.util.Random;



import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Mapper.Context;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;



public class HbaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {

       private static Configuration conf = null;

       /**

        * 初始化配置

        */



       static {

               conf = HBaseConfiguration.create();

               conf.set("hbase.zookeeper.quorum", "master");// 使用eclipse时必须添加这个,否则无法定位

               conf.set("hbase.zookeeper.property.clientPort", "2181");

       }



       /**************************************************************************/

       public void map(LongWritable key, Text line, Context context)

                       throws IOException, InterruptedException {



               try {

                       StringResolves(line.toString(), context);

               } catch (ParseException e) {

                       // TODO Auto-generated catch block

                       e.printStackTrace();

               }



       }



       /**************************************************************************/

       // 字符串解析



       public static void StringResolves(String line, Context context)

                       throws ParseException {

               String ipField, dateField, urlField, browserField;



               // 获取ip地址

               ipField = line.split("- -")[0].trim();



               // 获取时间,并转换格式

               int getTimeFirst = line.indexOf("[");

               int getTimeLast = line.indexOf("]");

               String time = line.substring(getTimeFirst + 1, getTimeLast).trim();

               Date dt = null;

               DateFormat df1 = DateFormat.getDateTimeInstance(DateFormat.LONG,

                               DateFormat.LONG);

               dt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)

                               .parse(time);

               dateField = df1.format(dt);

               SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHMM");

               String dateField1 = sdf.format(dt);

               // 获取url

               String[] getUrl = line.split("\"");



               String firtGeturl = getUrl[1].substring(3).trim();



               String secondGeturl = getUrl[3].trim();

               urlField = firtGeturl + "分隔符" + secondGeturl;



               // 获取浏览器

               String[] getBrowse = line.split("\"");

               String strBrowse = getBrowse[5].toString();

               String str = "(KHTML, like Gecko)";

               int i = strBrowse.indexOf(str);

               strBrowse = strBrowse.substring(i);

               String strBrowse1[] = strBrowse.split("\\/");

               strBrowse = strBrowse1[0].toString();

               String strBrowse2[] = strBrowse.split("\\)");

               browserField = strBrowse2[1].trim();



               // 添加到数据库



               String rowKey = ipField + dateField1 + urlField

                               + new Random().nextInt();

               String[] cols = new String[] { "IpAddress", "AccressTime", "Url",

                               "UserBrowser", };

               String[] colsValue = new String[] { ipField, dateField, urlField,

                               browserField };



               try {

                       addData(rowKey, "LogTable", cols, colsValue, context);

                       context.write(new Text("1"), new IntWritable(1));



               } catch (IOException | InterruptedException e) {

                       // TODO Auto-generated catch block

                       e.printStackTrace();

               }

       }



       /*

        * 为表添加数据(适合知道有多少列族的固定表)

        * 

        * @rowKey rowKey

        * 

        * @tableName 表名

        * 

        * @column1 第一个列族列表

        * 

        * @value1 第一个列的值的列表

        */

       public static void addData(String rowKey, String tableName,

                       String[] column1, String[] value1, Context context)

                       throws IOException {



               Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey

               HTable table = new HTable(conf, Bytes.toBytes(tableName));// HTabel负责跟记录相关的操作如增删改查等//

                                                                                                                                       // 获取表

               HColumnDescriptor[] columnFamilies = table.getTableDescriptor() // 获取所有的列族

                               .getColumnFamilies();



               for (int i = 0; i < columnFamilies.length; i++) {

                       String familyName = columnFamilies[i].getNameAsString(); // 获取列族名

                       if (familyName.equals("Info")) { // info列族put数据

                               for (int j = 0; j < column1.length; j++) {

                                       put.add(Bytes.toBytes(familyName),

                                                       Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));

                               }

                       }



               }

               table.put(put);

               // context.write(new Text(rowKey), null);

               System.out.println("add data Success!");

       }



}




后面我们将会不断完善此功能。

上面的一些准备工作,就不要说了,这里展现一下运行后的效果:
hive效果图

Hbase效果图

这样就达到了效果。后面我们使用hive统计,然后通过将统计结果展示,项目基本完成,后面就不断完善即可。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 About云 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档