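CMCC (China Mobile) has many subsidiary organizations, organized roughly by province, and each province in turn has a large number of recharge outlets.
At present, getting the recharge status of the whole platform means first aggregating the outlets within each province and then rolling the figures up level by level. The process is cumbersome and the reporting cycle is long. Recharging also involves interface calls between the subsystems of China Mobile's internal information system, so monitoring interface failures is a key concern as well. A platform that monitors recharges nationwide in real time, covering both network-wide recharge activity and the status of each interface call, is therefore very valuable.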
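Challenges
China Mobile has a very large number of recharge outlets, so the volume of recharge data is large.
The data must be available in near real time.
Candidate technologies
Real-time stream processing framework: Storm
Real-time stream processing framework: Spark Streaming
Real-time stream processing framework: Flink
Comparative analysis
Project data volume
1. Daily business overview
Count the network-wide number of recharge orders, the recharge amount, the recharge success rate, and the average recharge duration.
2. Hourly business trend
Mainly count the network-wide order volume and success rate.
3. Business quality
Count the failed recharges of each province.
4. Real-time recharge statistics
Count the recharge amount and the number of recharges per minute.
1.4 Log data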
{"bussinessRst":"0000","channelCode":"6900","chargefee":"1000","clientIp":"117.136.79.101","gateway_id":"WXPAY","interFacRst":"0000","logOutTime":"20170412030030067","orderId":"384663607178845909","payPhoneNo":"15015541313","phoneno":"15015541313","provinceCode":"200","rateoperateid":"1513","receiveNotifyTime":"20170412030030017","requestId":"20170412030007090581518228485394","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"1000","srcChannel":"11","sysId":"01"}
{"bussinessRst":"0000","channelCode":"6900","chargefee":"1000","clientIp":"117.136.79.101","endReqTime":"20170412030030230","idType":"01","interFacRst":"0000","logOutTime":"20170412030030230","orderId":"384663607178845909","prodCnt":"1","provinceCode":"200","requestId":"20170412030007090581518228485394","retMsg":"成功","serverIp":"172.16.59.241","serverPort":"8088","serviceName":"sendRechargeReq","shouldfee":"1000","startReqTime":"20170412030030080","sysId":"01"}
{"bussinessRst":"0000","channelCode":"0702","chargefee":"1000","clientIp":"101.204.129.105","gateway_id":"CMPAY","interFacRst":"0000","logOutTime":"20170412030031580","orderId":"384663613178752811","payPhoneNo":"","phoneno":"18200222444","provinceCode":"280","rateoperateid":"1514","receiveNotifyTime":"20170412030031554","requestId":"20170412030013393282687799171031","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"995","srcChannel":"00","sysId":"01"}
{"bussinessRst":"0000","channelCode":"0702","chargefee":"1000","clientIp":"101.204.129.105","endReqTime":"20170412030031698","idType":"01","interFacRst":"0000","logOutTime":"20170412030031698","orderId":"384663613178752811","prodCnt":"1","provinceCode":"280","requestId":"20170412030013393282687799171031","retMsg":"成功","serverIp":"172.16.59.241","serverPort":"8088","serviceName":"sendRechargeReq","shouldfee":"995","startReqTime":"20170412030031592","sysId":"01"}
{"bussinessRst":"0000","channelCode":"6900","chargefee":"20000","clientIp":"112.17.244.230","gateway_id":"WXPAY","interFacRst":"0000","logOutTime":"20170412030032234","orderId":"384663617163048689","payPhoneNo":"15857207376","phoneno":"15857207376","provinceCode":"571","rateoperateid":"","receiveNotifyTime":"20170412030032194","requestId":"20170412030017876364973282669502","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"20000","srcChannel":"11","sysId":"01"}
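The required log file has been uploaded to https://download.csdn.net/download/qq_32539825/10739951
Flume watches a directory in real time and forwards each newly created file to Kafka. The Kafka topic is created automatically here (this relies on the broker setting auto.create.topics.enable, which defaults to true).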
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe and configure the source component: r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/cmccdata
# Describe and configure the sink component: k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = myTopic
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Describe and configure the channel component; an in-memory buffer is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Wire the source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
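The messages produced by Flume are stored in Kafka. Within a partition, every message has a sequential offset.
*How does Kafka make sure that data is not lost?
Kafka's ack mechanism: every time a producer sends a message, there is an acknowledgement step that confirms the message was received correctly.
0: do not wait for any acknowledgement from the broker. 1: wait until the leader of the target partition has stored the message. -1 (all): wait until all in-sync replicas of the target partition have stored the message.
*How does Spark Streaming limit the amount of data it consumes from Kafka?
When the streaming job is created, the setting spark.streaming.kafka.maxRatePerPartition is configured. The maximum number of records pulled in one batch = this value × number of partitions × batch interval in seconds.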
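The cap described above is plain arithmetic. As a minimal sketch in Java: the class and method names are hypothetical, and the partition count of 3 is only an assumed example, while the rate of 10000 and the 2-second batch interval are the values used in the job configuration in this post.

```java
/** Illustrative helper; not part of the project code. */
final class BatchCap {
    /**
     * Upper bound on the records pulled in one micro-batch:
     * maxRatePerPartition (records per second per partition)
     * x number of partitions x batch interval (seconds).
     */
    static long maxRecordsPerBatch(long maxRatePerPartition, int partitions, long batchSeconds) {
        return maxRatePerPartition * partitions * batchSeconds;
    }

    public static void main(String[] args) {
        // 10000 records/s per partition, 3 partitions (assumed), 2-second batches
        System.out.println(maxRecordsPerBatch(10_000L, 3, 2L)); // prints 60000
    }
}
```

This is an upper bound only; a batch contains fewer records whenever the topic holds less unread data.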
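Process the real-time stream according to the requirements.
Prevent messages from being consumed more than once.
Store the business results of the real-time stream in a distributed, in-memory database, which is fast.
Question: how can the offsets and the stored business data be kept in sync?
Store the offsets and the business data in the same database and use a transaction to keep them consistent.
Business processing runs on the executors, while the offsets are saved on the driver. So after processing, the business results are first written to a temporary database. When the driver saves the offsets, it reads the results back from the temporary database and writes them together with the offsets in one transaction, so that both reach the database at the same time.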
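The all-or-nothing idea can be sketched without a database. The Java example below (Java 8 or later) is an in-memory stand-in, not the project's MySQL code: `AtomicBatchStore` and its methods are hypothetical names, and the rule that an offset may never move backwards is an assumption added for illustration. With a real database the same shape would be one transaction that writes the results and the offsets and then commits.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for "results and offsets in one transaction"; illustration only. */
final class AtomicBatchStore {
    private Map<String, Long> counters = new HashMap<>();
    private Map<Integer, Long> offsets = new HashMap<>();

    /**
     * Applies the counter deltas and the batch's end offsets as one unit.
     * Either both become visible, or neither does.
     */
    synchronized boolean commitBatch(Map<String, Long> deltas, Map<Integer, Long> untilOffsets) {
        // Work on copies, so a rejected batch leaves the visible state untouched.
        Map<String, Long> newCounters = new HashMap<>(counters);
        Map<Integer, Long> newOffsets = new HashMap<>(offsets);
        for (Map.Entry<String, Long> e : deltas.entrySet()) {
            newCounters.merge(e.getKey(), e.getValue(), Long::sum);
        }
        for (Map.Entry<Integer, Long> e : untilOffsets.entrySet()) {
            long stored = newOffsets.getOrDefault(e.getKey(), 0L);
            if (e.getValue() < stored) {
                return false; // offset would move backwards: reject the whole batch
            }
            newOffsets.put(e.getKey(), e.getValue());
        }
        // "Commit": swap in both maps together.
        counters = newCounters;
        offsets = newOffsets;
        return true;
    }

    synchronized long counter(String key) {
        return counters.getOrDefault(key, 0L);
    }

    synchronized long offset(int partition) {
        return offsets.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        AtomicBatchStore store = new AtomicBatchStore();
        Map<String, Long> deltas = new HashMap<>();
        deltas.put("total", 5L);
        Map<Integer, Long> until = new HashMap<>();
        until.put(0, 100L);
        System.out.println(store.commitBatch(deltas, until)); // prints true
    }
}
```

Because the counters and the offsets change together, a restart that resumes from the stored offsets never sees results that were counted without their offsets, or offsets without their results.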
Flume + Kafka + Spark Streaming + MySQL + Redis
Code structure:
cmcc_MonitorV2.scala (main class)
package cn.pig.app
import cn.pig.utils._
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object cmcc_MonitorV2 {
def main(args: Array[String]): Unit = {
//Turn off Spark log output
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//SparkConf settings
val conf = new SparkConf().setAppName("中国移动实时监控平台_V2").setMaster("local[*]")
//Serialize RDDs with Kryo to save memory (the configuration key is spark.serializer)
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.kafka.maxRatePerPartition","10000") //max records read per second from each Kafka partition
conf.set("spark.streaming.stopGracefullyOnShutdown","true") //shut down gracefully
//Create the StreamingContext
val ssc = new StreamingContext(conf,Seconds(2))
/**
* Load the offsets stored in the database
*/
val currOffser: Map[TopicPartition, Long] = OffsetManager.getMyCurrentOffset
//Broadcast the lookup table that maps province codes to province names
val provinceName: Broadcast[Map[String, AnyRef]] = ssc.sparkContext.broadcast(AppParameters.provinces)
//Create the direct stream that reads from Kafka
val stream = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String ,String ](AppParameters.topic,AppParameters.kafkaParams,currOffser))
/**
* Start the computation
*/
stream.foreachRDD(baseRdd =>{
val offsetRanges: Array[OffsetRange] = baseRdd.asInstanceOf[HasOffsetRanges].offsetRanges
val baseData = ApiUtils.Api_BaseDate(baseRdd)
/**
* Daily business overview
*/
ApiUtils.Api_general_total(baseData)
/**
* Hourly recharge business trend
*/
ApiUtils.api_general_hour(baseData)
/**
* Distribution of recharge results across provinces
*/
ApiUtils.api_general_province(baseData,provinceName)
/**
* Recharge amount and order count per minute, in real time
*/
ApiUtils.api_realtime_minute(baseData)
/**
* Save the offsets
*/
OffsetManager.saveCurrentOffset(offsetRanges)
})
ssc.start()
ssc.awaitTermination()
}
}
ApiUtils.scala (the four business-processing methods live here; the main class calls them)
package cn.pig.utils
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
object ApiUtils {
/**
* Parse the raw records, extract the base data, and cache it in memory
* @param baseRdd raw Kafka records of one batch
* @return (date, hour, List(total, success, amount, duration), provinceCode, minute)
*/
def Api_BaseDate(baseRdd: RDD[ConsumerRecord[String, String]]): RDD[(String, String, List[Double], String, String)] = {
val baseData: RDD[(String, String, List[Double], String, String)] =
baseRdd.map(rdd => JSON.parseObject(rdd.value()))
//constant first, so a record without serviceName is filtered out instead of throwing
.filter(x => "reChargeNotifyReq".equals(x.getString("serviceName")))
.map(rdd => {
//business result code
val result = rdd.getString("bussinessRst")
//recharge amount
val fee = rdd.getString("chargefee").toDouble
//province code
val provinceCode = rdd.getString("provinceCode")
//the requestId starts with the time the recharge was requested (yyyyMMddHHmmssSSS)
val requestId = rdd.getString("requestId")
//date
val data = requestId.substring(0, 8)
//hour
val hour = requestId.substring(8, 10)
//minute
val minute = requestId.substring(10, 12)
//time at which the recharge finished
val receiveTime = rdd.getString("receiveNotifyTime")
//recharge duration in milliseconds
val time = CalculateTools.getDate(requestId, receiveTime)
//a successful order contributes (1, amount, duration); a failed order contributes zeros
val SuccedResult: (Int, Double, Long) = if (result.equals("0000")) (1, fee, time) else (0, 0, 0)
(data, hour,List[Double](1, SuccedResult._1, SuccedResult._2, SuccedResult._3), provinceCode,minute)
}).cache()
baseData
}
/**
* Business overview
* Number of recharge orders, recharge amount, recharge duration
* @param baseData
*/
def Api_general_total(baseData: RDD[(String, String, List[Double], String,String)]) = {
baseData.map(tp => (tp._1, tp._3)).reduceByKey((list1, list2) => {
list1.zip(list2).map(tp => tp._1 + tp._2)
}).foreachPartition(partition => {
/**
* Save to Redis
*/
val redis = Jpools.getJedis
partition.foreach(tp => {
println(tp._1)
redis.hincrBy("A-" + tp._1, "total", tp._2(0).toLong)
redis.hincrBy("A-" + tp._1, "success", tp._2(1).toLong)
redis.hincrBy("A-" + tp._1, "money", tp._2(2).toLong)
redis.hincrBy("A-" + tp._1, "time", tp._2(3).toLong)
//redis.expire("A-" + tp._1, 60 * 60 * 48)
})
redis.close()
})
}
/**
* Hourly recharge business trend
* Total orders and successful orders per hour
* @param baseData
*/
def api_general_hour(baseData: RDD[(String, String, List[Double], String,String)]) = {
baseData.map(tp => ((tp._1, tp._2), (tp._3))).reduceByKey((list1, list2) => {
list1.zip(list2).map(tp => tp._1 + tp._2)
}).foreachPartition(tp => {
val redis = Jpools.getJedis
tp.foreach(data => {
redis.hincrBy("B-" + data._1._1, "total-" + data._1._2, data._2(0).toLong)
redis.hincrBy("B-" + data._1._1, "success-" + data._1._2, data._2(1).toLong)
//redis.expire("B-" + data._1._1, 60 * 60 * 48)
})
redis.close()
})
}
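/**
* Distribution of recharge results across provinces
* The province name is looked up through the broadcast variable
* Key: (date, provinceCode); value: List(total, success, amount, duration)
* Note: the value written to Redis below is index 1, the success count; the failure count would be total - success
*/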
def api_general_province(baseData: RDD[(String, String, List[Double], String,String)],provinceName:Broadcast[Map[String, AnyRef]]) = {
baseData.map(tp => ((tp._1,tp._4),tp._3)).reduceByKey((list1,list2)=>{
list1.zip(list2).map(tp =>tp._1+tp._2)
})
.foreachPartition(tp => {
val redis = Jpools.getJedis
tp.foreach(data =>{
//redis.hincrBy("C-"+data._1._1,"total-"+provinceName.value.getOrElse(data._1._2,data._1._2),data._2(0).toLong)
redis.hincrBy("C-"+data._1._1,provinceName.value.getOrElse(data._1._2,data._1._2)+"",data._2(1).toLong)
// redis.expire("C-"+data._1._1,60*60*48)
})
redis.close()
})
}
/**
* Recharge amount and order count per minute, in real time
*/
def api_realtime_minute(baseData: RDD[(String, String, List[Double], String,String)]) = {
baseData.map(tp => ((tp._1,tp._2,tp._5),List(tp._3(1),tp._3(2)))).reduceByKey((list1,list2)=>{
list1.zip(list2).map(tp =>tp._1+tp._2)
})
.foreachPartition(tp => {
val redis = Jpools.getJedis
tp.foreach(data =>{
redis.hincrBy("D-"+data._1._1,"Num-"+data._1._2+data._1._3,data._2(0).toLong)
redis.hincrBy("D-"+data._1._1,"Money-"+data._1._2+data._1._3,data._2(1).toLong)
//redis.expire("D-"+data._1._1,60*60*48)
})
redis.close()
})
}
}
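The job above stores only running counters in the daily hash `A-<date>`: `total`, `success`, `money`, and `time`. The success rate and the average duration listed in the requirements are therefore derived when the hash is read. A minimal sketch in Java, assuming the hash has already been fetched into a `Map<String, String>` (for example with Jedis `hgetAll`); `DailyKpi` is a hypothetical name and the numbers in `main` are made up. The duration is divided by the success count because the job adds a duration only for successful orders.

```java
import java.util.HashMap;
import java.util.Map;

/** Derives KPIs from the counters of one daily hash; illustration only. */
final class DailyKpi {
    /** Share of successful orders in [0, 1]; 0 when there are no orders. */
    static double successRate(Map<String, String> hash) {
        long total = field(hash, "total");
        return total == 0 ? 0.0 : (double) field(hash, "success") / total;
    }

    /** Mean duration in milliseconds over successful orders; 0 when none succeeded. */
    static double avgDurationMs(Map<String, String> hash) {
        long success = field(hash, "success");
        return success == 0 ? 0.0 : (double) field(hash, "time") / success;
    }

    /** Reads a counter field, treating a missing field as 0. */
    private static long field(Map<String, String> hash, String name) {
        String value = hash.get(name);
        return value == null ? 0L : Long.parseLong(value);
    }

    public static void main(String[] args) {
        Map<String, String> hash = new HashMap<>(); // made-up sample values
        hash.put("total", "200");
        hash.put("success", "150");
        hash.put("time", "3000000");
        System.out.println(successRate(hash));   // prints 0.75
        System.out.println(avgDurationMs(hash)); // prints 20000.0
    }
}
```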
application.conf (configuration file, in the resources directory)
//Kafka settings
kafka.topic = "myTopic"
kafka.groupId = "lkp"
kafka.broker = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
//Redis settings
redis.host = "hadoop02" //host name
redis.index = "2" //index of the Redis database to write to
//MySQL settings (scalikejdbc)
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop01:3306/test?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"
//Province code to province name mapping
province = {
100 = "北京"
200 = "广东"
210 = "上海"
220 = "天津"
230 = "重庆"
240 = "辽宁"
250 = "江苏"
270 = "湖北"
280 = "四川"
290 = "陕西"
311 = "河北"
351 = "山西"
371 = "河南"
431 = "吉林"
451 = "黑龙江"
471 = "内蒙古"
531 = "山东"
551 = "安徽"
571 = "浙江"
591 = "福建"
731 = "湖南"
771 = "广西"
791 = "江西"
851 = "贵州"
871 = "云南"
891 = "西藏"
898 = "海南"
931 = "甘肃"
951 = "宁夏"
971 = "青海"
991 = "新疆"
}
AppParameters.scala (reads the configuration file so that other classes can use the values)
package cn.pig.utils
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
object AppParameters {
/**
* Kafka settings
*/
//Load the configuration; lookup order: application.conf --> application.json --> application.properties
val config = ConfigFactory.load()
val topic: Array[String] = config.getString("kafka.topic").split(",")
val groupId = config.getString("kafka.groupId")
val brokers = config.getString("kafka.broker")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
/**
* Redis settings
* Host and database index
*/
val redis_host = config.getString("redis.host")
val redis_index = config.getString("redis.index").toInt
/**
* Province settings
*/
import scala.collection.JavaConversions._
val provinces = config.getObject("province").unwrapped().toMap
}
CalculateTools.scala (computes the recharge duration from two timestamps)
package cn.pig.utils
import java.text.SimpleDateFormat
import org.apache.commons.lang.time.FastDateFormat
object CalculateTools {
def getDate(requestId:String,endTime:String)={
val startTime = requestId.substring(0,17)
//val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
format.parse(endTime).getTime - format.parse(startTime).getTime
}
}
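To make the calculation concrete, here is the same logic as a small Java sketch, applied to the first sample log line of this post (its `requestId` and `receiveNotifyTime`). `RechargeDuration` is a hypothetical name. Fixing the time zone to UTC is an added assumption that only guards against daylight-saving jumps; it does not change the difference between two nearby timestamps.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

/** Recharge duration from a requestId and an end timestamp; illustration only. */
final class RechargeDuration {
    /**
     * The first 17 characters of the requestId are the start time in the
     * form yyyyMMddHHmmssSSS; the end time uses the same form.
     * Returns the elapsed time in milliseconds.
     */
    static long durationMs(String requestId, String endTime) {
        // A new formatter per call, because SimpleDateFormat is not thread-safe.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        String startTime = requestId.substring(0, 17);
        try {
            return format.parse(endTime).getTime() - format.parse(startTime).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable timestamp", e);
        }
    }

    public static void main(String[] args) {
        // Start 03:00:07.090, end 03:00:30.017
        System.out.println(durationMs("20170412030007090581518228485394", "20170412030030017")); // prints 22927
    }
}
```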
Jpools.scala (Redis connection pool)
package cn.pig.utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
/**
* Redis connection pool
*/
object Jpools {
private val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxIdle(5) //max idle connections (default 8)
poolConfig.setMaxTotal(2000) //max total connections (default 8)
private lazy val jedisPool = new JedisPool(poolConfig,AppParameters.redis_host,6379)
def getJedis = {
val jedis = jedisPool.getResource()
jedis.select(AppParameters.redis_index)
jedis
}
}
OffsetManager.scala (uses scalikejdbc to store the offsets in MySQL)
package cn.pig.utils
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scalikejdbc._
import scalikejdbc.config._
object OffsetManager {
//Load the configuration file application.conf
DBs.setup()
/**
* Read the offsets that this application stored earlier
* @return
*/
def getMyCurrentOffset :Map[TopicPartition,Long] = {
DB.readOnly(implicit session =>
SQL("select * from streaming_offset where groupId = ?").bind(AppParameters.groupId)
.map(rs =>
(
new TopicPartition(rs.string("topicName"),rs.int("partitionId")),
rs.long("offset")
)
).list().apply().toMap
)
}
/**
* Persist the current offsets
*/
def saveCurrentOffset(offsetRanges: Array[OffsetRange]): Unit ={
DB.localTx(implicit session =>{
offsetRanges.foreach(or =>{
SQL("replace into streaming_offset values(?,?,?,?)")
.bind(or.topic,or.partition,or.untilOffset,AppParameters.groupId)
.update()
.apply()
})
})
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.com.pig</groupId>
<artifactId>cmcc_moniter</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.2.1</spark.version>
<mysql.version>5.1.38</mysql.version>
<redis.version>2.9.0</redis.version>
<config.version>1.3.3</config.version>
<fastjson.version>1.2.47</fastjson.version>
<scalikejdbc.version>2.5.0</scalikejdbc.version>
</properties>
<dependencies>
<!-- spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${redis.version}</version>
</dependency>
<!-- streaming-kafka-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- pin the version of the kafka-clients API -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>${config.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<!-- scalikejdbc-config -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
</dependencies>
</project>
IDEA + Servlet + ECharts
MapVo.java (bean for the map)
package cn.pig.cmcc.beans;
/**
* Entity class for recharge counts
*/
public class MapVo {
/**
* Province name and count
*/
private String name;
private int value;
public MapVo(String name, int value) {
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
public MapVo() {
}
}
MinutesKpiVo.java
package cn.pig.cmcc.beans;
public class MinutesKpiVo {
private String money;
private String counts;
public MinutesKpiVo(String money, String counts) {
this.money = money;
this.counts = counts;
}
public MinutesKpiVo() {
}
public String getMoney() {
return money;
}
public void setMoney(String money) {
this.money = money;
}
public String getCounts() {
return counts;
}
public void setCounts(String counts) {
this.counts = counts;
}
}
Servlet.java (servlet that handles the map requests)
package cn.pig.cmcc.controller;
import cn.pig.cmcc.beans.MapVo;
import cn.pig.cmcc.services.IMapIndexService;
import cn.pig.cmcc.services.MapIndexService;
import com.alibaba.fastjson.JSONObject;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
/**
* Servlet endpoint for the distribution of successful recharges
*/
@WebServlet(name = "Servlet",urlPatterns = "/mapIndex.cmcc")
public class Servlet extends HttpServlet {
//Instantiate the service object
IMapIndexService service = new MapIndexService();
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.setCharacterEncoding("utf-8");
resp.setContentType("application/json");
//Read the parameter sent by the front end
String day = req.getParameter("day");
//Call the service
List<MapVo> voList = service.findAllBy(day);
//Serialize the result for the front end
String jsonStr = JSONObject.toJSONString(voList);
//Write the JSON string to the response
resp.getWriter().write(jsonStr);
}
}
Servlet_Minute.java (servlet that serves the real-time recharge amount and recharge count)
package cn.pig.cmcc.controller;
import cn.pig.cmcc.beans.MinutesKpiVo;
import cn.pig.cmcc.services.IMinuteService;
import cn.pig.cmcc.services.MinuteKpiService;
import com.alibaba.fastjson.JSONObject;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Servlet endpoint for the per-minute recharge amount and count
*/
@WebServlet(name = "Servlet_Minute",urlPatterns = "/minutesKpi.cmcc")
public class Servlet_Minute extends HttpServlet {
//Instantiate the service object
IMinuteService service = new MinuteKpiService();
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.setContentType("application/json;charset=utf-8");
//Read the request parameter
String day = req.getParameter("day");
//Current time as HHmm, which selects the minute to query
Date date = new Date();
SimpleDateFormat format = new SimpleDateFormat("HHmm");
String time = format.format(date);
MinutesKpiVo vo = service.findBy(day,time);
//Convert the object to JSON and write it out
resp.getWriter().write(JSONObject.toJSONString(vo));
}
}
IMapIndexService.java (service interface for the map)
package cn.pig.cmcc.services;
import cn.pig.cmcc.beans.MapVo;
import java.util.List;
public interface IMapIndexService {
/**
* Look up the per-province values for a date
* @param day
* @return
*/
List<MapVo> findAllBy(String day);
}
IMinuteService.java (service interface for the real-time display)
package cn.pig.cmcc.services;
import cn.pig.cmcc.beans.MinutesKpiVo;
public interface IMinuteService {
/**
* Fetch the data for a given date and time
* @param date
* @param hourMinutes
* @return
*/
MinutesKpiVo findBy(String date,String hourMinutes);
}
MapIndexService.java (service implementation for the map)
package cn.pig.cmcc.services;
import cn.pig.cmcc.beans.MapVo;
import cn.pig.cmcc.utils.Constants;
import cn.pig.cmcc.utils.Jpools;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MapIndexService implements IMapIndexService{
@Override
public List<MapVo> findAllBy(String day) {
List<MapVo> list = new ArrayList<>();
//Read the data from Redis
Jedis jedis = Jpools.getJedis();
Map<String,String> all = jedis.hgetAll(Constants.MAP_PREFIX+day);
//Return the connection to the pool; without this the pool leaks a connection per request
jedis.close();
for(Map.Entry<String,String> entry:all.entrySet()){
MapVo map = new MapVo();
map.setName(entry.getKey());
map.setValue(Integer.parseInt(entry.getValue()));
list.add(map);
}
return list;
}
}
MinuteKpiService.java (service implementation for the real-time display)
package cn.pig.cmcc.services;
import cn.pig.cmcc.beans.MinutesKpiVo;
import cn.pig.cmcc.utils.Constants;
import cn.pig.cmcc.utils.Jpools;
import redis.clients.jedis.Jedis;
public class MinuteKpiService implements IMinuteService {
/**
* Fetch the data for a given date and time
* @param day
* @param hourMinutes
* @return
*/
@Override
public MinutesKpiVo findBy(String day, String hourMinutes) {
MinutesKpiVo vo = new MinutesKpiVo();
//Get a connection
Jedis jedis = Jpools.getJedis();
//Recharge amount for the requested minute
String money = jedis.hget(Constants.MUNITE_PREFIX+day,Constants.MINUTES_FIELD_M_PREFIX+hourMinutes);
//Number of recharges for the requested minute
String num = jedis.hget(Constants.MUNITE_PREFIX+day,Constants.MINUTES_FIELD_NUM_PREFIX+hourMinutes);
jedis.close();
vo.setCounts(num);
vo.setMoney(money);
return vo;
}
}
Constants.java (key and field prefixes used in Redis)
package cn.pig.cmcc.utils;
public class Constants {
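/**
* Key prefix for the map data
*
*/
public static final String MAP_PREFIX="C-";
/**
* Key prefix for the per-minute recharge amount and count
*/
public static final String MUNITE_PREFIX="D-";
/**
* Field prefix for the per-minute recharge count
*/
public static final String MINUTES_FIELD_NUM_PREFIX="Num-";
/**
* Field prefix for the per-minute recharge amount
*/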
public static final String MINUTES_FIELD_M_PREFIX="Money-";
}
Jpools.java (Redis connection pool)
package cn.pig.cmcc.utils;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* Access class for the Redis connection pool
*/
public class Jpools {
private static Config load = ConfigFactory.load();
private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
private static JedisPool jedisPool = null;
static{
poolConfig.setMaxIdle(load.getInt("redis.pool.maxIdle"));
//the total is limited by maxActive, not by maxIdle
poolConfig.setMaxTotal(load.getInt("redis.pool.maxActive"));
jedisPool = new JedisPool(poolConfig,load.getString("redis.ip"),load.getInt("redis.port"));
}
/**
* Provide a Redis connection
* @return
*/
public static Jedis getJedis(){
Jedis jedis = jedisPool.getResource();
jedis.select(load.getInt("redis.db.index"));
return jedis;
}
}
application.conf
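# Maximum number of connection instances; the default is 8.
# A value of -1 means no limit. Once the pool has handed out maxActive jedis instances, it is exhausted.
redis.pool.maxActive=1024
# Maximum number of idle jedis instances the pool keeps; the default is also 8.
redis.pool.maxIdle=200
# Whether to validate a jedis instance before it is borrowed; if true, every instance handed out is usable.
redis.pool.testOnBorrow=true
# Whether to validate an instance when it is returned to the pool
redis.pool.testOnReturn=true
# Redis server address
redis.ip="hadoop02"
# Redis port
redis.port=6379
# Redis database index; it must be the database the Spark job writes to (redis.index, set to 2 in the job's application.conf)
redis.db.index = 5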
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.pig</groupId>
<artifactId>cmcc_visual_2</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<name>cmcc_visual_2 Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
</dependencies>
<build>
<finalName>cmcc_visual_2</finalName>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.0</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
index.jsp
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
<meta charset="utf-8">
<title>CMS Report</title>
</head>
<body>
<!-- Prepare a DOM element with an explicit size (width and height) for ECharts -->
<%--<div id="main" style="height:500px;border:1px solid #ccc;padding:10px;"></div>--%>
<div id="main" style="height:500px;border:1px solid #ccc;padding:10px;"></div>
<script src="js/jquery-3.1.1.min.js"></script>
<script src="js/echarts2.x/echarts-all.js"></script>
<%--<script src="js/echarts/echarts.min.js"></script>--%>
<script src="js/echarts2.x/theme/dark.js"></script>
<script type="text/javascript">
// Initialize the ECharts chart on the prepared DOM element
var myChart = echarts.init(document.getElementById('main'));
myChart.setTheme(dark);
var option = {
title : {
text: '全国充值业务成功量分布',
x:'center'
},
tooltip : {
trigger: 'item'
},
legend: {
orient: 'vertical',
x:'left',
data:['成功量']
},
dataRange: {
// min: 0,
// max: 2500,
// x: 'left',
// y: 'bottom',
// text:['高','低'], // label text; defaults to the numeric values
// calculable : true
x: 'left',
y: 'bottom',
splitList: [
{start: 10000, color: '#E1022A'}, // label: custom label, color: custom color
{start: 5000, end: 9999, color: '#E19106'},
{start: 1000, end: 4999, color:'#6CAF00'},
{start: 1, end: 999, color:'#A3E10A'},
{start: 0, end: 0, color: '#C5CDDB'}
]// ,
// color: ['#FF7E50', '#E09107', '#A3E00B']
},
toolbox: {
show: true,
orient : 'vertical',
x: 'right',
y: 'center',
feature : {
mark : {show: true},
dataView : {show: true, readOnly: false},
restore : {show: true},
saveAsImage : {show: true}
}
},
roamController: {
show: true,
x: 'right',
mapTypeControl: {
'china': true
}
},
series : [
{
name: '成功量',
type: 'map',
mapType: 'china',
roam: false,
itemStyle:{
normal:{label:{show:true}},
emphasis:{label:{show:true}}
},
data: []
}
]
};
// ajax getting data...............
$.get("mapIndex.cmcc", { day: "20170412"},function(data){
option.series[0].data = data;
// Load the data into the ECharts instance
myChart.setOption(option);
});
</script>
</body>
</html>
realtime.jsp
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
<meta charset="utf-8">
<title>demo</title>
<!-- Include the ECharts files -->
</head>
<body>
<h1>每分钟实时充值统计</h1>
<!-- Prepare a DOM element with an explicit size (width and height) for ECharts -->
<div id="main" style="height:500px;border:1px solid #ccc;padding:10px;"></div>
<script src="js/jquery-3.1.1.min.js"></script>
<script src="js/echarts2.x/echarts-all.js"></script>
<script type="text/javascript">var timeTicket;var timeOutTicket</script>
<script type="text/javascript">
var now = new Date();
function pad2(n) { return n < 10 ? '0' + n : n }
function generateTimeRequestNumber(date) {
return date.getFullYear().toString() + pad2(date.getMonth() + 1) + pad2(date.getDate()) + pad2(date.getHours()) + pad2(date.getMinutes());//+ pad2(date.getSeconds());
}
// Initialize the chart object
var myChart = echarts.init(document.getElementById('main'));
myChart.setTheme("macarons");
var option = {
title : {
text: '实时充值'
},
tooltip : {
trigger: 'axis'
},
legend: {
data:['充值笔数', '充值金额']
},
toolbox: {
show : true,
feature : {
mark : {show: true},
dataView : {show: true, readOnly: false},
magicType : {show: true, type: ['line', 'bar']},
restore : {show: true},
saveAsImage : {show: true}
}
},
dataZoom : {
show : false,
start : 0,
end : 100
},
xAxis : [
{
type : 'category',
boundaryGap : true,
data : (function (){
var res = [];
var len = 10;
while (len--) {
res.unshift(generateTimeRequestNumber(now));
now = new Date(now - 60000);
}
return res;
})()
},
{
type : 'category',
boundaryGap : true,
data : (function (){
var res = [];
var len = 10;
while (len--) {
res.push(len + 1);
}
return res;
})()
}
],
yAxis : [
{
type : 'value',
scale: true,
name : '笔数',
boundaryGap: [0, 0]
},
{
type : 'value',
scale: true,
name : '金额',
boundaryGap: [0, 0]
}
],
series : [
{
name:'充值金额',
type:'bar',
xAxisIndex: 1,
yAxisIndex: 1,
data:(function (){
var res = [];
var len = 10;
while (len--) {
res.push(0);
}
return res;
})()
},
{
name:'充值笔数',
type:'line',
data:(function (){
var res = [];
var len = 10;
while (len--) {
res.push(0);
}
return res;
})()
}
]
};
clearInterval(timeTicket);
timeTicket = setInterval(function () {
var lastData = 0;
var d = 0;
var axisData = generateTimeRequestNumber(new Date());
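// Ajax request to the backend, which fetches the data from the database
$.get("minutesKpi.cmcc", { day: "20170412"},function(data){
d = parseInt(data.money); // recharge amount
lastData = data.counts; // number of recharges
// dynamic data API: addData
myChart.addData([
[
0, // series index
d, // new data point [bar chart value]
false, // whether to insert the new point at the head of the queue
false // whether to grow the queue; if false, old data is removed automatically: inserting at the head drops the tail, inserting at the tail drops the head
],
[
1, // series index
lastData, // new data point [line chart value]
false, // whether to insert the new point at the head of the queue
false, // whether to grow the queue; if false, old data is removed automatically: inserting at the head drops the tail, inserting at the tail drops the head
axisData // axis label: the time on the x axis
]
]);
});
}, 60000); // setInterval runs the callback every 60 seconds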
myChart.setOption(option);
</script>
</body>
</html>
1. Offsets stored in MySQL
2. Business data stored in Redis
3. Map display on the page
4. Real-time recharge display (per minute)