好久未更。
抱着学Flink的心,没想到先试水最基本的mapreduce了。由于项目不便于公开,所以这里故事描述会进行一些演义,尽量不影响看官们理解。
电流在两个电站间进行高压传输时会有能量衰耗,明显的差异时收端电压是略低于发端电压的,这个是电压衰耗(个人杜撰)。当电压衰耗达到一个阈值时可能会对用电造成问题。
已经具备的前提:
首先这是一个离线计算问题,采用mapreduce或者spark都可以,由于对效率不是很敏感,所以选择了mapreduce。
然后对问题进行分解:
mapreduce在处理数据时不会做时间上的保序,所以需要在reducer中开辟缓存进行聚合。
这里不着重介绍mapper和reducer的代码了,其他博主的例子都很多了。
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create(); // 基于HBaseConfiguration创建配置对象
conf.set("zookeeper.znode.parent", "/hbase"); // 设置hbase zk的根目录
conf.set("hbase.zookeeper.quorum", "127.0.0.1"); // 设置hbase zk的IP
conf.set("hbase.zookeeper.property.clientPort", "2181"); // 设置hbase zk的端口
conf.setLong("start-time", 1577808000000); // 设置计算范围的起始时间戳
conf.setLong("end-time", 1609430399000); // 设置计算范围的结束时间戳
TopoFetcher topo = new TopoFetcher(c.getCtrlUserName(), c.getCtrlPassword(), c.getCtrlAddr()); // 获取电网拓扑,请忽略
topo.updateTopo();
PhyNodeMap nodeMap = topo.getPhyNodeMap();
ConfigurationUtil.setClass("node-map", conf, nodeMap); // 注意:Configuration不能存放对象,需要将对象序列化成json串存储,然后在工作节点取出反序列为对象
PhyLinkMap linkMap = topo.getPhyLinkMap();
ConfigurationUtil.setClass("link-map", conf, linkMap); // ConfigurationUtil是私有实现的
System.setProperty("HADOOP_USER_NAME", "root");
Path tmpDir = new Path("hdfs://127.0.0.1:9000/tmp_dir"); // 创建临时目录作为job1的输出(即job2的输入)
FileSystem.get(conf).deleteOnExit(tmpDir); // 当主程序退出时会删除临时目录
// 一阶段job将hbase的scan作为输入流
List<Scan> list = new ArrayList<Scan>();
Scan scan = new Scan();
// scan.setCaching(1000); // 这里不设置会使用缺省cache size
scan.setCacheBlocks(false);
scan.setTimeRange(c.getStartTime()-120000, c.getEndTime()+120000); // 注意:设置扫描时间戳范围,这里的时间戳是hbase记录的入库时间戳,即使hbase是实时数据库,还是将扫描范围前后各加2分钟
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, "voltage".getBytes()); // 设置扫描的表名
list.add(scan);
// job1设置
Job job1 = Job.getInstance(conf, "job1");
job1.setJarByClass(MyMapreduce.class);
TableMapReduceUtil.initTableMapperJob(list, Mapper1.class, Text.class, PortPmWritable.class, job1); // 里面会执行setMapperClass
job1.setReducerClass(Reducer1.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
job1.setMapOutputValueClass(SiteVoltageWritable.class); // 注意:当mapper和reducer的输出key和value一致时这里可不设置,如果设置会单独覆盖mapper
job1.setNumReduceTasks(8); // 设置Reducer工作节点的数量,一般为CPU核数,如果不设置缺省为1
FileOutputFormat.setOutputPath(job1, tmpDir); // 这里缺省设置OutputFormat是Text
if (!job1.waitForCompletion(true)) {
System.exit(1);
}
// 二阶段job,设置mysql数据库
String driverClass = "com.mysql.cj.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/testdb";
DBConfiguration.configureDB(conf, driverClass, url, "root", "root");
Job job2 = Job.getInstance(conf, "job2");
job2.setJarByClass(MyMapreduce.class);
job2.setMapperClass(Mapper2.class);
job2.setReducerClass(Reducer2.class);
job2.setOutputKeyClass(MyDbWritable.class);
job2.setOutputValueClass(MyDbWritable.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job2, tmpDir); // 将hdfs目录下的文件作为输入流
job2.setInputFormatClass(KeyValueTextInputFormat.class); // 注意:因为job1的输出是key-value格式,所以这里的输入格式要指定是key-value格式的Text
job2.setOutputFormatClass(DBOutputFormat.class); // 设置输出格式是数据库
DBOutputFormat.setOutput(job2, "link_unavl", "link_id", "ts", "unavl"); // 第二个参数是数据库表名,后面参数是表中列名(按顺序)
job2.setNumReduceTasks(8);
if (!job2.waitForCompletion(true)) {
System.exit(1);
}
System.exit(0);
}
讨论区欢迎提问^^
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。