Spark-推荐引擎

推荐引擎

基于内容的过滤

利用物品的内容或是属性信息以及某些相似度定义,来求出与该物品类似的物品,这些属性通常是文本内容(标题,名称,标签,其他元数据)

协同过滤

利用大量已有的用户偏好来估计用户对其未接触过的物品的喜好程度。

矩阵分解

显式矩阵分解

当要处理的数据是由用户所提供的自身的偏好数据,包括物品的评级,赞,喜欢等用户对物品的评价
找到用户-物品矩阵近似的低阶矩阵,UI=Uk和I*k的因子矩阵运算。

隐式矩阵分解

用户对物品的偏好不会直接给出,而是隐含在用户和物品的交互中。
用户电影评级矩阵=P*C P为用户是否看过某些电影(二元偏好矩阵) C表示观看的次数(信心权重)

最小二乘法

ALS一种求解矩阵分解问题的最优化方法,是MLlIB唯一实现的求解方法。
迭代式的求解一系列最小二乘回归问题。

1
2
3
4
ALS.train(ratings, rank, iterations,lambda)
rank:对应的ALS模型的因子个数,低阶矩阵中隐含特征个数,一般越多越好,合理取值10200
iterations:运行的迭代次数 10次左右
lambda:控制正则化过程,值越高,正则化越厉害

案例

显式评级数据提取有效特征

1
2
3
4
5
6
7
8
9
10
11
12
val conf=new SparkConf().setMaster("local").setAppName("CF-user")
val sc=new SparkContext(conf)
val rawData=sc.textFile("E:\\mllib\\ml-100k\\ml-100k\\u.data", 1)
val rawRatings=rawData.map { _.split("\t").take(3) }

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

//一个由rating模型构成的RDD
val ratings=rawRatings.map { case Array(user,movie,rating) => Rating(user.toInt,movie.toInt,rating.toDouble) }

输出:Rating(196,242,3.0) 用户ID 电影ID 评分的Rating类

训练推荐模型

1
2
//返回一个MaxrixFactorizationModel对象 ,用户因子和物品因子保存在(id,factor)RDD中,分别为model.userFeatures,model.productFeatures 
val model=ALS.train(ratings, 50, 10,0.01)

使用推荐模型

用户推荐

指定给用户推荐物品,通过模型求出用户可能喜好程度最高的前K个商品

1
2
//预测789用户对123电影的评分
val predictedRating=model.predict(789, 123)

生成电影推荐
1
2
3
4
5
//给789用户推荐前10个物品
val userId=789
val K=10
val topKrecs=model.recommendProducts(userId, K)
print(topKrecs.mkString("\t"))
检验推荐内容
1
2
3
4
5
6
7
8
9
10
11
val movies=sc.textFile("E:\\mllib\\ml-100k\\ml-100k\\u.item", 1)
val titles=movies.map { line => line.split("\\|").take(2) }.map { array => (array(0).toInt,array(1) )}.collectAsMap()

val moviesForUser=ratings.keyBy { _.user }.lookup(789)
println(moviesForUser.size)

moviesForUser.sortBy(-_.rating).take(10).map { rating => (titles(rating.product),rating.rating) }.foreach(println)

println("=======")
//对比
topKrecs.map { rating => (titles(rating.product),rating.rating) }.foreach(println)

物品推荐

给定一个物品,找到相似度最高的物品与之对应
常用方法:1.皮尔森相关系数 2.余弦相似度 3.杰卡德相似系数

生成相似电影
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.jblas.DoubleMatrix
//使用余弦相似度来测定相似度
val aMatrix=new DoubleMatrix(Array(1.0,2.0,3.0))

//定义函数计算俩个向量之间的余弦相似度
def cos(vec1:DoubleMatrix,vec2:DoubleMatrix):Double={
vec1.dot(vec2)/(vec1.norm2()*vec2.norm2())
}

val itemID=567
val itemFactor=model.productFeatures.lookup(itemID).head
val itemVector=new DoubleMatrix(itemFactor)
cos(itemVector,itemVector)

//求各个商品的相似度
val sims=model.productFeatures.map{ case (id,factor)=>
val factorVector=new DoubleMatrix(factor)
val sim=cos(factorVector,itemVector)
(id,sim)

}
//取出与物品567相似的前10个物品
val K=10
val sortedSims=sims.top(K)(Ordering.by[(Int,Double),Double]{
case(id,similarity)
=> similarity} )

println(sortedSims.take(10).mkString("\n"))
检查相似度
1
2
3
4
5
6

//检验推荐内容
val movies=sc.textFile("E:\\mllib\\ml-100k\\ml-100k\\u.item", 1)
val titles=movies.map { line => line.split("\\|").take(2) }.map { array => (array(0).toInt,array(1) )}.collectAsMap()
val sortedSim=sims.top(K+1)(Ordering.by[(Int,Double),Double] { case (id,similarity) => similarity })
sortedSim.slice(1, 11).map{case (id,sim)=>(titles(id),sim)}.mkString("\n").foreach { print }

评估函数

均方差
1
2
3
4
5
6
7
8
9
10
11
12
val userProducts=ratings.map { case Rating(user,product,rating) => (user,product) }
val predictions=model.predict(userProducts).map { case Rating(user,product,rating) => ((user,product),rating) }

val ratingAndprodict=ratings.map { case Rating(user,product,rating) => ((user,product),rating)}.join(predictions)
import org.apache.spark.mllib.evaluation.RegressionMetrics

val predictANDtrue=ratingAndprodict.map{case ((user,product),(actual,predicted))=>(predicted,actual)}
//需要一个键值对类型的RDD ,一个(预测值,实际值)的集合
val regressionMetrics=new RegressionMetrics(predictANDtrue)

println(regressionMetrics.meanSquaredError)
println(regressionMetrics.rootMeanSquaredError)
KMAP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//得到物品因子向量
val itemFactors=model.productFeatures.map{ case(id,factor)=>factor}.collect()
val itemMatrix=new DoubleMatrix(itemFactors)
val imBroadcast=sc.broadcast(itemMatrix)

//得到用户因子向量
val allRecs=model.userFeatures.map{case(id,array)=>
val userVector=new DoubleMatrix(array)
val scores=imBroadcast.value.mmul(userVector)//用户因子和电影因子矩阵乘积
val sortedWithid=scores.data.zipWithIndex.sortBy(-_._1)
val recommenedIds=sortedWithid.map(_._2+1).toSeq
(userId,recommenedIds) //用户ID ,电影ID
}
val userMovies=ratings.map { case Rating(user,product,rating) => (user,product) }.groupBy(_._1)
val predictedANDTrueForRanking=allRecs.join(userMovies).map{
case (userId,(predicted,actuallwithid))=>
val autual=actuallwithid.map(_._2)
(predicted.toArray,autual.toArray)

}
import org.apache.spark.mllib.evaluation.RankingMetrics
val rankingMetrics=new RankingMetrics(predictedANDTrueForRanking)
println(rankingMetrics.meanAveragePrecision)