Press "Enter" to skip to content

Spark ML的特征处理实战

本文来自OPPO互联网技术团队,转载请注名作者。同时欢迎关注我们的公众号:OPPO_tech,与你分享OPPO前沿互联网技术及活动。

 

一 、特征处理的意义

 

通常情况下,我们得到的数据中包含脏数据或者噪声。在模型训练前,需要对这些数据进行预处理,否则再好的模型也只能“garbage in,garbage out”。

 

数据预处理主要包括三部分,特征提取、特征转换和特征选择。

 

二、特征提取

 

特征提取一般指从原始数据中抽取特征的过程。

 

1. 计数向量器(Countvectorizer)

 

(1) 定义及用途:计数向量器将所有的文本词语进行编号,并统计该词语在文档中的词频作为特征向量。

 

(2) spark ml中代码实例

 

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
      (0, Array("a","e","a","d","b")),
      (1, Array("a","c","b","d","c","f","a","b")),
      (2, Array("a","f"))
)).toDF("id", "words")    
var cv_model = new CountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(10).setMinDF(2).fit(df)
val cv1 = cv_model.transform(df)
cv1.show(false)

 

 

注意点:计数向量器会将所有数据整合到一起进行去重形成一张词表,通过 setVocabSize 和 setMinDF 这两个参数来确定是否进入词表。其中 setVocabSize 决定词表的长度,而 setMinDF 决定要在多少个不同的样本中出现才进入词表。上例中设置词表的长度为10,至少在两个样本中出现才会进入词表,则能进入词表的只有a,b,d,f。c和e仅在一条数据中出现所以并不会统计词频。

 

2. 词频-逆向文件频率(TF-IDF)

 

(1) 定义及用途:通俗的理解就是计算一个词区别一篇文档的程度。通过在一篇文档中的词频和该词在文档库中多少篇文档中出现综合来评估。仅通过词频来区分一篇文档是不合理的。

 

比如文档中会多次出现能代表通用含义的词,但是这些词对于文档的识别并无意义。我们需要的是一些特别的词,它出现的次数多,并且能在少数的文档中出现,这些词才能够识别文档。举个极端的例子,比如“我们”这个词可能出现在N多篇文档中然而并没用处。很多童鞋会说我们可以通过停用词去除掉这些词呀,对。而我说的就是这一类在停用词之外,出现范围很广但是并无识别用处的词。

 

(2) spark ml中代码实例

 

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val wordsData = spark.createDataFrame(Seq(
      "传奇 游戏 战士".split(" "),
      "苹果 梨 香蕉".split(" "),
      "苹果 手机 流畅".split(" ")
    ).map(Tuple1.apply)).toDF("words")
wordsData.show(false)    
// step1 hashingTF
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(2000)
val featurizedData = hashingTF.transform(wordsData)
// step2 计算IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("words","features").show(false)

 

 

注意:setNumFeatures是设置特征的长度。在这三条数据中,除了苹果外其他的词都仅出现一次,所以可以识别文档的价值都比较大。而苹果同时出现在两条数据中,所以能识别文档的价值被打压,变得很低。

 

3.词转向量(Word2Vec)

 

(1) 定义及用途:词转向量是把单词映射到向量空间中,通过一组向量来代表单词。通过计算向量的距离可以代表词的相似度。

 

(2) spark ml中代码实例

 

import org.apache.spark.ml.feature.Word2Vec
val documentDF = spark.createDataFrame(Seq(
      "传奇 游戏 战士".split(" "),
      "苹果 梨 香蕉".split(" "),
      "传奇 游戏 种类多".split(" "),
      "苹果 手机 流畅".split(" ")
    ).map(Tuple1.apply)).toDF("text")
val word2Vec = new Word2Vec().setInputCol("text").setOutputCol("result").setVectorSize(10).setMinCount(2)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.show(false)

 

 

注意:setVectorSize设置向量的长度。setMinCount设置词在样本中出现的最少次数。比如在上例中我们设置向量长度为10,至少在两条样本中出现才会转化向量。则满足条件的有”苹果”、“传奇””游戏”这三个词,所以第一条数据和第三条数据向量距离完全一样,因为“战士”和“种类多”均只出现一次,并不会用于转化成向量,如果setMinCount设置为1,那幺第一条和第三条的向量空间距离会很近,但不会完全一样,因为也会考虑“战士”和“种类多”这两个词。

 

三、特征转换

 

1. 连续型数据转换成离散数据

 

1.1 二值化(Binarizer)

 

 

    1. 定义及用途:根据阈值量将连续型数据转化成0-1特征的过程。

 

    1. 注意点:特征值大于阈值将映射为1.0,特征值小于等于阈值将映射为0.0;二值化输入inputCol 支持向量(Vector)和双精度(Double)类型。

 

 

1.2 离散化重组(Bucketizer)

 

 

    1. 定义及用途:根据分段规则将连续型数据转化成各自对应的分段区间内。

 

    1. spark ml中代码实例:

 

 

import org.apache.spark.ml.feature.Bucketizer
val data = Array(-8.0, -0.5, -0.3, 0.0, 0.2, 9.0)
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer().setInputCol("features").setOutputCol("bucketedFeatures").setSplits(splits)
bucketizer.transform(dataFrame).show(false)

 

 

注意点:上例中第三行代码,分段规则制定为(负无穷,0.5),[-0.5,0),[0,0.5),[0.5,正负穷)四段。每个分段是左闭右开[a,b)方式。当不确定分裂的上下边界时,应当添加Double.NegativeInfinity和Double.PositiveInfinity以免越界。

 

1.3 分位数离散化(QuantileDiscretizer)

 

 

    1. 定义及用途:根据分位数规则将连续型数据转化成各自对应的分段内。

 

    1. spark ml中代码实例:

 

 

import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
var df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer().setInputCol("hour").setOutputCol("result").setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show()

 

 

注意点:setNumBuckets设置分位数分桶数量为3。则将hour数据分成3段。

 

2.字符串和索引相互转换

 

2.1 字符串-索引变换(StringIndexer)

 

 

    1. 定义及用途:将字符串特征转化成索引。很多模型训练过程中只接受数值特征,所以需要将字符串转换成数值从而进行训练。

 

    1. spark ml中代码实例:

 

 

import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show(false)

 

 

注意:索引按照标签频率排序。最常见的标签索引0即代表频率最高的标签。在新数据集中可能会遇到新出现的字符串。比如训练集中只有a,b,c,在新的数据集中会有a,b,c,d。针对新出现的字符串d,有两种策略来处理。 第一种是抛出一个异常(默认情况下),第二种是通过掉用setHandleInvalid(“skip”)来彻底忽略包含这类标签的行。

 

2.2 索引-字符串(IndexToString)

 

定义及用途:一般是和上面的字符串-索引变换器配套使用。先通过字符串-索引变换器将字符串特征转换成数值类型特征,模型训练完成后通过索引-字符串转换器将数值特征还原成字符串特征。

 

3. 正则化(Normalizer)

 

 

    1. 定义及用途:正则化的作用范围是每一行数据,即每一条样本数据。将每一条数据通过计算p-范数进行规范化。正则化操作可以使输入数据标准化并提高后期学习算法的效果。

 

    1. spark ml中代码实例:

 

 

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.{Vector,Vectors}
val data=Seq(Vectors.dense(-1,1,1,8,56),Vectors.dense(-1,3,-1,-9,88),Vectors.dense(0,5,1,10,96), Vectors.dense(0,5,1,11,589),Vectors.dense(0,5,1,11,688))
val df=spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")    
val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0)
normalizer.transform(df).show(false)

 

 

4. 规范化(StandardScaler)

 

 

    1. 定义及用途:规范化的作用范围是每一列数据,即每一维特征。标准化每个特征使得其有统一的标准差。一方面,同一个特征中不同的样本的取值可能会相差非常大,一些异常小或异常大的数据会误导模型的正确训练;另一方面,如果数据的分布很分散也会影响训练结果。以上两种方式都体现在方差会非常大。

 

    1. spark ml中代码实例:

 

 

import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.linalg.{Vector,Vectors}
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0)))).toDF("id", "features")
val scaler = new StandardScaler().setInputCol("features")
.setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)
val scalerModel = scaler.fit(dataFrame)
val scaledData = scalerModel.transform(dataFrame)
scaledData.show(false)

 

 

注意:上述将每一列的标准差缩放到1。如果特征的标准差为零,则该特征在向量中返回的默认值为0.0。

 

5. 主成分分析 (PCA)

 

(1) 定义及用途:主成分分析(PCA)是一种统计学方法,本质是在线性空间中进行一个基变换,使得变换后的数据投影到低维空间的方差最大化。根据变换后方差大小确定坐标轴的权重或者重要性,权重高的成为主成分。 主要应用于降维。

 

(2) spark ml中代码实例:

 

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.{Vector,Vectors}
val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0), Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val scaledDataFrame = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").fit(df).transform(df)
val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(3).fit(scaledDataFrame)
val pcaDF = pca.transform(scaledDataFrame)
pcaDF.select("features","pcaFeatures").show(false)

 

 

注意:通过setK来设置降低到K维空间。上例中原来有5维特征,通过pca降低到3维特征中。pca前一定要对特征向量进行规范化。因为各主成分之间值变化太大,有数量级的差别。标准化特征向量后各主成分之间基本在同一个水平,结果更合理。K值选择问题,可以先选择一个较大的值,通过pcaModel.explainedVariance计算模型的方差,当方差趋于稳定值,选择对应的K值是一个不错的选择。

 

6. 向量-索引变换(VectorIndexer)

 

(1) 定义及用途:主要用于批量将离散型特征转化为类别特征

 

(2) spark ml中代码实例:

 

import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.Vectors
val data=Seq(Vectors.dense(-1,1,1,8,56),
             Vectors.dense(-1,3,-1,-9,88),
             Vectors.dense(0,5,1,10,96), 
             Vectors.dense(0,5,1,11,589))
val df=spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")    
val indexer = new VectorIndexer().setInputCol("features").setOutputCol("indexed").setMaxCategories(3)
val indexerModel = indexer.fit(df)
indexerModel.transform(df).show(false)

 

 

注意:设置setMaxCategories为K,将特征数量小于等于K的特征转化为索引。比如上例中设置setMaxCategories为3,第二列特征有三类,则重新编码为0,1,2。

 

7. SQL转换器(SQLTransformer)

 

(1) 定义及用途:很多习惯了使用sql来进行数据处理的童鞋可以使用sql转换器处理特征。

 

(2) spark ml中代码实例:

 

import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()

 

 

8. 独热编码(OneHotEncoder)

 

独热编码将标签指标映射为二值变量。

 

9. 最大值-最小值缩放(MinMaxScaler)

 

将独立的特征值转换到指定的范围内,通常为[0,1]。

 

10. 特征向量合并(VectorAssembler)

 

将原始特征和不同特征转换器生成的特征合并为单个特征向量。输入列的值将按指定顺序依次添加到一个新向量中。

 

四、特征选择

 

特征选择是从特征向量中选择那些更简单有效的特征。适用于在高维数据分析中剔除冗余特征,提升模型的性能。特征选择后的特征是原来特征的一个子集。

 

1. 向量机(VectorSlicer)

 

基于已有的特征库,通过索引或者列名来选择部分需要的特征。

 

2. R公式(RFormula)

 

通过R模型公式产生一个特征向量和一个标签列。适合在需要做OneHotEncoder的时候,可以一个简单的代码把所有的离散特征转化成数值化表示。

 

3. 卡方特征选择(ChiSqSelector)

 

(1) 定义及用途:卡方特征选择根据分类的卡方独立性检验来对特征排序。主要适用于有一堆特征,但是我们并不知道哪些有用,哪些没用。可以通过卡方特征选择来快速筛选特征。缺点是速度比较慢。

 

(2) spark ml中代码实例:

 

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.feature.VectorIndexer
val data = Seq( (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0), 
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0), (9, Vectors.dense(1.0, 0.0, 15.0, 0.2), 0.0))
val df = spark.createDataset(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector().setNumTopFeatures(2).setFeaturesCol("features").setLabelCol("clicked").setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
result.show(false)

 

 

参考资料:

 

http://spark.apache.org/docs/…
http://www.apache.wiki/pages/…
https://blog.csdn.net/liuling…

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注