Press "Enter" to skip to content

scala智能化菜品推荐建立推荐模型

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

 

§能力目标

 

 

    1. 能够以基于用户的协同过滤算法建模

 

    1. 能够对基于物品的协同过滤算法建模

 

    1. 能够对基于Spark ALS的协同过滤算法建模(拓展)

 

 

文章目录

1.以基于用户的协同过滤算法建模
3.寻找与目标用户最近邻的K个用户
4.通过这K个用户进行推荐
二、以基于物品的协同过滤算法建模
1.打包执行以上两个创建推荐模型的程序
三、以基于Spark ALS算法建模

提示:以下是本篇文章正文内容,下面案例可供参考

 

做此篇文章的项目请自行写完上一篇我的博客文章项目(链接放下面了!!)

 

Scala智能化菜品推荐数据预处理(点击这里)

 

一、推荐算法的选择

 

协同过滤算法,包括多种可实现的算法。

 

本例将选用3种不同的协同过滤算法,分别来建立推荐模型,最后进行综合评估。协同过滤算法包括:

(1) 基于用户的协同过滤算法。
(2) 基于物品的协同过滤算法。
(3) 基于Spark ALS的协同过滤算法。

训练集、验证集、测试集保存路径
trainDataPath: /data/MealRatings/trainRatings
validateDataPath:/data/MealRatings/validateRatings
testDataPath:/data/MealRatings/testRatings

 

1.以基于用户的协同过滤算法建模

 

基于用户的协同过滤,就是通过不同用户对物品的评分来评测用户之间的相似性,搜索目标用户的最近邻用户,然后根据最近邻用户对物品的评分向目标用户进行推荐。其具体实现过程可以描述如下。

 

2.计算相似度

 

用户之间的相似度通过每个用户对物品的评分向量计算得到。相似度的计算可以使用任何向量相似度计算公式,常用的相似度计算公式有Jaccard公式、余弦相似度公式、欧式距离公式等

 

3.寻找与目标用户最近邻的K个用户

 

在计算出各个用户之间的相似度后,可以找到所有与目标用户的相似度大于某一阈值的近邻用户(初步的粗略过滤),然后对这些用户按照相似度值进行排序,得到前K个近邻用户。

 

4.通过这K个用户进行推荐

 

在获得K个近邻用户后,怎幺推荐呢?这里的方式有多种。比如使用相似度和所有K个用户的物品对应加权进行推荐。

 

参考上述算法原理,以Spark编程来逐步实现。(1)首先加载训练数据集,为减小用户评分过于稀疏的可能影响,在此可以使用“单用户评价过的最小物品数”对其进行过滤。(2)然后根据用户对物品的评分向量获得用户相似度.(3)最后匹配训练集数据生成推荐模型。具体过程的实现代码如下所示:

 

package cn.mealdata
import org.apache.spark.{
 SparkConf, SparkContext}
import scala.math._
object UserBaseModelCreate {
 
  def main(args: Array[String]): Unit = {
 
    if (args.length != 5)  {
 
      System.err.println("Usage: cn.mealdata.UserBasedModelCreate <trainDataPath> <modelPath> "+
      "<minItemsRatedPerUser> <recommendItemNum><splitter>")
    }
    val trainDataPath = args(0)
    val modelPath=args(1)
    val minItemsRatedPerUser=args(2).toInt
    val recommendItemNum=args(3).toInt
    val splitter=args(4)
    val appName=" UserBased CF Create Model "
    val conf=new SparkConf().setAppName("appName")
    val sc=new SparkContext()
    val trainDataRaw=sc.textFile(trainDataPath).map{
 
      x=>val fields=x.slice(1,x.size-1).split(splitter);
        (fields(0).toInt,fields(1).toInt,fields(2).toDouble)}
    val trainDataFilered=trainDataRaw.groupBy(_._1).
      filter(data=>data._2.toList.size >= minItemsRatedPerUser).flatMap(_._2)
    val trainUserItemRating=trainDataFilered.map{
  case (user,item,rating)=>(user,(item,rating))}
    val trainUserRating=trainDataFilered.map{
 case (user,item,rating)=>(user,rating)}.groupByKey().map{
 
      x=>(x._1,x._2.reduce(_ + _)/x._2.count(x=>true))}
    val userItemBase=trainUserItemRating.join(trainUserRating).map(x=>(x._1,x._2._1._1,x._2._1._2,x._2._2))
    val itemUserBase=userItemBase.map(x=>(x._2,(x._1,x._3,x._4)))
    val itemMatrix=itemUserBase.join(itemUserBase).filter((f=>f._2._1._1 < f._2._2._1))
    println("itemMatrix records count : "+ itemMatrix.count)
    val userSimilarityBase = itemMatrix.map(
      f=>((f._2._1._1,f._2._2._1),(f._2._1._2,f._2._1._3,f._2._2._2,f._2._2._3)))
    val userSimilarityPre = userSimilarityBase.map(data=>{
 
      val user1=data._1._1
      val user2=data._1._2
      val similarity = (min(data._2._1,data._2._3))/(data._2._2+data._2._4)
      ((user1,user2),similarity)
    }).combineByKey(
      x=>x,
      (x:Double,y:Double)=>(x+y),
      (x:Double,y:Double)=>(x+y)).cache()
    val userSimilarity1=userSimilarityPre.map(x=>(x._1._1,(x._1._2,x._2)))
    val userSimilarity2=userSimilarityPre.map(x=>(x._1._2,(x._1._1,x._2)))
    val statisticsPre1=trainUserItemRating.map(x=>(x._1,x._2._1)).join(userSimilarity1).
      map(x=>(x._2._2._1,(x._2._1,x._2._2._2))).cache()
    val statisticsPre2=trainUserItemRating.map(x=>(x._1,x._2._1)).join(userSimilarity2).
      map(x=>(x._2._2._1,(x._2._1,x._2._2._2))).cache()
    val statistics=statisticsPre1.union(statisticsPre2).combineByKey(
      (x:(Int,Double)) =>List(x),
      (c:List[(Int,Double)],x:(Int,Double))=>c:+x,
      (c1:List[(Int,Double)],c2:List[(Int,Double)])=>c1:::c2).cache()
    val dataModel=statistics.
      map(data=>{
 
        val key=data._1;
        val value=data._2.sortWith(_._2>_._2);
        if (value.size>recommendItemNum){
 
          (key,value.slice(0,recommendItemNum))
        } else{
 
          (key,value)
        }
      }).map(x=>(x._1,x._2.map(x=>x._1)))
    println("Model records count : " + dataModel.count)
    dataModel.repartition(6).saveAsObjectFile(modelPath)
    println("Model saved")
    sc.stop()
  }
}

 

将用户相似度模型与训练集数据进行匹配,获得每个用户的K个可推荐物品列表,也可以视为推荐结果集。将推荐结果集存储在HDFS上,后续将进行模型评价

 

二、以基于物品的协同过滤算法建模

 

基于物品的协同过滤推荐算法,其基本思想是用户对物品的预测评分可以由该用户对与该物品相似度最高的K个邻居物品的评分通过加权平均计算得到,如下图所示,对物品1感兴趣的用户也都对物品2n感兴趣,因此物品1和物品2
n的相似度较高,它们属于相似物品,而用户t目前对物品2-n感兴趣,但还没发现物品1,因此可将物品1推荐给用户t。

 

参考上述算法原理,以Spark 编程来逐步实现。

 

(1)首先加载训练数据集,为减小用户评分过于稀疏的可能影响,在此可以使用“单用户评价过的最小物品数”对其进行过滤。

 

(2)然后根据用户对物品的评分向量获得物品相似度。

 

(3)最后匹配训练集数据生成推荐模型。

 

具体实现过程如下所示。

 

package cn.mealdata
import  org.apache.spark.{
 SparkContext,SparkConf}
import  scala.math._
object ItemBasedModelCreate {
 
  def main(args: Array[String]): Unit = {
 
      if(args.length !=5){
 
        System.err.println("Usage:com.tipdm.ItemBasedModelCreate <trainDataPath> <minRatedNumPerUser>" +
          "<recommendItemNum> <modelPath> <splitter>")
      }
      val  trainDataPath = args(0)
      val minRatedNumPerUser=args(2).toInt
      val recommendIteNum=args(2).toInt
      val  modelPath =args(3)
      val splitter=args(4)
      val appName=" ItemBased CF Create Model"
      val  conf=new SparkConf().setAppName("appName")
      val  sc=new SparkContext()
      val trainData=sc.textFile(trainDataPath).map{
 x=> val fields=x.slice(1,x.size-1).split(splitter);
      (fields(0).toInt,fields(1).toInt,fields(2).toDouble)}
      val trainDataFiltered =trainData.groupBy(_._1).
        filter(data=>data._2.toList.size >= minRatedNumPerUser).flatMap(_._2).cache()
      val trainUserItemRating =trainData.map{
  case (user,item,rating) => (item ,(user,rating))}
      val trainItemRating =trainData.map{
 case (user,item,rating) => (item,rating)}.groupByKey().map{
 
        x=> (x._1,x._2.reduce(_+_)/x._2.count(x=>true))
      }
      val itemUserBase=trainUserItemRating.join(trainItemRating).
        map(x=>(x._2._1._1,(x._1,x._2._1._2,x._2._2))).cache()
      val itemMatrix=itemUserBase.join(itemUserBase).filter((f => f._2._1._1 < f._2._2._1))
      val  itemSimilarityBase = itemMatrix.
        map(f => ((f._2._1._1,f._2._2._1),(f._2._1._2,f._2._1._3,f._2._2._2,f._2._2._3)))
      val  itemSimilarityPre =itemSimilarityBase.map(data => {
 
        val item1=data._1._1
        val item2=data._1._2
        val similarity =(min(data._2._1,data._2._3))*1.0/(data._2._2+data._2._4)
        ((item1,item2),similarity)
      }).combineByKey(
        x=>x,
        (x:Double,y:Double) => (x+y),
        (x:Double,y:Double) => (x+y))
      val itemSimilarity=itemSimilarityPre.map(x=> ((x._1._2,x._1._1),x._2)).union(itemSimilarityPre).
        map(x =>(x._1._1,(x._1._2,x._2)))
      println("itemSimilarity  records count: "+itemSimilarity.count)
      val dataModelPre = itemSimilarity.combineByKey(
        (x:(Int,Double)) => List(x),
        (c:List[(Int,Double)],x:(Int,Double)) => c:+x,
        (c1:List[(Int,Double)],c2:List[(Int,Double)]) => c1:::c2)
      val  dataModel =trainDataFiltered.map(x=>(x._2,x._1)).join(dataModelPre)
      val recommendModel =dataModel.flatMap(joined => {
 
        joined._2._2.map(f => (joined._2._1,f._1,f._2))}).sortBy( x=>
        (x._1,x._3),ascending = false).
        map(x=> (x._1,x._2)).
        combineByKey(
          (x:Int) => List(x),
          (c:List[Int],x:Int) => c:+x,
          (c1:List[Int],c2:List[Int])=> c1:::c2).map(x =>
            (x._1,x._2.take(recommendIteNum)))
          println("Recommend Model count: " + recommendModel.count)
    recommendModel.repartition(numPartitions=6).saveAsObjectFile(modelPath)
    sc.stop()
  }
}

 

通过计算获得物品之间的相似度模型,与训练集数据进行匹配,获得每个用户的可推荐物品列表,将推荐结果集存储到HDFS上

 

1.打包执行以上两个创建推荐模型的程序

 

1.1第一个程序

 

[[email protected] mealRating]# /opt/spark/bin/spark-submit --master local --class cn.mealdata.UserBasedModelCreate /root/mealRating/HadoopSpark3-1.0-SNAPSHOT.jar /data/MealRatings/trainRatings/* /data/UserBasedModel/ 2 2 ,

 

说明:

 

(1) /opt/spark/bin/spark-submit 提交命令

 

(2) –master local 程序运行方式

 

(3) –class cn.mealdata.UserBasedModelCreate 程序在项目工程中的引用路径(可以直接在项目中拷贝

 

Copy -Copy Reference)

 

(4) /root/mealRating/HadoopSpark3-1.0-SNAPSHOT.jar

 

打包后的jar包程序,这里已经上传到Linux中/root/mealRating/目录

 

(5) /data/MealRatings/trainRatings/* 训练集路径

 

(6) /data/UserBasedModel/ 模型存储路径

 

(7) 2 用户评价次数过滤条件为2次(>=2次)

 

(8) 2 单个用户的最大推荐物品数量

 

(9) , 训练集中原始数据的分隔符

 

1.2 查看结果

 

[[email protected] ~]# hdfs dfs -ls /data/UserBasedModel/
Found 7 items
-rw-r--r--   1 root supergroup          0 2022-04-12 11:47 /data/UserBasedModel/_SUCCESS
-rw-r--r--   1 root supergroup      64816 2022-04-12 11:47 /data/UserBasedModel/part-00000
-rw-r--r--   1 root supergroup      64806 2022-04-12 11:47 /data/UserBasedModel/part-00001
-rw-r--r--   1 root supergroup      64859 2022-04-12 11:47 /data/UserBasedModel/part-00002
-rw-r--r--   1 root supergroup      64912 2022-04-12 11:47 /data/UserBasedModel/part-00003
-rw-r--r--   1 root supergroup      64854 2022-04-12 11:47 /data/UserBasedModel/part-00004
-rw-r--r--   1 root supergroup      64897 2022-04-12 11:47 /data/UserBasedModel/part-00005

 

1.3 第二个程序

 

[[email protected] mealRating]# /opt/spark/bin/spark-submit --master local --class cn.mealdata.ItemBasedModelCreate /root/mealRating/HadoopSpark3-1.0-SNAPSHOT.jar /data/MealRatings/trainRatings/* 2 2  /data/ItemBasedModel/  ,

 

说明:

 

(1) /opt/spark/bin/spark-submit 提交命令

 

(2) –master local 程序运行方式

 

(3) –class cn.mealdata.ItemBasedModelCreate 程序在项目工程中的引用路径(可以直接在项目中拷贝

 

Copy -Copy Reference)

 

(4) /root/mealRating/HadoopSpark3-1.0-SNAPSHOT.jar

 

打包后的jar包程序,这里已经上传到Linux中/root/mealRating/目录

 

(5) /data/MealRatings/trainRatings/* 训练集路

 

(6) 2 用户评价次数过滤条件为2次(>=2次)

 

(7) 2 单个用户的最大推荐物品数量

 

(8) , 训练集中原始数据的分隔符

 

(9) /data/ItemBasedModel/ 模型存储路径

 

1.4 查看结果

 

[[email protected] ~]# hdfs dfs -ls /data/ItemBasedModel/
Found 7 items
-rw-r--r--   1 root supergroup          0 2022-04-12 12:09 /data/ItemBasedModel/_SUCCESS
-rw-r--r--   1 root supergroup      64811 2022-04-12 12:09 /data/ItemBasedModel/part-00000
-rw-r--r--   1 root supergroup      64811 2022-04-12 12:09 /data/ItemBasedModel/part-00001
-rw-r--r--   1 root supergroup      64864 2022-04-12 12:09 /data/ItemBasedModel/part-00002
-rw-r--r--   1 root supergroup      64907 2022-04-12 12:09 /data/ItemBasedModel/part-00003
-rw-r--r--   1 root supergroup      64864 2022-04-12 12:09 /data/ItemBasedModel/part-00004
-rw-r--r--   1 root supergroup      64902 2022-04-12 12:09 /data/ItemBasedModel/part-00005

 

三、以基于Spark ALS算法建模

 

1.ALS推荐算法

 

即Alternating Least Squares的写法,为交替最小二乘法,该方法常用于基于矩阵分解的推荐系统中。

 

它将用户(user)对物品(item)的评分矩阵分解为两个矩阵:一个是用户对物品隐含特征(指的是使用这些隐含的特征可以较好地表示这个用户的评价体系)的偏好矩阵,另一个是物品所包含的隐含特征(指的是使用这些隐含的特征可以较好地表示这个物品)的矩阵。

 

在这个矩阵分解的过程中,评分缺失项会被填充,也就是说可以基于这个填充的评分来对用户没有评价过的商品进行排序,最终获得预测填充评分最高的多个物品,然后对用户进行推荐。

 

2.Spark ALS算法

 

Spark的MLlib中已经包含了ALS算法包,开发者可以直接调用它,设置相关技术参数来建模。

 

Spark ALS算法存在于org.apache.spark.mllib.recommendation包中,该包共有三个类。

 

Rating:是用户、项目和评分的三元组(user,product,rating)

 

ALS:ALS提供了求解带偏置矩阵分解的交替最小二乘算法

 

MatrixFactorizationModel:ALS求解矩阵返回的结果类型,即算法返回的模型类

 

3.以Spark ALS算法建模

 

直接调用Spark ALS算法的train方法来进行建模

 

Spark ALS train算法API

 

def train(ratings:RDD[Rating],rank:Int,iterations:Int,lambda:Double):MatrixFactorizationModel
Train a matrix factorization model given an RDD of ratings by users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a level of parallelism automatically based on the number of partitions in ratings.
ratings:RDD of Rating objects with userID, productID, and rating
rank:number of features to use
iterations:number of iterations of ALS(recommended: 10-20)
lambda:regularization parameter (recommended: 0.01)

 

4.建模时的参数寻优

 

使用Spark ALS建模型需要输入多个参数,不同组的参数对于模型的准确度影响较大,因此有必要在参数范围中寻找一组最优的参数。

 

使用训练集数据,输入不同组建模参数进行建模,再利用模型计算验证集中模型预测评分及实际评分的均方根误差RMSE,取RMSE值最小的那组参数为最优参数。

 

在获得最优的参数组之后,使用这些参数来建模,执行Spark ALS中的train方法。

 

Spark ALS建模前的参数寻优的实现过程代码如下所示:

 

package cn.mealdata
import org.apache.spark.mllib.recommendation.{
 MatrixFactorizationModel, ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{
 SparkContext, SparkConf}
/**
 * 为创建ALS推荐模型寻找最优参数组,需要输入以下参数
 * trainDataPath: 训练数据的路径
 * validateDataPath: 训练数据的路径
 * listRank: 可选的用户项目子矩阵的阶值列表
 * listIteration: 可选的循环次数值列表
 * listLambda: 可选的防止过拟合参数值列表
 * paraOutputPath:最优参数组存储目录
 * splitter:输入原始数据分隔符
 */object ALSModelOptimize {
 
  val appName = "Optimize ALS Model Parameters"
  val conf = new SparkConf().setAppName(appName)
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  def main(args: Array[String]) = {
 
    if (args.length != 7) {
 
      System.err.println("Usage:ALSModelOptimize requires: 7 input fields ")
    }
    // 匹配输入参数
    val trainDataPath = args(0)
    val validateDataPath = args(1)
    val listRank = args(2).split(",").map(_.toInt)
    val listIteration = args(3).split(",").map(_.toInt)
    val listLambda = args(4).split(",").map(_.toDouble)
    val paraOutputPath = args(5)
    val splitter = args(6)
    // 定义计算均方根误差的函数:computeRMSE
    def computeRMSE(model:MatrixFactorizationModel, data:RDD[Rating]): Double = {
 
      val usersProducts = data.map(x=>(x.user,x.product))
      val ratingsAndPredictions = data.map{
 case Rating(user,product,rating)=>((user,product),rating)
      }.join(model.predict(usersProducts).map{
 case Rating(user,product,rating)=>((user,product),rating)}).values ;
      math.sqrt(ratingsAndPredictions.map(x=>(x._1-x._2)*(x._1-x._2)).mean())}
    // 加载训练集数据
    val trainData = sc.textFile(trainDataPath).map{
 x=>val fields=x.slice(1,x.size-1).split(splitter);
      (fields(0).toInt,fields(1).toInt,fields(2).toDouble)}
    val trainDataRating= trainData.map(x=>Rating(x._1,x._2,x._3))
    // 加载验证集数据
    val validateData = sc.textFile(validateDataPath).map{
 x=>val fields=x.slice(1,x.size-1).split(splitter);
      (fields(0).toInt,fields(1).toInt,fields(2).toDouble)}
    val validateDataRating= validateData.map(x=>Rating(x._1,x._2,x._3))
    // 初始化最优参数,取极端值
    var bestRMSE = Double.MaxValue
    var bestRank = -10
    var bestIteration = -10
    var bestLambda = -1.0
    // 参数寻优
    for( rank<- listRank;lambda<-listLambda;iter<-listIteration) {
 
      val model = ALS.train(trainDataRating,rank,iter,lambda);
      val validationRMSE = computeRMSE(model,validateDataRating);
      if(validationRMSE<bestRMSE){
 
        bestRMSE=validationRMSE;
        bestRank=rank;
        bestLambda=lambda;
        bestIteration=iter}
    }
    // 输出最优参数组
    println("BestRank:Iteration:BestLambda => BestRMSE")
    println(bestRank + ": " + bestIteration + ": " + bestLambda + " => " + bestRMSE)
    val result = Array(bestRank + "," + bestIteration + "," + bestLambda)
    sc.parallelize(result).repartition(1).saveAsTextFile(paraOutputPath)
    sc.stop()
  }
}

 

在获得最优的参数组之后,使用这些参数来建模,执行Sspark ALS中的train方法,具体“以最优参数组建立模型”代码如下:

 

package cn.mealdata
import org.apache.spark.{
 SparkContext, SparkConf}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
/**
 * 创建ALS模型,需要输入以下参数
 * trainDataPath: 输入原始数据,包含(用户,项目,评分)
 * modelPath:模型存储目录
 * rank : 用户、项目子矩阵
 * iteration: 循环次数;
 * lambda : 防止过拟合参数;
 * splitter: 输入原始数据分隔符;
 */object ALSModelCreate {
 
  val appName = "Create ALS Model "
  val conf = new SparkConf().setAppName(appName)
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  def main(args: Array[String]) = {
 
    if (args.length != 6) {
 
      System.err.println("Usage: ALSModelCreate requires: 6 input fields <trainDataPath> <modelPath>  " +
        "<rank> <iteration> <lambda> <splitter>")
    }
    // 匹配输入参数
    val trainDataPath = args(0)
    val modelPath = args(1)
    val rank = args(2).toInt
    val iteration = args(3).toInt
    val lambda = args(4).toDouble
    val splitter = args(5)
    // 加载训练集数据
    val trainData = sc.textFile(trainDataPath).map{
 x=>val fields=x.slice(1,x.size-1).split(splitter);
      (fields(0).toInt,fields(1).toInt,fields(2).toDouble)}
    val trainDataRating= trainData.map(x=>Rating(x._1,x._2,x._3))
    // 建立ALS模型
    val model = ALS.train(trainDataRating, rank, iteration, lambda)
    // 存储ALS模型
    model.save(sc,modelPath)
    println("Model saved")
    sc.stop()
  }
}

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注