第04课:基于内容的推荐(工程示例)

在第03课中了解了基于内容推荐的算法逻辑,以及我们的示例计算逻辑之后,可以直接通过工程示例来进一步学习。

工程逻辑

Spark 2.0 之后,不用再构建 Sparkcontext 了,以创建一个复合多功能的 SparkSession 替代,可以正常的从 HDFS 读取文件,也可以从 Hive 中获取 DataFrame 等。

val sparkSession = SparkSession
  .builder()
  .appName("base-content-Recommand") //spark任务名称
  .enableHiveSupport()
  .getOrCreate()

那三个表可以先 load 到 Hive 中,然后 Spark 直接从 Hive 中读取,形成 DataFrame。

//从hive中,获取rating评分数据集,最终形成如下格式数据(movie,avg_rate)
val movieAvgRate = sparkSession.sql("select movieid,round(avg(rate),1) as avg_rate  from tx.tx_ratings group by movieid").rdd.map{
  f=>
    (f.get(0),f.get(1))
}
//获取电影的基本属性数据,包括电影id,名称,以及genre类别
val moviesData = sparkSession.sql("select movieid,title,genre from tx.tx_movies").rdd
//获取电影tags数据,这里取到所有的电影tag
val tagsData = sparkSession.sql("select movieid,tag from tx.tx_tags").rdd

先对 tag 进行处理,很多 tag 其实说的是同一个东西,我们需要进行一定程度上的合并,这样才能让 tag 更加的合理(读到这里可能有读者有意见了,只是一个实验案例而已,搞这么复杂),举个简单例子,blood、bloods、bloody 其实都是想说这个电影很血腥暴力,但是不同的人使用的词不同的(这点大伙儿可以自由查看实验数据),所以我们需要进行一定程度上的合并。

val tagsStandardizeTmp = tagsStandardize.collect()
val tagsSimi = tagsStandardize.map{
  f=>
    var retTag = f._2
    if (f._2.toString.split(" ").size == 1) {
      var simiTmp = ""
      val tagsTmpStand = tagsStandardizeTmp
                    .filter(_._2.toString.split(" ").size != 1 )
                    .filter(f._2.toString.size < _._2.toString.size)
                    .sortBy(_._2.toString.size)
      var x = 0
    val loop = new Breaks
      tagsTmpStand.map{
        tagTmp=>
          val flag = getEditSize(f._2.toString,tagTmp._2.toString)
          if (flag == 1){
            retTag = tagTmp._2
            loop.break()
          }
      }
      ((f._1,retTag),1)
    } else {
      ((f._1,f._2),1)
    }
}

其中,getEditSize 是计算两个词的编辑距离,当编辑距离在一定阈值的时候,进行两个词的合并,具体逻辑见代码。

def getEditSize(str1:String,str2:String): Int ={
  if (str2.size > str1.size){
    0
  } else {
    //计数器
    var count = 0
    val loop = new Breaks
    //以较短的str2为中心,进行遍历,并逐个比较字符
    val lengthStr2 = str2.getBytes().length
    var i = 0
    for ( i <- 1 to lengthStr2 ){
      if (str2.getBytes()(i) == str1.getBytes()(i)) {
        //逐个匹配字节,相等则计数器+1
        count += 1
      } else {
        //一旦出现前缀不一致则中断循环,开始计算重叠度
        loop.break()
      }
    }
    //计算重叠度,当前缀重叠度大于等于2/7时,进行字符串合并,从长的往短的合并
    if (count.asInstanceOf[Double]/str1.getBytes().size.asInstanceOf[Double] >= (1-0.286)){
      1
    }else{
      0
    }
  }
}

继续对 tag 进行处理,统计 tag 频度,取 TopN 个作为电影对应的 tag 属性。

val movieTag = tagsSimi.reduceByKey(_+_).groupBy(k=>k._1._1).map{
  f=>
    (f._1,f._2.map{
      ff=>
        (ff._1._2,ff._2)
    }.toList.sortBy(_._2).reverse.take(10).toMap)
}

接下来处理年龄、年份和名称,这个会简单点儿,进行分词处理的话,怎么简单怎么来了,直接使用第三方的 HanLP 进行关键词抽取作为分词结果,直接屏蔽了停用词。

val moviesGenresTitleYear = moviesData.map{
  f=>
    val movieid = f.get(0)
    val title = f.get(1)
    val genres = f.get(2).toString.split("|").toList.take(10)
    val titleWorlds = HanLP.extractKeyword(title.toString, 10).toList
    val year = movieYearRegex.movieYearReg(title.toString)
    (movieid,(genres,titleWorlds,year))
}

取年份的正则函数如下,是个 Java 写的精通工具类(Scala 和 Java 混写,简直无比美妙)。

package utils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Desc: 抽取年份公式
 */
public class movieYearRegex {
    private  static String moduleType = ".* \\(([1-9][0-9][0-9][0-9])\\).*";
    public static void main(String[] args){
        System.out.println(movieYearReg("GoldenEye (1995)"));
    }
    public static int movieYearReg(String str){
        int retYear = 1994;
        Pattern patternType = Pattern.compile(moduleType);
        Matcher matcherType = patternType.matcher(str);
        while (matcherType.find()) {
            retYear = Integer.parseInt(matcherType.group(1));
        }
        return retYear;
    }
}

通过 join 进行数据合并,生成一个以电影 id 为核心的属性集合。

val movieContent = movieTag.join(movieAvgRate).join(moviesGenresTitleYear).map{
  f=>
    //(movie,tagList,titleList,year,genreList,rate)
    (f._1,f._2._1._1,f._2._2._2,f._2._2._3,f._2._2._1,f._2._1._2)
}

相似计算开始之前,还记得我们之前说的吗,可以进行候选集阉割,我们先根据一些规则裁剪一下候选集。

val movieContentBase = movieContent.map{
  f=>
    val currentMoiveId = f._1
    val currentTagList = f._2  //[(tag,score)]
    val currentTitleWorldList = f._3
    val currentYear = f._4
    val currentGenreList = f._5
    val currentRate = f._6.asInstanceOf[java.math.BigDecimal].doubleValue()
    val recommandMovies = movieConetentTmp.map{
      ff=>
        val tagSimi = getCosTags(currentTagList,ff._2)
        val titleSimi = getCosList(currentTitleWorldList,ff._3)
        val genreSimi = getCosList(currentGenreList,ff._5)
        val yearSimi = getYearSimi(currentYear,ff._4)
        val rateSimi = getRateSimi(ff._6.asInstanceOf[java.math.BigDecimal].doubleValue())
        val score = 0.4*genreSimi + 0.25*tagSimi + 0.1*yearSimi + 0.05*titleSimi + 0.2*rateSimi
        (ff._1,score)
    }.toList.sortBy(k=>k._2).reverse.take(20)
    (currentMoiveId,recommandMovies)
}.flatMap(f=>f._2.map(k=>(f._1,k._1,k._2))).map(f=>Row(f._1,f._2,f._3))

最后,将结果存入 Hive 中,Hive 中提前建好结果表。

//我们先进行DataFrame格式化申明
val schemaString2 = "movieid movieid_recommand score"
val schemaContentBase = StructType(schemaString2.split(" ")
  .map(fieldName=>StructField(fieldName,if (fieldName.equals("score")) DoubleType else  StringType,true)))
val movieContentBaseDataFrame = sparkSession.createDataFrame(movieContentBase,schemaContentBase)
//将结果存入hive,需要先进行临时表创建
val userTagTmpTableName = "mite_content_base_tmp"
val userTagTableName = "mite8.mite_content_base_reco"
movieContentBaseDataFrame.registerTempTable(userTagTmpTableName)
sparkSession.sql("insert into table " + userTagTableName + " select * from " + userTagTmpTableName)

到这里,基本大的代码逻辑就完了,可能还有一些边边角角的代码遗漏了,但不妨碍主干了。

在下一篇中,我们将进入到下一个常见推荐算法的了解,同样是“最最最”系列,最最最经典的推荐算法,基于协同过滤的推荐算法。

学习路径同样先进行理论了解,然后进行示例的计算逻辑解析,最后再配合工程代码来进一步学习,下一篇见。

(全文完)

(转载本站文章请注明作者和出处 第04课:基于内容的推荐(工程示例)