首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >使用Elasticsearch、Spark构建推荐系统 #2:深入分析

使用Elasticsearch、Spark构建推荐系统 #2:深入分析

原创
作者头像
flavorfan
发布于 2022-04-08 07:24:41
发布于 2022-04-08 07:24:41
3.8K00
代码可运行
举报
文章被收录于专栏:范传康的专栏范传康的专栏
运行总次数:0
代码可运行

Elasticsearch-spark-based recommender系统方案的两个关键步骤:

  1. ALS算法将user-item的交互历史建模构建相关共享隐变量空间(user matrix 和item matirx);
  2. 基于Elasticsearch将推荐问题转换为搜索问题。

1. 训练ALS模型

1) 数据预处理

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
ratings_from_es = spark.read.format("es").load("ratings")
ratings_from_es.show(5)

数据从es中读取,实际可以从其他源处理(clickhouse,csv等),另外可以分割为train、valid、test数据集

2)训练ALS模型

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", regParam=0.02, rank=VECTOR_DIM, seed=54)
model = als.fit(ratings_from_es)
model.userFactors.show(5)
model.itemFactors.show(5)

3)将ALS模型的user和itemfactor vector存储到Elasticsearch

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
from pyspark.sql.functions import lit, current_timestamp, unix_timestamp
ver = model.uid
ts = unix_timestamp(current_timestamp())
movie_vectors = model.itemFactors.select("id",\
                                         col("features").alias("model_factor"),\
                                         lit(ver).alias("model_version"),\
                                         ts.alias("model_timestamp"))
movie_vectors.show(5)
user_vectors = model.userFactors.select("id",\
                                        col("features").alias("model_factor"),\
                                        lit(ver).alias("model_version"),\
                                        ts.alias("model_timestamp"))
user_vectors.show(5)

movie_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "update") \
    .save("movies", mode="append")
user_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "index") \
    .save("users", mode="append")   

2. 使用Elasticsearch进行推荐:Script score query

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def vector_query(query_vec, vector_field, q="*", cosine=False):   
    if cosine:
        score_fn = "doc['{v}'].size() == 0 ? 0 : cosineSimilarity(params.vector, '{v}') + 1.0"
    else:
        score_fn = "doc['{v}'].size() == 0 ? 0 : sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"
       
    score_fn = score_fn.format(v=vector_field, fn=score_fn)    
    return {
    "query": {
        "script_score": {
            "query" : { 
                "query_string": {
                    "query": q
                }
            },
            "script": {
                "source": score_fn,
                "params": {
                    "vector": query_vec
                }
            }
        }
    }
}

def get_similar(the_id, q="*", num=10, index="movies", vector_field='model_factor'):
    response = es.get(index=index, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=True)
        results = es.search(index=index, body=q)
        hits = results['hits']['hits']
        return src, hits[1:num+1]    
    
def get_user_recs(the_id, q="*", num=10, users="users", movies="movies", vector_field='model_factor'):
    response = es.get(index=users, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=False)
        results = es.search(index=movies, body=q)
        hits = results['hits']['hits']
        return src, hits[:num]

def get_movies_for_user(the_id, num=10, ratings="ratings", movies="movies"):
    response = es.search(index=ratings, q="userId:{}".format(the_id), size=num, sort=[{"rating":"desc"}])
    hits = response['hits']['hits']
    ids = [h['_source']['movieId'] for h in hits]
    movies = es.mget(body={"ids": ids}, index=movies, _source_includes=['tmdbId', 'title'])
    movies_hits = movies['docs']
    tmdbids = [h['_source'] for h in movies_hits]
    return tmdbids

通过Elasticsearch的script score query for vector functions从factor vector中生成推荐,具体通过vector_query进行封装,用cosine距离计算同种(user或者item)相似度,用prudoct点乘对user计算推荐物品。

3. 深入分析

1) 为什么不使用spark ml直接推荐?

其一,工程和学术做trade-off的结果,在model serving过程中对几百万个候选集逐一跑一遍模型的时间开销显然太大了,因此在通过Elasticsearch最近邻搜索的方法高效很多,复杂度nlogn vs logn。

其二,可以添加丰富灵活的query,直接对候选集进行多维度的过滤操作。比如:杭州地区(地点)20年代(年龄)用户喜欢的火锅店(品类)。

2) implicit vs explicit

显式反馈的目标函数

隐式反馈的目标函数

隐式反馈的数据场景远远多于显式反馈,spark.ml.recommender.ALS对两种都支持

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class pyspark.ml.recommendation.ALS(
  rank=10, 
  maxIter=10, 
  regParam=0.1, 
  numUserBlocks=10, 
  numItemBlocks=10, 
  implicitPrefs=False, 
  alpha=1.0, 
  userCol='user', 
  itemCol='item', 
  seed=None, 
  ratingCol='rating', 
  nonnegative=False, 
  checkpointInterval=10, 
  intermediateStorageLevel='MEMORY_AND_DISK', 
  finalStorageLevel='MEMORY_AND_DISK', 
  coldStartStrategy='nan')

关键参数的选择

3) 隐式反馈的评估 MPR, MRR

隐式反馈的评估基于召回的MPR(mean percent ranking)平均百分比排名。

另外一个评估指标是MRR(Mean Reciprocal Rank):

具体相关的计算pyspark代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
(
    predictions
    .withColumn('rank', row_number().over(Window.partitionBy('userId').orderBy(desc('prediction'))))
    .where(col('counts') > 0) # Notice: this excludes users with no actions at all
    .groupby('userId')
    .agg(
        count('*').alias('n'),
        sum(1 - col('prediction')).alias('sum_pred'),
        sum(col('rank') / n_genres).alias('sum_perc_rank'),
        min('rank').alias('min_rank')
    )
    .agg(
        (sum('sum_pred') / sum('n')).alias('avg 1-score'),
        (sum('sum_perc_rank') / sum('n')).alias('MPR'), # the lower the better
        mean(1 / col('min_rank')).alias('MRR')          # the higher the better
    )
    .withColumn('MPR*k', col('MPR') * n_genres)
    .withColumn('1/MRR', 1/col('MRR'))
).show()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【机器学习】从电影数据集到推荐系统
你们可能曾经花上几分钟甚至几个小时去选择一部电影单独看或者和家人一起看,不幸的是没有成功?你希望有人在这种时候替你做决定,这正是推荐系统的作用。
黄博的机器学习圈子
2021/07/07
3.4K1
【机器学习】从电影数据集到推荐系统
使用Spark MLlib给豆瓣用户推荐电影
问题导读: 1.常用的推荐算法有哪些? 2.推荐系统是什么样的流程? 3.从这个推荐系统我们能学到什么? 推荐算法就是利用用户的一些行为,通过一些数学算法,推测出用户可能喜欢的东西。 随着电子商务规模的不断扩大,商品数量和种类不断增长,用户对于检索和推荐提出了更高的要求。由于不同用户在兴趣爱好、关注领域、个人经历等方面的不同,以满足不同用户的不同推荐需求为目的、不同人可以获得不同推荐为重要特征的个性化推荐系统应运而生。 推荐系统成为一个相对独立的研究方向一般被认为始自1994年明尼苏达大学GroupLen
用户1410343
2018/03/27
2.1K0
使用Spark MLlib给豆瓣用户推荐电影
【Spark Mllib】K-均值聚类——电影类型
代码实现中,首先需要引入必要的模块,设置模型参数: K(numClusters)、最大迭代次数(numIteration)和训练次数(numRuns)。然后,对电影的系数向量运行K-均值算法。最后,在用户相关因素的特征向量上训练K-均值模型:
小爷毛毛_卓寿杰
2019/02/13
1.4K0
【Spark Mllib】K-均值聚类——电影类型
pyspark 特征工程
曾经在15、16年那会儿使用Spark做机器学习,那时候pyspark并不成熟,做特征工程主要还是写scala。后来进入阿里工作,特征处理基本上使用PAI 可视化特征工程组件+ODPS SQL,复杂的话才会自己写python处理。最近重新学习了下pyspark,笔记下如何使用pyspark做特征工程。
小爷毛毛_卓寿杰
2021/03/20
2.3K0
利用Spark MLIB实现电影推荐
源码及数据集:https://github.com/luo948521848/BigData
Java架构师必看
2021/07/22
1.2K0
Elasticsearch 使用不同分词器导致搜索排名的问题
seth-shi
2023/12/18
2500
推荐系统 —— 实践 Spark ALS算法
这里就不啰嗦了,直接贴代码,然后拿来运行就可以看到结果了,不过请注意该代码是基于 movelens 数据,所以想要运行你还得去下载一下这个数据,百度一下就有了噢 ALS算法也是spark提供的唯一的协同过滤推荐算法,其基本原理类似与 LFM,基于矩阵分解的隐因子算法。嗯,纯属过一把推经瘾。。。哈哈 package com.text import org.apache.spark.ml.recommendation import org.apache.spark.{SparkConf, SparkCon
solve
2019/10/30
1.5K0
看完这篇还不会 Elasticsearch 搜索,那我就哭了!
本文主要介绍 ElasticSearch 搜索相关的知识,首先会介绍下 URI Search 和 Request Body Search,同时也会学习什么是搜索的相关性,如何衡量相关性。
武培轩
2020/03/13
9150
AI大模型全栈工程师课程笔记 - RAG 检索增强生成
课程学习自 知乎知学堂 https://www.zhihu.com/education/learning
Michael阿明
2023/12/09
1.7K0
AI大模型全栈工程师课程笔记 - RAG 检索增强生成
Spark机器学习实战 (十二) - 推荐系统实战
将结合前述知识进行综合实战,以达到所学即所用。在推荐系统项目中,讲解了推荐系统基本原理以及实现推荐系统的架构思路,有其他相关研发经验基础的同学可以结合以往的经验,实现自己的推荐系统。
JavaEdge
2019/04/21
3.7K0
Spark机器学习实战 (十二) - 推荐系统实战
ElasticSearch 6.x 学习笔记:19.搜索高亮
参照官方文档 https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-request-highlighting.html
程裕强
2022/05/06
5910
spark杂记:movie recommendation using ALS
版权声明:本文为博主原创文章,未经博主允许不得转载。有问题可以加微信:lp9628(注明CSDN)。 https://blog.csdn.net/u014365862/article/details/88982729
MachineLP
2019/05/26
1K0
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
  用户可视化:主要负责实现和用户的交互以及业务数据的展示, 主体采用 AngularJS2 进行实现,部署在 Apache 服务上。(或者可以部署在 Nginx 上)   综合业务服务:主要实现 JavaEE 层面整体的业务逻辑,通过 Spring 进行构建,对接业务需求。部署在 Tomcat 上。 【数据存储部分】   业务数据库:项目采用广泛应用的文档数据库 MongDB 作为主数据库,主要负责平台业务逻辑数据的存储。   搜索服务器:项目采用 ElasticSearch 作为模糊检索服务器,通过利用 ES 强大的匹配查询能力实现基于内容的推荐服务。   缓存数据库:项目采用 Redis 作为缓存数据库,主要用来支撑实时推荐系统部分对于数据的高速获取需求。 【离线推荐部分】   离线统计服务:批处理统计性业务采用 Spark Core + Spark SQL 进行实现,实现对指标类数据的统计任务。   离线推荐服务:离线推荐业务采用 Spark Core + Spark MLlib 进行实现,采用 ALS 算法进行实现。   工作调度服务:对于离线推荐部分需要以一定的时间频率对算法进行调度,采用 Azkaban 进行任务的调度。 【实时推荐部分】   日志采集服务:通过利用 Flume-ng 对业务平台中用户对于电影的一次评分行为进行采集,实时发送到 Kafka 集群。   消息缓冲服务:项目采用 Kafka 作为流式数据的缓存组件,接受来自 Flume 的数据采集请求。并将数据推送到项目的实时推荐系统部分。   实时推荐服务:项目采用 Spark Streaming 作为实时推荐系统,通过接收 Kafka 中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结果合并更新到 MongoDB 数据库。
黑泽君
2019/05/23
5.5K0
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
「最佳实践」腾讯云 Elasticsearch 8.13.3 向量混合检索
本文描述问题及解决方法同样适用于 腾讯云 Elasticsearch Service(ES)。
岳涛
2025/03/08
6850
「最佳实践」腾讯云 Elasticsearch 8.13.3 向量混合检索
实现基于内部文档的ChatBot
大群口嗨一时爽,不得不为公司HR做了一个基于内部文档的ChatBot。大概花了2周的个人业余时间,算起来有2个工作日。Open AI ChatGPT Key缺乏、网络延迟以及Open LLM性能不佳的问题,索性不使用LLM进行搜索关联文档后输出优化。
flavorfan
2023/07/21
1K1
实现基于内部文档的ChatBot
基于大数据框架的协同过滤算法餐饮推荐系统【Update2023-11-05】
首先这位作者的推荐系统给了我很大的构思启发。 Github地址:https://github.com/share23/Food_Recommender 他的系统采用实时大数据技术组件,具体有Spark Streaming,HDFS分布式存储,Hbase存储计算,消息队列采用Kafka,Flume,其中的餐饮数据是用python生成,加上linux的contab模拟流式数据。推荐模块使用ALS算法加评分。 他的系统架构和技术组件选用给了我很大帮助,包括系统业务逻辑代码,让我顺利完成了我的毕业设计。
火之高兴
2024/07/25
4260
Elasticsearch 命令操作小全
dynamic 可以分为动态映射(dynamic mapping)和静态(显式)映射(explicit mapping)和精确(严格)映射(strict mappings),具体由dynamic属性控制。
用户2825413
2020/04/15
6390
Elasticsearch 命令操作小全
使用Elasticsearch、Spark构建推荐系统 #1:概述及环境构建
推荐系统是机器学习当前最著名、最广泛使用,且已经证明价值的落地案例。尽管有许多资源可用作训练推荐模型的基础,但解释如何实际部署这些模型来创建大型推荐系统的资源仍然相对较少。
flavorfan
2022/03/18
3.7K0
使用Elasticsearch、Spark构建推荐系统 #1:概述及环境构建
Spark机器学习实战 (十二) - 推荐系统实战
将结合前述知识进行综合实战,以达到所学即所用。在推荐系统项目中,讲解了推荐系统基本原理以及实现推荐系统的架构思路,有其他相关研发经验基础的同学可以结合以往的经验,实现自己的推荐系统。
JavaEdge
2022/11/30
1.9K0
Spark机器学习实战 (十二) - 推荐系统实战
ElasticSearch基础入门学习笔记
本笔记的内容主要是在从0开始学习ElasticSearch中,按照官方文档以及自己的一些测试的过程。
Ryan-Miao
2020/02/24
6060
推荐阅读
相关推荐
【机器学习】从电影数据集到推荐系统
更多 >
交个朋友
加入[数据] 腾讯云技术交流站
获取数据实战干货 共享技术经验心得
加入数据技术工作实战群
获取实战干货 交流技术经验
加入[数据库] 腾讯云官方技术交流站
数据库问题秒解答 分享实践经验
换一批
领券
一站式MCP教程库,解锁AI应用新玩法
涵盖代码开发、场景应用、自动测试全流程,助你从零构建专属AI助手
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档