SparkCore案例
# -*- coding: utf-8 -*-
# Program function:测试结巴分词
import jieba
import re
# jieba.cut
# 方法接受四个输入参数:
# 需要分词的字符串;
# cut_all 参数用来控制是否采用全模式;
# HMM 参数用来控制是否使用 HMM 模型;
# use_paddle 参数用来控制是否使用paddle模式下的分词模式,paddle模式采用延迟加载方式,通过enable_paddle接口安装paddlepaddle-tiny,并且import相关代码;
str = "我来到北京清华大学"
print(list(jieba.cut(str))) # ['我', '来到', '北京', '清华大学'],默认的是精确模式
print(list(jieba.cut(str, cut_all=True))) # ['我', '来到', '北京', '清华', '清华大学', '华大', '大学'] 完全模式
# 准备的测试数据
str1 = "00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html"
print(re.split("\s+", str1)[2]) # [360安全卫士]
print(re.sub("\[|\]", "", re.split("\s+", str1)[2])) #360安全卫士
print(list(jieba.cut(re.sub("\[|\]", "", re.split("\s+", str1)[2])))) # [360安全卫士] --->['360', '安全卫士']
# -*- coding: utf-8 -*-
# Program function:搜狗分词之后的统计
'''
* 1-读取数据
* 2-完成需求1:搜狗关键词统计
* 3-完成需求2:用户搜索点击统计
* 4-完成需求3:搜索时间段统计
* 5-停止sparkcontext
'''
from pyspark import SparkConf, SparkContext
import re
import jieba
if __name__ == '__main__':
# 准备环境变量
conf = SparkConf().setAppName("sougou").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
# TODO*1 - 读取数据
sougouFileRDD = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/sougou/SogouQ.reduced")
# print("sougou count is:", sougouFileRDD.count())#sougou count is: 1724264
# 00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html
resultRDD=sougouFileRDD \
.filter(lambda line:(len(line.strip())>0) and (len(re.split("\s+",line.strip()))==6))\
.map(lambda line:(
re.split("\s+", line)[0],
re.split("\s+", line)[1],
re.sub("\[|\]", "", re.split("\s+", line)[2]),
re.split("\s+", line)[3],
re.split("\s+", line)[4],
re.split("\s+", line)[5]
))
# print(resultRDD.take(2))
#('00:00:00', '2982199073774412', '360安全卫士', '8', '3', 'download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html')
#('00:00:00', '07594220010824798', '哄抢救灾物资', '1', '1', 'news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml')
# TODO*2 - 完成需求1:搜狗关键词统计
print("=============完成需求1:搜狗关键词统计==================")
recordRDD = resultRDD.flatMap(lambda record: jieba.cut(record[2]))
# print(recordRDD.take(5))
sougouResult1=recordRDD\
.map(lambda word:(word,1))\
.reduceByKey(lambda x,y:x+y)\
.sortBy(lambda x:x[1],False)
# print(sougouResult1.take(5))
# TODO*3 - 完成需求2:用户搜索点击统计
print("=============完成需求2:用户搜索点击统计==================")
# 根据用户id和搜索的内容作为分组字段进行统计
sougouClick = resultRDD.map(lambda record: (record[1], record[2]))
sougouResult2=sougouClick\
.map(lambda tuple:(tuple,1))\
.reduceByKey(lambda x,y:x+y) #key,value
# 打印一下最大的次数和最小的次数和平均次数
print("max count is:",sougouResult2.map(lambda x: x[1]).max())
print("min count is:",sougouResult2.map(lambda x: x[1]).min())
print("mean count is:",sougouResult2.map(lambda x: x[1]).mean())
# 如果对所有的结果排序
# print(sougouResult2.sortBy(lambda x: x[1], False).take(5))
# TODO*4 - 完成需求3:搜索时间段统计
print("=============完成需求3:搜索时间段-小时-统计==================")
#00:00:00
hourRDD = resultRDD.map(lambda x: str(x[0])[0:2])
sougouResult3=hourRDD\
.map(lambda word:(word,1))\
.reduceByKey(lambda x,y:x+y)\
.sortBy(lambda x:x[1],False)
print("搜索时间段-小时-统计",sougouResult3.take(5))
# TODO*5 - 停止sparkcontext
sc.stop()