用前面7天的做训练样本(20170506-20170512),用第8天的做测试样本(20170513)
其中一个广告ID对应一个商品(宝贝),一个宝贝属于一个类目,一个宝贝属于一个品牌。
主要包括
分批处理,chunksize=100
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
# 构建结构对象
schema = StructType([
StructField("userId", IntegerType()),
StructField("timestamp", LongType()),
StructField("btag", StringType()),
StructField("cateId", IntegerType()),
StructField("brandId", IntegerType())
])
# 从hdfs加载数据为dataframe,并设置结构
behavior_log_df = spark.read.csv("hdfs://localhost:8020/datasets/behavior_log.csv", header=True, schema=schema)
behavior_log_df.show()
behavior_log_df.count()
print("判断数据是否有空值:", behavior_log_df.count(), behavior_log_df.dropna().count())
# 约7亿条目723268134 723268134
# 本数据集无空值条目,可放心处理
print("查看userId的数据情况:", behavior_log_df.groupBy("userId").count().count())
# 约113w用户
print("查看btag的数据情况:", behavior_log_df.groupBy("btag").count().collect()) # collect会把计算结果全部加载到内存,谨慎使用
pivot透视操作,把某列里的字段值转换成行并进行聚合运算
(pyspark.sql.GroupedData.pivot)
# 统计每个用户对各类商品的pv、fav、cart、buy数量
cate_count_df = behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.cateId).pivot("btag",["pv","fav","cart","buy"]).count()
cate_count_df.printSchema() # 此时还没有开始计算
# spark ml的模型训练是基于内存的,如果数据过大,内存空间小,迭代次数过多的化,可能会造成内存溢出,报错
# 设置Checkpoint的话,会把所有数据落盘,这样如果异常退出,下次重启后,可以接着上次的训练节点继续运行
# 但该方法其实指标不治本,因为无法防止内存溢出,所以还是会报错
# 如果数据量大,应考虑的是增加内存、或限制迭代次数和训练数据量级等
spark.sparkContext.setCheckpointDir("hdfs://localhost:8020/checkPoint/")
#设置检查点,避免迭代训练的过程中 挂掉,训练几步缓存当前的参数,如果挂掉了可以从检查点加载缓存
def process_row(r):
# 处理每一行数据:r表示row对象
# 偏好评分规则:
# m: 用户对应的行为次数
# 该偏好权重比例,次数上限仅供参考,具体数值应根据产品业务场景权衡
# pv: if m<=20: score=0.2*m; else score=4
# fav: if m<=20: score=0.4*m; else score=8
# cart: if m<=20: score=0.6*m; else score=12
# buy: if m<=20: score=1*m; else score=20
# 注意这里要全部设为浮点数,spark运算时对类型比较敏感,要保持数据类型都一致
pv_count = r.pv if r.pv else 0.0
fav_count = r.fav if r.fav else 0.0
cart_count = r.cart if r.cart else 0.0
buy_count = r.buy if r.buy else 0.0
pv_score = 0.2*pv_count if pv_count<=20 else 4.0
fav_score = 0.4*fav_count if fav_count<=20 else 8.0
cart_score = 0.6*cart_count if cart_count<=20 else 12.0
buy_score = 1.0*buy_count if buy_count<=20 else 20.0
rating = pv_score + fav_score + cart_score + buy_score
# 返回用户ID、分类ID、用户对分类的偏好打分
return r.userId, r.cateId, rating
cate_rating_df
ALS模型 是一种基于模型的推荐算法,基于最小二乘法对稀疏矩阵进行分解,可以依照分解的两个矩阵,对新的用户和物品数据进行评估。分解的两个矩阵的隐因子,可以看做是用户或物品的隐含特征,例如可以是用户的性格、教育程度、爱好等。 参考:为什么Spark中只有ALS 高度易并行化的——它的每个子任务之间没有什么依赖关系 显式:
隐式:
在隐反馈模型中是没有评分的,所以在式子中rui被pui所取代,pui是偏好的表示,仅仅表示用户和物品之间有没有交互,而不表示评分高低或者喜好程度。比如用户和物品之间有交互就让pui等于1,没有就等于0。函数中还有一个cui的项,它用来表示用户偏爱某个商品的置信程度,比如交互次数多的权重就会增加。
# model.recommendForAllUsers(N) 给所有用户推荐TOP-N个物品 召回
ret = model.recommendForAllUsers(3)
# 由于是给所有用户进行推荐,此处运算时间也较长
ret.show()
# 推荐结果存放在recommendations列中,
ret.select("recommendations").show()
import redis
host = "192.168.19.8"
port = 6379
# 召回到redis
def recall_cate_by_cf(partition):
# 建立redis 连接池
pool = redis.ConnectionPool(host=host, port=port)
# 建立redis客户端
client = redis.Redis(connection_pool=pool)
for row in partition:
client.hset("recall_cate", row.userId, [i.cateId for i in row.recommendations])
# 对每个分片的数据进行处理 #mapPartition Transformation map
# foreachPartition Action操作 foreachRDD
result.foreachPartition(recall_cate_by_cf)
# 注意:这里这是召回的是用户最感兴趣的n个类别
# 总的条目数,查看redis中总的条目数是否一致
result.count()
模型存在HDFS上
# 将模型进行存储
model.save("hdfs://localhost:9000/models/userBrandRatingModel.obj")
# 测试存储的模型
from pyspark.ml.recommendation import ALSModel
# 从hdfs加载模型
my_model = ALSModel.load("hdfs://localhost:9000/models/userBrandRatingModel.obj")
my_model
# model.recommendForAllUsers(N) 给用户推荐TOP-N个物品
my_model.recommendForAllUsers(3).first()
只有广告展示位pid对比较重要,且数据不同数据之间的占比约为6:4,因此pid可以作为一个关键特征 nonclk和clk在这里是作为目标值,不做为特征
热编码只能对字符串类型的列数据进行处理
StringIndexer
对指定字符串列数据进行特征处理,如将性别数据“男”、“女”转化为0和1OneHotEncoder
对特征列数据,进行热编码,通常需结合StringIndexer一起使用Pipeline
让数据按顺序依次被处理,将前一次的处理结果作为下一次的输入【引申】用Embedding解决特征过多的问题: 如果特征过多,用独热编码,将会造成大量稀疏向量。采用embedding转成低维稠密向量,类似于word2vec的词向量。 [Word2vec] 包括skip-gram(给出词预测上下文)和CBOW(给出上下文预测词)两种训练模式。Softmax层优化方法:①分层softmax:类似树形分类器,每个节点都可以是一个二分类器。常用词在顶部,类似哈夫曼树。②负采样:上下文词和目标词构成正样本;用相同的上下文词,再在字典找那个随机选一个词,标记为0. [embedding]生成方法:①矩阵分解和因子分解机②利用word2vec方法③node2vec伪交互行为方法
本样本数据集共计8天数据 前七天为训练数据、最后一天为测试数据
from datetime import datetime
datetime.fromtimestamp(1494691186)
print("该时间之前的数据为训练样本,该时间以后的数据为测试样本:", datetime.fromtimestamp(1494691186-24*60*60))
该时间之前的数据为训练样本,该时间以后的数据为测试样本: 2017-05-12 23:59:46
# 训练样本:
train_sample = raw_sample_df.filter(raw_sample_df.timestamp<=(1494691186-24*60*60))
print("训练样本个数:")
print(train_sample.count())
# 测试样本
test_sample = raw_sample_df.filter(raw_sample_df.timestamp>(1494691186-24*60*60))
print("测试样本个数:")
print(test_sample.count())
训练样本个数: 23249291 测试样本个数: 3308670
# 注意:由于本数据集中存在NULL字样的数据,无法直接设置schema,只能先将NULL类型的数据处理掉,然后进行类型转换
# 替换掉NULL字符串,替换掉
df = df.replace("NULL", "-1")
只选取price作为特征数据,因为价格本身是一个统计类型连续数值型数据,且能很好的体现广告的价值属性特征,通常也不需要做其他处理(离散化、归一化、标准化等),所以这里直接将当做特征数据来使用
# 注意:这里的null会直接被pyspark识别为None数据,也就是na数据,所以这里可以直接利用schema导入数据
但根据我们的经验,我们的广告推荐其实和用户的消费水平、用户所在城市等级都有比较大的关联,因此在这里pvalue_level、new_user_class_level都是比较重要的特征,我们不考虑舍弃
from pyspark.mllib.regression import LabeledPoint
# 剔除掉缺失值数据,将余下的数据作为训练数据
# user_profile_df.dropna(subset=["pvalue_level"]): 将pvalue_level中的空值所在行数据剔除后的数据,作为训练样本
train_data = user_profile_df.dropna(subset=["pvalue_level"]).rdd.map(
lambda r:LabeledPoint(r.pvalue_level-1, [r.cms_segid, r.cms_group_id, r.final_gender_code, r.age_level, r.shopping_level, r.occupation])
)
# 筛选出缺失值条目,作为预测样本
pl_na_df = user_profile_df.na.fill(-1).where("pvalue_level=-1")
new_user_profile_df = user_profile_df.dropna(subset=["pvalue_level"]).unionAll(spark.createDataFrame(pdf, schema=schema))
new_user_profile_df.show()
# 注意:unionAll的使用,两个df的表结构必须完全一样
困难点: 利用随机森林对new_user_class_level的缺失值进行预测 可以发现由于这两个字段的缺失过多,所以预测出来的值已经大大失真,但如果缺失率在10%以下,这种方法是比较有效的一种 解决办法: 低维转高维方式 我们接下来采用将变量映射到高维空间的方法来处理数据,即将缺失项也当做一个单独的特征来对待,保证数据的原始性 由于该思想正好和热独编码实现方法一样,因此这里直接使用热独编码方式处理数据
# 使用热独编码转换pvalue_level的一维数据为多维,其中缺失值单独作为一个特征值
# 需要先将缺失值全部替换为数值,与原有特征一起处理
from pyspark.sql.types import StringType
user_profile_df = user_profile_df.na.fill(-1)
user_profile_df.show()
# 热独编码时,必须先将待处理字段转为字符串类型才可处理
user_profile_df = user_profile_df.withColumn("pvalue_level", user_profile_df.pvalue_level.cast(StringType()))\
.withColumn("new_user_class_level", user_profile_df.new_user_class_level.cast(StringType()))
user_profile_df.printSchema()
# 对pvalue_level进行热独编码,求值
stringindexer = StringIndexer(inputCol='pvalue_level', outputCol='pl_onehot_feature')
encoder = OneHotEncoder(dropLast=False, inputCol='pl_onehot_feature', outputCol='pl_onehot_value')
pipeline = Pipeline(stages=[stringindexer, encoder])
pipeline_fit = pipeline.fit(user_profile_df)
user_profile_df2 = pipeline_fit.transform(user_profile_df)
# pl_onehot_value列的值为稀疏向量,存储热独编码的结果
user_profile_df2.printSchema()
user_profile_df2.show()
VectorAssembler
将多个数值列按顺序汇总成一个向量列。from pyspark.ml.feature import VectorAssembler
feature_df = VectorAssembler().setInputCols(["age_level", "pl_onehot_value", "nucl_onehot_value"]).setOutputCol("features").transform(user_profile_df3)
feature_df.show()
根据广告点击样本数据集(raw_sample)、广告基本特征数据集(ad_feature)、用户基本信息数据集(user_profile)构建出了一个完整的样本数据集,并按日期划分为了训练集(前七天)和测试集(最后一天),利用逻辑回归进行训练。
Dataframe数据合并:pyspark.sql.DataFrame.join
# raw_sample_df和ad_feature_df合并条件
condition = [raw_sample_df.adgroupId==ad_feature_df.adgroupId]
_ = raw_sample_df.join(ad_feature_df, condition, 'outer')
# _和user_profile_df合并条件
condition2 = [_.userId==user_profile_df.userId]
datasets = _.join(user_profile_df, condition2, "outer")
# 查看datasets的结构
datasets.printSchema()
# 查看datasets条目数
print(datasets.count())
按probability升序排列数据,probability表示预测结果的概率 如果预测值是0,其概率是0.9248,那么反之可推出1的可能性就是1-0.9248=0.0752,即点击概率约为7.52% 因为前面提到广告的点击率一般都比较低,所以预测值通常都是0,因此通常需要反减得出点击的概率
改进:进一步提升训练精确度,将类别特征转为多维特征,提高特征空间的维度, 类别性特征都可以考虑进行热独编码,将单一变量变为多变量,相当于增加了相关特征的数量
这里主要是利用我们前面训练的ALS模型进行协同过滤召回
如果是离线,为每个用户召回排序的结果存在redis中,线上服务需求推荐时,在redis中调取出来; 优点:操作简单 缺点:不重新算的话 数据库中数据不变,实时性不好
如果是在线的话,获取到用户id,到数据库中找到用户特征,找到所有商品的特征,将用户特征和商品特征送入逻辑回归模型中计算点击率,做排序 若用户对于推荐的某物品 累计几次都没看 则将此物品从召回集中删除,实时的影响到召回结果 若用户近期对于某些物品点击的多,也可以实时的更新用户特征(例如消费档次。。。
CTR预测模型 + 特征 ==> 预测结果 ==> TOP-N列表
1.基于用户的:为用户推荐和他兴趣相似的其他用户喜欢的物品 (1)首先根据用户对物品的打分情况,计算用户与其他用户的相似程度,找出最相似的n个用户 (2)根据这n个用户对此物品的评分情况以及用户相似性程度可以得出用户对物品的评分。如果评分较高,则推荐。 2.基于物品的:为用户推荐和他之前喜欢的物品相似的物品 (1)计算物品之间的相似度。 (2)根据物品的相似度和用户的历史行为给用户生成推荐列表。 3.计算两个向量之间的相似程度 (1)杰卡德相似系数:两个集合的交集占并集的比例 (2)余弦相似度:向量内积/向量2范数乘积 (3)皮尔逊相关系数:减平均值 4.应用场景 UserCF:适用于用户少、物品多、时效性较强的场合(新闻推荐) ItemCF: 适用于物品少、用户多、用户兴趣固定的场合。 5.缺陷: (1)泛化能力弱,热门物品具有很强的头部效应,容易跟大量物品产生相似,而尾部物品由于特征向量稀疏,导致很少被推荐;【矩阵分解技术,在协同过滤共现矩阵的基础上,使用更稠密的隐向量表示用户和物品,挖掘用户和物品的隐含兴趣和隐含特征,弥补协同过滤模型处理稀疏矩阵能力不足的问题。】 (2)仅利用了用户与物品的交互信息,没有利用到物品本身和用户本身的属性【以逻辑回归模型为核心的推荐模型,引用了更多的特征】
优点:由于隐向量的存在,使得任意的用户和物品之间都可以得到预测分值,而求解隐向量的过程其实是对评分矩阵进行全局拟合的过程,这个过程中考虑了所有的用户和评分,因此隐向量是利用全局信息生成的,有更强的泛化能力。 缺点:只用到了评分矩阵,没有考虑到用户特征、物品特征和上下文特征。【逻辑回归模型以及因子分解机模型可以解决。】
项目简介:利用移动设备的内嵌传感器采集用户的步态信息,对当前用户进行步态认证。
坐标系转换 用户坐标系:X轴指向上,Y轴为运动方向,Z轴为横向。 X轴:重力过滤 Y轴:PCA降维,找到方差最大的方向 Z轴:与X轴和Y轴垂直
LSTM:遗忘门、输入门、输出门、还包括细胞状态
GRU: 更新门(类似于LSTM的遗忘门和输入门)、重置门(控制需要保留多少之前的记忆),去除了细胞状态,使用隐藏状态进行信息的传递。
GRU z:更新门 r:重置门
(2)RMSprop: 对AdaGrad的改进,按照衰减系数累积历史的梯度平方值
(3)Adam: RMSprop(过去梯度平方的指数衰减平均值)+动量项(过去梯度的指数衰减平均值)
动量项:可以使得梯度方向不变的维度上速度变快,梯度方向有所改变的维度上更新速度变慢,这样可以加快收敛并减小振荡。
回归模型: 1 线性回归:自变量和因变量必须满足线性关系 2 套索回归:线性回归+L1正则,有助于特征选择 3 岭回归:线性回归+L2正则
LR 逻辑回归 分类 ①原理:假设数据服从伯努利分布(抛硬币),在线性回归的基础上加了一个sigmoid函数(非线性映射),通过极大似然函数的方法,运用梯度下降求解参数,达到将数据二分类的目的。 ②优点:简单、占内存小、便于并行。
缺点:需要手动交叉特征;处理非线性问题麻烦,需离散化。 【为什么要特征交叉,特征切分:举例辛普森悖论:在某个条件下的两组数据,分别讨论时都会满足某种性质,可是一旦合并考虑,却可能导致相反的结论。】 为什么不用平方损失函数: ①若用,会发现梯度的更新速度和sigmoid函数本身的梯度相关,而sigmoid函数在它定义域内的梯度都不大于0.25,训练会非常慢 ②会使得损失函数不是凸优化的。
似然函数:
损失函数:
梯度更新:
决策树
支持向量机
集成学习 ⽅差反映的是模型每⼀次输出结果与模型输出期望之间的误差,即模型的稳定性。
随机森林
GBDT 原理:只能用回归树。每一颗树学的是之前所有树结论和的残差,用损失函数的负梯度来拟合本轮损失的近似值。无论是分类问题还是回归问题,都可通过其损失函数的负梯度拟合,区别仅在于损失函数不同导致的负梯度不同。
缺点:由于弱学习器之间的依赖关系,难以并行训练数据。
XGBoost 相较于GBDT的优点:
LightGBM 相较于xgboost
Adaboost
缺点:对异常样本敏感,异常样本在迭代中可能会获得较高的权重,影响最终学习器的预测准确性。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/100172.html原文链接: