第08课:协同推荐(工程示例)

接上个章节,由于本系列不是专门针对算法进行详解,而是针对整个推荐系统进行知识体系构建,所以,基于协同的推荐,我们这里只做基于模型的推荐的工程示例讲解。

截止 Spark 2.X 系列,Spark 的 MlLib 只实现了基于矩阵分解的协同(也就是经典的基于 ALS 协同过滤),没有实现更常规的基于物品或者基于用户的协同过滤。

但从上面的原理我们知道,其实基于物品基于用户的协同核心就在于构建基础向量矩阵以及计算相似的两个方面。由于 MlLib 实现了算法模型,所以从敲代码的维度上来说,代码量反而会远远低于基于用户、基于物品的协同,甚至会少于之前的基于物品相似或者基于用户画像的推荐了。

顺带说一句,基于 ALS 的推荐代码,其实网上很容易找,算法 MlLib 中的经典算法了,很多人都实现了,不过万变不离其宗。

数据源说明

老规矩,如果依然是上面两个算法工程示例中所用到的数据源,基于上个章节对协同的原理认知,我们这里只用数据源中的评分数据,共 100 万条电影评分记录来做协同过滤的基础数据。

工程示例

先 Hive 数据表中,将 rating 评分数据取出来(当然,如果你的机子跑不动,就 limit 一下简单取些数,以跑通模型为目的)。

val ratingDataOrc = sparkSession.sql("select userid,movieid,rate,timestame  from mite8.mite_ratings limit 50000")

将取出的评分数据,以时间构建 Key-value 键值对,形成(Int,Ratings)格式的数据,其实这是一个中间处理过程,方便后续的数据输入。

val ratings = ratingDataOrc.rdd.map(f =>
  (java.lang.Long.parseLong(f.get(3).toString)%10,
  Rating(java.lang.Integer.parseInt(f.get(0).toString),
    java.lang.Integer.parseInt(f.get(1).toString),
    f.get(2).asInstanceOf[java.math.BigDecimal].doubleValue())))

这里,鉴于计算能力,我就不进行全局用户的候选集推荐计算了,只拿 ID=1 的用户当成实验,获取 ID=1 的用户候选推荐列表,先取该用户的行为数据。

val personalRatingsData = ratingDataOrc.where("userid = 1").rdd.map{
  f=>
    Rating(java.lang.Integer.parseInt(f.get(0).toString),
      java.lang.Integer.parseInt(f.get(1).toString),
      f.get(2).asInstanceOf[java.math.BigDecimal].doubleValue())
}

基于上上面的 K-V 中间数据,我们以取余的方式,将数据分成 6:2:2 三个比例,分别进行模型训练、数据校验及结果测试。

val training = ratings.filter(x => x._1 < 6).values
  .union(personalRatingsData).repartition(numPartions).persist()
val validation = ratings.filter(x => x._1 >=6 && x._1 < 8).values
  .repartition(numPartions).persist()
val test = ratings.filter(x => x._1 > 8).values.persist()

ALS 的推荐效果评估,一般我们是以均方根差来离线衡量推荐的准确度,所以,这里涉及到了 ALS 参数调优的问题,我们通过数据来最终确定参数,并确定最终的 Model,分别取 ranks、lambdas、numIters 作为调优对象。

var count = 0
//进行三层循环遍历,找最佳的Rmse值,对应的model
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
  val model = ALS.train(training, rank, numIter, lambda)
  //计算均根方差值,传入的是model以及校验数据
  val validationRmse = computeRmse(model, validation, numValidation)
  count += 1
  //选取最佳值,均方根误差越小越OK
  if (validationRmse < bestValidationRmse) {
    bestModel = Some(model)
    bestValidationRmse = validationRmse
    bestLambda = lambda
    bestRank = rank
    bestNumIter = numIter
  }
}

基于上面最终选择的参数,输出 Model,我们基于这个模型,去做最后的推荐,注意需要去除 ID=1 的用户已经观看过的电影。

//推荐前十部最感兴趣的电影,注意需要剔除该用户(userid=1)已经评分的电影,即去重
val myRatedMovieIds = personalRatingsData.map(f=>f.product).collect().toSet
val candidates = movies.keys.filter(!myRatedMovieIds.contains(_))
//为用户1推荐十部movies,我们只做用户ID=1的推荐
val candRDD: RDD[(Int, Int)] = candidates.map((1, _))
val recommendations:RDD[Rating] = bestModel.get.predict(candRDD)
val recommendations_ = recommendations.collect().sortBy(-_.rating).take(20)

存储推荐的结果,主要 Row 需要先进行格式化。

//结果存储用户1的推荐结果
val alsBaseReDataFrame = sparkSession.sparkContext
  .parallelize(recommendations_.map(f=> (f.user,f.product,f.rating)))
  .map(f=>Row(f._1,f._2,f._3))
//DataFrame格式化申明
val schemaString = "userid movieid score"
val schemaAlsBase = StructType(schemaString.split(" ")
  .map(fieldName=>StructField(fieldName,if (fieldName.equals("score")) DoubleType else  IntegerType,true)))
val movieAlsBaseDataFrame = sparkSession.createDataFrame(alsBaseReDataFrame,schemaAlsBase)
//将结果存入hive
val itemBaseReTmpTableName = "mite_alsbasetmp"
val itemBaseReTableName = "mite8.mite_als_base_re"
movieAlsBaseDataFrame.registerTempTable(itemBaseReTmpTableName)
sparkSession.sql("insert into table " + itemBaseReTableName + " select * from " + itemBaseReTmpTableName)

最后再补上求均方根差的函数。

def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double = {
  //调用model的predict预测方法,把预测数据初始化model中,并且生成预测rating
  val predictions:RDD[Rating] = model.predict((data.map(x => (x.user, x.product))))
  val dataTmp = data.map(x => ((x.user, x.product), x.rating))
  //通过join操作,把相同user-product的value合并成一个(double,double)元组,前者为预测值,后者为实际值
  val predictionsAndRatings = predictions.map{
    x => ((x.user, x.product), x.rating)
  }.join(dataTmp).values
  //均方根误差能够很好的反应出测量的精密度,对于偏离过大或者过小的测量值较为敏感
  //计算过程为观测值与真实值偏差的平方,除于观测次数n,然后再取平方根
  //reduce方法,执行的是值累加操作
  math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce( _ + _ )/n)
}

补充

至此,整个代码逻辑就结束了,其实不难发现,被框架封装的算法,其实使用起来更加的简单,如果抛开校验以及优化模型的过程,总共代码都没有几行。

这里大家可能对为什么协同能够发现新物品,而基于用户兴趣的画像推荐不能,原则上说基于画像会将思维局限于画像兴趣的偏好内,但兴趣本身就会升级的,这是通过历史的单个用户的行为所不能推测的。

而基于协同不一样,他一方面考虑的用户的历史行为,另一方面他参考了该用户的周围协同的行为,而对于大部分人来说,共有的行为轨迹其实很多时候能够一定程度上体现用户的自我认知,以及认知升级的过程,这意味着物品之间的关联性本身就通过共有的用户行为天然关联,而协同就是要挖掘这种潜在的关联性,这无关物品之间的属性差异。

所以,从这个维度上说,协同是容易产生惊喜推荐的一种机制。

最后

结合我们学习的原理会发现,实际上诸如 ALS 这种基于模型的协同,实际上其可解释性是比较弱的,所以在实际的过程中,很多人更愿意使用基于用户,甚至是基于物品的协同推荐。

从工程实现的角度说,如果要使用基于用户或者基于物品的协同,其开发的工作量会远大于基于 ALS 的算法实现。

基于篇幅这里就不再针对协同过滤的其他算法进行工程示例讲解,并且由于整个文章课程体系还是以构建推荐系统体系为目的,所以,如果对于其他两个协同算法的工程示例感兴趣的读者,请联系我。

在第09课将进行其他的一些推荐算法的综合了解,最终我们会发现,实际上在整个体系中,算法层实际上是很灵活的,不用局限于说使用固定哪些算法,只要能达到我们合理推荐的目的,实际上都是可行的。

然后,我们将进入到推荐系统整体架构的维度上,对推荐系统整个系统架构进行认知。如果说推荐算法是整个推荐系统的心脏,那么整个推荐架构就是整个推荐系统的骨骼,同样重要。

(全文完)

(转载本站文章请注明作者和出处 第08课:协同推荐(工程示例)