Level 1: Item-Based Recommendation Algorithm
Recommend 2 items to user 2. Build a user-item rating matrix with the distributed matrix library in spark.mllib, compute similarity scores between items, and use them to make recommendations, i.e. implement item-based (Item CF) collaborative filtering.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ItemBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Read the data
    val conf = new SparkConf().setAppName("ItemBasedCFModel").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("/root/data/als/ratingdata.txt")

    /* MatrixEntry represents one entry of a distributed matrix.
     * Each entry is a tuple (i: Long, j: Long, value: Double),
     * where i is the row index, j is the column index, and value is the rating. */
    val parseData: RDD[MatrixEntry] = data.map(_.split(",") match {
      case Array(user, item, rate) => MatrixEntry(user.toLong, item.toLong, rate.toDouble)
    })

    // CoordinateMatrix is the MLlib type designed to hold (user, item, rating) samples
    val ratings = new CoordinateMatrix(parseData)

    /* CoordinateMatrix has no columnSimilarities method, so convert it to a RowMatrix
     * and call columnSimilarities there. columnSimilarities computes column-to-column
     * similarity; since the layout is user_item_rating, unlike user-based CF no transpose
     * is needed here: the column similarities are already item similarities. */
    val matrix: RowMatrix = ratings.toRowMatrix()

    // Goal: recommend items to a given user. Logic: take the items the user has rated
    // (bought), compute the similarity of other items to those items, sort descending,
    // and recommend the top items that are not among the ones the user already rated.
    // Example: recommend items to user 2.
    // Step 1: get the items rated (bought) by user 2. take(5) pulls all 5 users; index 2 is user 2.
    // SparseVector: a sparse vector
    val user2pred = matrix.rows.take(5)(2)
    val prefs: SparseVector = user2pred.asInstanceOf[SparseVector]
    val uitems = prefs.indices            // IDs of the items rated (bought) by user 2
    val ipi = (uitems zip prefs.values)   // (item ID, rating) pairs for user 2

    // Compute the item similarities and index them by item
    val similarities = matrix.columnSimilarities()
    val indexdsimilar = similarities.toIndexedRowMatrix().rows.map {
      case IndexedRow(idx, vector) => (idx.toInt, vector)
    }

    // ij: similarity of other items to each item bought by user 2
    val ij = sc.parallelize(ipi).join(indexdsimilar).flatMap {
      case (i, (pi, vector: SparseVector)) => (vector.indices zip vector.values)
    }

    /********** begin **********/
    // ij1: items with similarity scores that are not in user 2's list
    val ij1 = ij.filter { case (item, pref) => !uitems.contains(item) }

    // Sum the scores per item, sort in descending order, and recommend the top 2 items
    val ij2 = ij1.reduceByKey(_ + _).sortBy(_._2, false).take(2)
    /********** end **********/

    // Uncomment the following line
    for (id <- ij2) print(id._1 + " ")

    sc.stop()
  }
}
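For intuition, columnSimilarities scores each pair of item columns with cosine similarity: the dot product of the two rating vectors divided by the product of their norms. Below is a minimal standalone sketch of that computation, separate from the exercise; the two rating vectors are hypothetical and not taken from ratingdata.txt.

// Standalone sketch: cosine similarity between two item rating vectors.
// The vectors are made-up examples, not data from the exercise.
object CosineSketch {
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot  = a.zip(b).map { case (x, y) => x * y }.sum
    val norm = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(y => y * y).sum)
    dot / norm
  }

  def main(args: Array[String]): Unit = {
    val item1 = Array(4.0, 0.0, 5.0, 1.0) // hypothetical ratings of item 1 by users 0..3
    val item2 = Array(5.0, 1.0, 4.0, 0.0) // hypothetical ratings of item 2 by users 0..3
    println(cosine(item1, item2))         // the item-item similarity score
  }
}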
Level 2: User-Based Recommendation Algorithm
Following the hints, complete the code in the editor on the right. Goal: find the 2 users most similar to user 1.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UserBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Create a SparkContext
    val conf = new SparkConf().setAppName("UserBasedCF").setMaster("local")
    val sc = new SparkContext(conf)

    // Read the data
    val data = sc.textFile("/root/data/als/ratingdata.txt")

    // Parse each line into one entry of the rating matrix
    val parseData: RDD[MatrixEntry] = data.map(_.split(",") match {
      case Array(user, item, rate) => MatrixEntry(user.toLong, item.toLong, rate.toDouble)
    })

    // Build the coordinate matrix
    val ratings = new CoordinateMatrix(parseData)

    // Transpose the matrix so that the columns are users, then compute column similarities
    val matrix: RowMatrix = ratings.transpose().toRowMatrix()

    // User-to-user similarity matrix
    val similarities = matrix.columnSimilarities()

    // Item IDs rated by user 1
    val ratingOfUser1 = ratings.entries.filter(_.i == 1)
      .map(x => (x.j, x.value))
      .sortBy(_._1)
      .map(_._1)
      .collect()
      .toList
      .toArray

    // Similarity of user 1 to the other users
    val similarityOfUser1 = similarities.entries.filter(_.i == 1)
      .sortBy(_.value, false)
      .map(_.value)
      .collect

    // Goal: recommend 2 items to user 1.
    // Idea: find the two users most similar to user 1, then recommend the items those
    // users have rated but user 1 has not.

    /********** begin **********/
    // Find the two users most similar to user 1
    val similarityTopUser = similarities.entries.filter(_.i == 1)
      .sortBy(_.value, false)
      .map(x => (x.j, x.value))
      .collect
      .take(2)

    // println("The two users most similar to user 1:")
    // Uncomment the following 2 lines
    for (s <- similarityTopUser) print(s._1 + " ")
    for (s <- similarityTopUser) {
      // Items rated by this similar user that user 1 has not rated
      val userId = s._1
      val ratingOfTemp = ratings.entries.filter(_.i == userId)
        .map(x => (x.j, x.value))
        .sortBy(_._1)
        .map(_._1)
        .collect()
        .toList
        .toArray

      // Difference between this user's items and user 1's items
      val dis = ratingOfTemp diff ratingOfUser1
      // println("Items recommended to user 1 by user " + userId + ":")
      for (id <- dis) print(id + " ")
    }
    /********** end **********/

    sc.stop()
  }
}
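The transpose is the key difference from Level 1: before it, columns are items, so columnSimilarities would score item pairs; after it, columns are users and the entries of `similarities` are user-user scores. A hedged sketch for inspecting those entries, assuming the `similarities` variable from the code above:

// Sketch: print the upper-triangular user-user similarity entries for inspection.
// Assumes `similarities` is the CoordinateMatrix returned by matrix.columnSimilarities().
similarities.entries
  .map(e => (e.i, e.j, e.value))     // (userA, userB, cosine similarity)
  .sortBy(_._3, ascending = false)   // most similar pairs first
  .collect()
  .foreach { case (a, b, sim) => println(s"user $a ~ user $b : $sim") }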
Level 3: ALS-Based Recommendation Algorithm
Following the hints, complete the code in the editor on the right: create an ALS model, then call its fit method on training to produce model.
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.{DataFrame, SparkSession}

object ALS {
  case class Rating(userId: Int, movieId: Int, rating: Float)

  // Parse one "user,movie,rating" line into a Rating
  def parseRating(str: String): Rating = {
    val fields = str.split(",")
    assert(fields.size == 3)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("ALS")
      .getOrCreate()
    import spark.implicits._

    val ratings = spark.read.textFile("data/als/ratingdata.txt")
      .map(parseRating)
      .toDF()
    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

    /********** begin **********/
    // Build the recommendation model using ALS on the training data
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
    val model = als.fit(training)
    /********** end **********/

    // Evaluate the model by computing the RMSE on the test data.
    // Set the cold start strategy to 'drop' so we don't get NaN evaluation metrics.
    // Uncomment the following 3 lines
    model.setColdStartStrategy("drop")
    val predictions = model.transform(test)
    evaluatingRMSE(predictions)

    spark.stop()
  }

  def evaluatingRMSE(predictions: DataFrame): Unit = {
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions)
    if (rmse <= 2) {
      print(" " + "good")
    } else {
      println()
      predictions.show(false)
    }
  }
}
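The exercise stops at the RMSE check, but a fitted ALSModel can also emit top-N recommendations directly. A hedged sketch, assuming Spark 2.2+ and the `model` variable from the code above:

// Sketch: top-2 recommendations per user and per item from the fitted ALSModel.
// Column names follow the userCol/itemCol set above ("userId", "movieId").
val userRecs = model.recommendForAllUsers(2)  // columns: userId, recommendations
val itemRecs = model.recommendForAllItems(2)  // columns: movieId, recommendations
userRecs.show(false)
itemRecs.show(false)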
Level 4: Predicting Loan Risk with a Random Forest
Write a random forest binary classification model that predicts loan risk.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.evaluation.RegressionMetrics

object Credit {
  case class Credit(
    creditability: Double,
    balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
    savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
    residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
    credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
  )

  // Map the raw CSV fields onto the Credit case class; categorical codes are shifted to start at 0
  def parseCredit(line: Array[Double]): Credit = {
    Credit(
      line(0),
      line(1) - 1, line(2), line(3), line(4), line(5),
      line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
      line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
      line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
    )
  }

  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).map(_.map(_.toDouble))
  }

  def evaluatingAUC(predictedResultDF: DataFrame, labelstring: String): Unit = {
    val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelstring).setRawPredictionCol("prediction")
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if (predictionAUC > 0.6) {
      print(" " + "good")
    } else {
      print(s"areaUnderROC: $predictionAUC")
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Credit")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val creditDF = parseRDD(spark.sparkContext.textFile("/root/data/germancredit.csv")).map(parseCredit).toDF()
    creditDF.createTempView("credit")

    val featureCols = Array("balance", "duration", "history", "purpose", "amount",
      "savings", "employment", "instPercent", "sexMarried", "guarantors",
      "residenceDuration", "assets", "age", "concCredit", "apartment",
      "credits", "occupation", "dependents", "hasPhone", "foreign")

    /********** begin **********/
    // Assemble the feature columns into a single vector column
    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = assembler.transform(creditDF)
    /********** end **********/

    // Uncomment the following 4 lines
    val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
    val df3 = labelIndexer.fit(df2).transform(df2)
    val splitSeed = 5043
    val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)

    /********** begin **********/
    // Call the random forest API and train the model on trainingData
    val classifier = new RandomForestClassifier()
      .setImpurity("gini")
      .setMaxDepth(5)
      .setNumTrees(20)
      .setFeatureSubsetStrategy("auto")
      .setSeed(5043)
    val model = classifier.fit(trainingData)
    /********** end **********/

    // Uncomment the following 2 lines
    val predictions = model.transform(testData)
    evaluatingAUC(predictions, "label")

    spark.stop()
  }
}
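The unused CrossValidator and ParamGridBuilder imports hint at an optional tuning step. A hedged sketch of how they could wrap the same classifier in a Pipeline and search a small grid; `classifier`, `trainingData`, `testData`, and the column names are taken from the code above, while the grid values are illustrative, not prescribed by the exercise:

// Sketch: cross-validated grid search around the RandomForestClassifier above.
val pipeline = new Pipeline().setStages(Array(classifier))
val paramGrid = new ParamGridBuilder()
  .addGrid(classifier.maxDepth, Array(4, 6, 8))   // illustrative depth values
  .addGrid(classifier.numTrees, Array(20, 50))    // illustrative forest sizes
  .build()
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator().setLabelCol("label"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
val cvModel = cv.fit(trainingData)
evaluatingAUC(cvModel.transform(testData), "label")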
Level 5: SMS Classification with a Multilayer Perceptron
Write a program that classifies SMS text messages, using the multilayer perceptron (MLP) API in spark.ml, MultilayerPerceptronClassifier.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, Word2Vec}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SMSClassifier {
  final val VECTOR_SIZE = 100

  def evaluatingAUC(predictedResultDF: DataFrame, labelcol: String): Unit = {
    val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelcol).setRawPredictionCol("prediction")
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if (predictionAUC > 0.8) {
      print(" " + "good")
    } else {
      print(s"areaUnderROC: $predictionAUC")
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("SMS Message Classification (HAM or SPAM)")
      .getOrCreate()

    // Each line is "label<TAB>message"; split the message text into words
    val parsedRDD = spark.sparkContext.textFile("data/smsspamcollection/SMSSpamCollection")
      .map(_.split("\t"))
      .map(eachRow => (eachRow(0), eachRow(1).split(" ")))
    val msgDF = spark.createDataFrame(parsedRDD).toDF("label", "message")

    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(msgDF)

    /********** begin **********/
    // Word2Vec turns each word sequence into a fixed-size feature vector
    val word2Vec = new Word2Vec()
      .setInputCol("message")
      .setOutputCol("features")
      .setVectorSize(VECTOR_SIZE)
      .setMinCount(1)
    /********** end **********/

    // Network topology: input layer of size VECTOR_SIZE, two hidden layers, two output classes
    val layers = Array[Int](VECTOR_SIZE, 6, 5, 2)

    /********** begin **********/
    val mlpc = new MultilayerPerceptronClassifier()
      .setLayers(layers)
      .setBlockSize(512)
      .setSeed(1234L)
      .setMaxIter(128)
      .setFeaturesCol("features")
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
    /********** end **********/

    // Map the numeric predictions back to the original string labels
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    val Array(trainingData, testData) = msgDF.randomSplit(Array(0.8, 0.2))

    /********** begin **********/
    val pipeline = new Pipeline().setStages(Array(labelIndexer, word2Vec, mlpc, labelConverter))
    val model = pipeline.fit(trainingData)
    /********** end **********/

    // Uncomment the following 2 lines
    val predictionResultDF = model.transform(testData)
    evaluatingAUC(predictionResultDF, "indexedLabel")

    spark.stop()
  }
}
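Because Word2Vec, the MLP, and the label converter are all pipeline stages, the fitted pipeline can score new messages directly. A small hedged sketch, assuming the `spark` session and `model` from the code above; the sample message text is made up, and a placeholder "ham" label is included only because the indexer stage expects a "label" column with a known value:

// Sketch: classify one new message with the fitted pipeline `model`.
// The schema must match msgDF ("label", "message"); the text is a made-up example.
import spark.implicits._
val newMsg = Seq(("ham", "are we still meeting for lunch today".split(" ")))
  .toDF("label", "message")
model.transform(newMsg)
  .select("message", "predictedLabel")
  .show(false)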