// Spark context for the local collaborative-filtering demo.
val conf = new SparkConf().setMaster("local").setAppName("CF-user")
val sc = new SparkContext(conf)

// MovieLens 100k ratings file: tab-separated "user \t movie \t rating \t timestamp".
val rawData = sc.textFile("E:\\mllib\\ml-100k\\ml-100k\\u.data", 1)
// Keep only (user, movie, rating); the timestamp is not needed.
val rawRatings = rawData.map { _.split("\t").take(3) }

// RDD of Rating records.
// Fix: the original `caseArray(user,movie,rating)=>` is not valid Scala —
// the pattern must be `case Array(...)` (space between `case` and the extractor).
val ratings = rawRatings.map { case Array(user, movie, rating) =>
  Rating(user.toInt, movie.toInt, rating.toDouble)
}
输出: Rating(196,242,3.0) —— 即由用户ID、电影ID、评分构成的 Rating 对象
训练推荐模型
1 2
// Train an ALS model: rank = 50 latent factors, 10 iterations, lambda = 0.01.
// Returns a MatrixFactorizationModel; the learned user and item factors are
// stored as (id, factor-array) RDDs in model.userFeatures / model.productFeatures.
val model = ALS.train(ratings, 50, 10, 0.01)
使用推荐模型
用户推荐
指定给用户推荐物品,通过模型求出用户可能喜好程度最高的前K个商品
1 2
// Predicted rating of user 789 for movie 123.
val predictedRating = model.predict(789, 123)
生成电影推荐
1 2 3 4 5
// Recommend the top K (= 10) items for user 789.
val userId = 789
val K = 10
val topKrecs = model.recommendProducts(userId, K)
print(topKrecs.mkString("\t"))
检验推荐内容
1 2 3 4 5 6 7 8 9 10 11
// Build a movie-id -> title map from u.item (pipe-separated "id|title|...").
val movies = sc.textFile("E:\\mllib\\ml-100k\\ml-100k\\u.item", 1)
val titles = movies
  .map { line => line.split("\\|").take(2) }
  .map { array => (array(0).toInt, array(1)) }
  .collectAsMap()

// All ratings made by user 789.
val moviesForUser = ratings.keyBy { _.user }.lookup(789)
println(moviesForUser.size)
// Item-to-item similarity: compare item 567's latent-factor vector with all items.
val itemID = 567
val itemFactor = model.productFeatures.lookup(itemID).head
val itemVector = new DoubleMatrix(itemFactor)
// Sanity check: cosine similarity of a vector with itself should be 1.0.
// (`cos` is the cosine-similarity helper defined elsewhere in this file.)
cos(itemVector, itemVector)

// Cosine similarity between every item's factor vector and item 567's vector.
val sims = model.productFeatures.map { case (id, factor) =>
  val factorVector = new DoubleMatrix(factor)
  val sim = cos(factorVector, itemVector)
  (id, sim)
}

// Top K items most similar to item 567.
// Fix: the original re-declared `val K = 10`, which clashes with the K defined
// earlier in this script; reuse the existing K. The `take(10)` after `top(K)`
// was redundant (top(K) already returns at most K elements) and is dropped.
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (_, similarity) => similarity })
println(sortedSims.mkString("\n"))
检查相似度
1 2 3 4 5 6
// Check the recommendations by title: take the top K+1 similar items, drop the
// first entry (the item itself, similarity 1.0), and show the next K titles.
// `titles` is the id -> title map built earlier from u.item — the original
// illegally re-declared the same `movies`/`titles` vals here, so the
// duplicates are removed.
val sortedSim = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (_, similarity) => similarity })
// Fix: slice(1, K + 1) generalizes the hard-coded slice(1, 11), and printing the
// joined string directly replaces the original's char-by-char
// `.mkString("\n").foreach { print }` (same output, one call).
print(sortedSim.slice(1, K + 1).map { case (id, sim) => (titles(id), sim) }.mkString("\n"))
评估函数
均方差
1 2 3 4 5 6 7 8 9 10 11 12
// --- Mean Squared Error evaluation ---
// Fix: every `caseRating(...)` below was invalid Scala; the pattern must be
// `case Rating(...)` (space between `case` and the extractor).

// (user, product) pairs for every known rating.
val userProducts = ratings.map { case Rating(user, product, rating) => (user, product) }

// Model predictions keyed by (user, product).
val predictions = model.predict(userProducts).map {
  case Rating(user, product, rating) => ((user, product), rating)
}

// Join actual ratings with predictions: ((user, product), (actual, predicted)).
val ratingAndprodict = ratings.map {
  case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)

import org.apache.spark.mllib.evaluation.RegressionMetrics

// RegressionMetrics expects an RDD of (prediction, observation) pairs.
val predictANDtrue = ratingAndprodict.map {
  case ((user, product), (actual, predicted)) => (predicted, actual)
}
val regressionMetrics = new RegressionMetrics(predictANDtrue)
// --- Ranking evaluation: Mean Average Precision over all users ---

// Collect all item factor vectors into one matrix (rows = items, cols = rank)
// and broadcast it so every executor can score items locally.
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
val imBroadcast = sc.broadcast(itemMatrix)

// For each user: score every item (item matrix x user vector) and rank item ids
// by descending score.
val allRecs = model.userFeatures.map { case (id, array) =>
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  // +1 converts the 0-based matrix row index back to a 1-based movie id.
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  // Fix: the original emitted `(userId, ...)`, capturing the script-level
  // userId (789) — every user's recommendations got the same key, breaking the
  // join below. The key must be this user's own `id`.
  (id, recommendedIds)
}

// Actual (user, product) pairs grouped by user.
// Fix: `caseRating(...)` -> `case Rating(...)` (original did not compile).
val userMovies = ratings.map { case Rating(user, product, rating) => (user, product) }.groupBy(_._1)

// (predicted item ids, actual item ids) per user, as RankingMetrics expects.
val predictedANDTrueForRanking = allRecs.join(userMovies).map {
  case (user, (predicted, actualWithIds)) =>
    val actual = actualWithIds.map(_._2)
    (predicted.toArray, actual.toArray)
}

import org.apache.spark.mllib.evaluation.RankingMetrics
val rankingMetrics = new RankingMetrics(predictedANDTrueForRanking)
println(rankingMetrics.meanAveragePrecision)