本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.
一个典型的机器学习过程从数据收集开始,要经历多个步骤,才能得到需要的输出。这非常类似于流水线式工作,即通常会包含源数据ETL(抽取、转化、加载),数据预处理,指标提取,模型训练与交叉验证,新数据预测等步骤。
在介绍工作流之前,我们先来了解几个重要概念:
DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。
较之 RDD,包含了 schema 信息,更类似传统数据库中的二维表格。
它被 ML Pipeline 用来存储源数据。例如,DataFrame中的列可以是存储的文本,特征向量,真实标签和预测的标签等。
Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。
比如 一个模型就是一个 Transformer
。它可以把 一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。
技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。
Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。
在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。
Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。
ParamMap是一组(参数,值)对。
PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。
工作流如何工作
要构建一个 Pipeline工作流,首先需要定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。
引用
build.sbt
name := "SparkDemo01" version := "1.0-SNAPSHOT" scalaVersion := "2.12.15" idePackagePrefix := Some("org.example") val sparkVersion = "3.1.3" // 将阿里云仓库做为默认仓库 externalResolvers := List("my repositories" at "https://maven.aliyun.com/nexus/content/groups/public/") libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion, "org.apache.spark" %% "spark-mllib" % sparkVersion, )
DataFrame
创建
使用class
case class Rating(userId: Int, movieId: Int, rating: Float) def parseRating(str: String): Rating = { val fields = str.split("::") assert(fields.size == 3) Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat) } val rdds = spark .sparkContext .textFile("file:///D:\\spark_study\\movie.txt") .map(parseRating) val ratings = spark.createDataFrame(rdds)
使用StructType
val fields = Array(StructField("name", StringType, nullable = true), StructField("age", StringType, nullable = true)) val schema = StructType(fields) val list = Array("xiaoming,10", "xiaohong,12", "xiaogang,18") val rdd = spark.sparkContext.parallelize(list) val rowRDD = rdd.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim)) val peopleDF = spark.createDataFrame(rowRDD, schema) peopleDF.show(false)
其中
val list = Array("xiaoming,10", "xiaohong,12", "xiaogang,18") val rdd = spark.sparkContext.parallelize(list)
也可以从文件中读取
val rdd = spark.sparkContext.textFile("file:///D:\\spark_study\\movie.txt")
toDF
val testData = spark.createDataFrame(Seq( ("xiaoming", 16), ("xiaohong", 18) )).toDF("name", "age") testData.show(false)
保存文件
ratings.select("userId", "movieId", "rating") .write.format("csv") .save("file:///D:\\spark_study\\movie2")
注意
保存的路径传的是文件夹路径,不是文件的具体路径
日志配置
Logger.getLogger("org").setLevel(Level.ERROR)
简单机器计算示例-单词分析
分析句中是否包含spark
package org.example import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.sql.{Row, SparkSession} object ML01 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML01"). getOrCreate() // DataFrame val training = spark.createDataFrame(Seq( (0L, "a b spark c", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0) )).toDF("id", "text", "label") // 分词器 val tokenizer = new Tokenizer(). setInputCol("text"). setOutputCol("words") // Transformer val hashingTF = new HashingTF(). setNumFeatures(1000). setInputCol(tokenizer.getOutputCol). setOutputCol("features") val lr = new LogisticRegression(). setMaxIter(10). setRegParam(0.01) // Estimator val pipeline = new Pipeline(). setStages(Array(tokenizer, hashingTF, lr)) val model = pipeline.fit(training) // DataFrame val testData = spark.createDataFrame(Seq( (4L, "i j k spark"), (5L, "l m n"), (6L, "spark a"), (7L, "apache hadoop") )).toDF("id", "text") model.transform(testData). select("id", "text", "probability", "prediction"). collect(). foreach { case Row(id: Long, text: String, probability, prediction: Double) => println(s"($id, $text) --> probability=$probability, prediction=$prediction") } } }
运行结果
(4, i j k spark) –> probability=[0.3966631509161168,0.6033368490838832], prediction=1.0 (5, l m n) –> probability=[0.8763117923754867,0.12368820762451327], prediction=0.0 (6, spark a) –> probability=[0.09195287547527402,0.908047124524726], prediction=1.0 (7, apache hadoop) –> probability=[0.9583167817017002,0.041683218298299796], prediction=0.0
特征抽取
TF-IDF (HashingTF and IDF)
“词频-逆向文件频率”(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,它可以体现一个文档中词语在语料库中的重要程度。
在Spark ML库中,TF-IDF被分成两部分:TF (+hashing) 和 IDF。
TF: HashingTF 是一个Transformer,在文本处理中,接收词条的集合然后把这些集合转化成固定长度的特征向量。这个算法在哈希的同时会统计各个词条的词频。
IDF: IDF是一个Estimator,在一个数据集上应用它的fit()方法,产生一个IDFModel。 该IDFModel 接收特征向量(由HashingTF产生),然后计算每一个词在文档中出现的频次。IDF会减少那些在语料库中出现频率较高的词的权重。
查看词频
package org.example import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer} import org.apache.spark.sql.{Row, SparkSession} object ML02 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML02"). getOrCreate() // DataFrame val sentenceData = spark.createDataFrame(Seq( ("i love you you love me", 0), ("mi xue bing cheng tian mi mi", 0), ("i love you you like me", 1) )).toDF("text", "label") // 分词器 val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") val wordsData = tokenizer.transform(sentenceData) // 打印分词后的结果 wordsData.show(false) // Transformer val hashingTF = new HashingTF(). setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(2000) val featurizedData = hashingTF.transform(wordsData) featurizedData.select("words", "rawFeatures").show(false) // Estimator val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") val idfModel = idf.fit(featurizedData) val rescaledData = idfModel.transform(featurizedData) rescaledData.select("features", "label").take(3).foreach(println) } }
我们可以看到打印
最终结果
[(2000,[240,338,369,1756],[0.5753641449035617,0.5753641449035617,0.28768207245178085,0.28768207245178085]),0] [(2000,[183,395,951,1024,1295],[0.6931471805599453,2.0794415416798357,0.6931471805599453,0.6931471805599453,0.6931471805599453]),0] [(2000,[240,338,369,1330,1756],[0.28768207245178085,0.5753641449035617,0.28768207245178085,0.6931471805599453,0.28768207245178085]),1]
Word2Vec
词意向量 单词的词义越接近 值也会越接近
package org.example import org.apache.spark.ml.feature.Word2Vec import org.apache.spark.sql.SparkSession object ML03 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML03"). getOrCreate() // DataFrame val documentDF = spark.createDataFrame(Seq( "i love you you love me".split(" "), "mi xue bing cheng tian mi mi".split(" "), "i love you you like me".split(" ") ).map(Tuple1.apply)).toDF("text") documentDF.show(false) val word2Vec = new Word2Vec(). setInputCol("text"). setOutputCol("result"). setVectorSize(3). setMinCount(0) val model = word2Vec.fit(documentDF) val result = model.transform(documentDF) result.show(false) result.select("result").take(3).foreach(println) } }
我们可以看到结果
单词计数
package org.example import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel} import org.apache.spark.sql.SparkSession object ML04 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML04"). getOrCreate() val df = spark.createDataFrame(Seq( (0, "i love you you love me".split(" ")), (1, "i love you you like me".split(" ")) )).toDF("id", "words") val cvModel: CountVectorizerModel = new CountVectorizer(). setInputCol("words"). setOutputCol("features"). setVocabSize(3). setMinDF(2). fit(df) cvModel.transform(df).show(false) val cvm = new CountVectorizerModel(Array("i", "like", "you")). setInputCol("words"). setOutputCol("features") cvm.transform(df).select("words", "features").show(false) } }
结果
最终取”i”, “like”, “you”的单词计数
特征变换–标签和索引的转化
StringIndexer
package org.example import org.apache.spark.ml.feature.StringIndexer import org.apache.spark.sql.SparkSession object ML05 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML05"). getOrCreate() val df1 = spark .createDataFrame( Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) ) .toDF("id", "category") val indexer = new StringIndexer(). setInputCol("category"). setOutputCol("categoryIndex") val model = indexer.fit(df1) val indexed1 = model.transform(df1) indexed1.show() val df2 = spark .createDataFrame( Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a")) ) .toDF("id", "category") val indexed = model.transform(df2) indexed.show() val indexed2 = model.setHandleInvalid("skip").transform(df2) indexed2.show() } }
IndexToString
package org.example import org.apache.spark.ml.feature.{IndexToString, StringIndexer} import org.apache.spark.sql.SparkSession object ML06 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML06"). getOrCreate() val df = spark.createDataFrame(Seq( (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c") )).toDF("id", "category") val model = new StringIndexer(). setInputCol("category"). setOutputCol("categoryIndex"). fit(df) val indexed = model.transform(df) indexed.show(false) val converter = new IndexToString(). setInputCol("categoryIndex"). setOutputCol("originalCategory") val converted = converter.transform(indexed) converted.select("id", "categoryIndex", "originalCategory").show() } }
VectorIndexer
之前介绍的 StringIndexer
是针对单个类别型特征进行转换,倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了 VectorIndexer
类来解决向量数据集中的类别性特征转换。
通过为其提供 maxCategories
超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过 maxCategories
的特征需要会被认为是类别型的。
在下面的例子中,我们读入一个数据集,然后使用 VectorIndexer
训练出模型,来决定哪些特征需要被作为类别特征,将类别特征转换为索引,这里设置 maxCategories
为2,即只有种类小于2的特征才被认为是类别型特征,否则被认为是连续型特征:
package org.example import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession object ML07 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML07"). getOrCreate() val data = Seq( Vectors.dense(-1.0, 1.0, 1.0), Vectors.dense(-1.0, 3.0, 1.0), Vectors.dense(0.0, 5.0, 1.0)) val df = spark .createDataFrame(data.map(Tuple1.apply)) .toDF("features") val indexer = new VectorIndexer(). setInputCol("features"). setOutputCol("indexed"). setMaxCategories(2) val indexerModel = indexer.fit(df) val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet println(s"Chose ${categoricalFeatures.size} categorical features: " + categoricalFeatures.mkString(", ")) val indexed = indexerModel.transform(df) indexed.show() } }
可以看到,0号特征只有-1,0两种取值,分别被映射成0,1,而2号特征只有1种取值,被映射成0。
特征选取–卡方选择器
特征选择(Feature Selection)指的是在特征向量中选择出那些“优秀”的特征,组成新的、更“精简”的特征向量的过程。它在高维数据分析中十分常用,可以剔除掉“冗余”和“无关”的特征,提升学习器的性能。
特征选择方法和分类方法一样,也主要分为有监督(Supervised)和无监督(Unsupervised)两种,卡方选择则是统计学上常用的一种有监督特征选择方法,它通过对特征和真实标签之间进行卡方检验,来判断该特征和真实标签的关联程度,进而确定是否对其进行选择。
和ML库中的大多数学习方法一样,ML中的卡方选择也是以 estimator
+ transformer
的形式出现的,其主要由 ChiSqSelector
和 ChiSqSelectorModel
两个类来实现。
package org.example import org.apache.spark.ml.feature.{ChiSqSelector, UnivariateFeatureSelector} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession object ML08 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(). master("local"). appName("ML08"). getOrCreate() val df = spark .createDataFrame(Seq( (1, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1), (2, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0), (3, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0) )) .toDF("id", "features", "label") val selector = new ChiSqSelector(). setNumTopFeatures(1). setFeaturesCol("features"). setLabelCol("label"). setOutputCol("selected-feature") val selector_model = selector.fit(df) val result = selector_model.transform(df) result.show(false) } }
现在,用卡方选择进行特征选择器的训练,为了观察地更明显,我们设置只选择和标签关联性最强的一个特征(可以通过 setNumTopFeatures(..)
方法进行设置)
结果
协同过滤算法
概念
协同过滤是一种基于一组兴趣相同的用户或项目进行的推荐,它根据邻居用户(与目标用户兴趣相似的用户)的偏好信息产生对目标用户的推荐列表。
示例
我们有这样的一个数据 分别为 用户ID::电影ID::用户对电影的评分
0::1::3.0 0::2::1.0 0::3::2.0 0::4::4.0 1::1::3.0 1::2::1.0 1::3::2.0 1::4::4.0 2::1::3.0 2::2::1.0 2::3::2.0 2::4::4.0 3::1::3.0 3::2::1.0 3::3::2.0 3::4::4.0
我们现在要根据原有的数据进行训练来预测其他用户的打分
package org.example import org.apache.log4j.{Level, Logger} import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.recommendation.ALS import org.apache.spark.sql.SparkSession object ML09 { case class Rating(userId: Int, movieId: Int, rating: Float) def parseRating(str: String): Rating = { val fields = str.split("::") assert(fields.size == 3) Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat) } def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val spark = SparkSession.builder(). master("local"). appName("ML08"). getOrCreate() val rdds = spark .sparkContext .textFile("file:///D:\\spark_study\\movie.txt") .map(parseRating) val ratings = spark.createDataFrame(rdds) println("全部数据") ratings.show(false) val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2)) println("训练数据") training.show(false) println("测试数据") test.show(false) // 显性反馈 val alsExplicit = new ALS() .setMaxIter(5) .setRegParam(0.01) .setUserCol("userId") .setItemCol("movieId") .setRatingCol("rating") // 隐性反馈 val alsImplicit = new ALS() .setMaxIter(5) .setRegParam(0.01) .setImplicitPrefs(true) .setUserCol("userId") .setItemCol("movieId") .setRatingCol("rating") val modelExplicit = alsExplicit.fit(training) val modelImplicit = alsImplicit.fit(training) val predictionsExplicit = modelExplicit.transform(test) val predictionsImplicit = modelImplicit.transform(test) println("显性反馈") predictionsExplicit.show() println("隐式反馈") predictionsImplicit.show() val evaluator = new RegressionEvaluator() .setMetricName("rmse") .setLabelCol("rating") .setPredictionCol("prediction") val rmseExplicit = evaluator.evaluate(predictionsExplicit) val rmseImplicit = evaluator.evaluate(predictionsImplicit) println(s"显式反馈偏差 = $rmseExplicit") println(s"隐性反馈偏差 = $rmseImplicit") } }
结果
隐性反馈 vs 显性反馈
显性反馈行为包括用户明确表示对物品喜好的行为,
隐性反馈行为指的是那些不能明确反应用户喜好的行为。
在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈,例如页面游览,点击,购买,喜欢,分享等等。
基于矩阵分解的协同过滤的标准方法,一般将用户商品矩阵中的元素作为用户对商品的显性偏好。
本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分,而是与所观察到的用户偏好强度关联起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。
参数说明
在 ML 中的实现有如下的参数:
numBlocks
是用于并行化计算的用户和商品的分块个数 (默认为10)。rank
是模型中隐语义因子的个数(默认为10)。maxIter
是迭代的次数(默认为10)。regParam
是ALS的正则化参数(默认为1.0)。implicitPrefs
决定了是用显性反馈ALS的版本还是用使用隐性反馈数据集的版本(默认是false,即用显性反馈)。alpha
是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准(默认为1.0)。
nonnegative
决定是否对最小二乘法使用非负的限制(默认为false)。
可以调整这些参数,不断优化结果,使均方差变小。比如:imaxIter越大,regParam越 小,均方差会越小,推荐结果较优。
错误
我在提交了一个mllib的als推荐算法,提示:
22/05/25 15:25:21 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 22/05/25 15:25:21 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 22/05/25 15:25:22 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 22/05/25 15:25:22 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
这个问题是因为als是一个分布式算法,在本地执行时失败,在–master yarn模式下执行正常
这个错误不影响执行。
Be First to Comment