PySpark Feature Tool
1. Data Preparation
We define some test data up front to make it easy to verify that each function works; for most beginners, knowing exactly what a function takes as input and what it returns is also the best way to understand the feature functions and put the resulting features to use:
df = spark.createDataFrame([
('zhu', "Hi I heard about pySpark"),
('xiang', "I wish python could use case classes"),
('yu', "Logistic regression models are neat")
], ["id", "sentence"])
# functionTestData
+-----+------------------------------------+
|id |sentence |
+-----+------------------------------------+
|zhu  |Hi I heard about pySpark            |
|xiang|I wish python could use case classes|
|yu |Logistic regression models are neat |
+-----+------------------------------------+
2. Data Reading
#!/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################################################################
# Creator         : Zhu Xiangyu.DOTA
# Creation Time   : 2022-2-22 22:22:22
# Description     : PySpark feature engineering toolkit
# Script Version  : 2.0.0.9
########################################################################################################################
import math
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DOTA_Features_Tool').enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1000)
spark.conf.set("spark.default.parallelism", 2000)
def get_params():
    return {
        # Functions that can be used
        'column1' : "TFIDF",            # Term frequency-inverse document frequency
        'column2' : "Word2Vec",
        'column3' : "CountVectorizer",
        'column4' : "OneHotEncoder",
        'column5' : "StringIndexer",
        'column6' : "IndexToString",
        'column7' : "PCA",
        'column8' : "Binarizer",
        'column9' : "Tokenizer",
        'column10': "StopWordsRemover",
        'column11': "NGram",
        'column12': "DCT",              # Discrete cosine transform
        'column13': "ChiSqSelector",    # Chi-squared feature selection
        'column14': "PearsonCorr",      # Pearson correlation coefficient
    }
def main():
    # Reset params
    ######################################################################################
    #
    # Source table: database.table
    dataset_Name = ""
    dataset = spark.sql("select * from {dataset_Name}".format(dataset_Name=dataset_Name)).fillna(0)
    #
    # Target table for the results: database.table
    saveAsTable_Name = ""
    #
    # Apply `function` to column `col`: {col: function}
    params = {'sentence': "TFIDF"}
    #
    ######################################################################################
    #
    # functionTestData
    df = spark.createDataFrame([
        ('zhu', "Hi I heard about pySpark"),
        ('xiang', "I wish python could use case classes"),
        ('yu', "Logistic regression models are neat")
    ], ["id", "sentence"])
    # Feature transform
    features = featureTool(dataset, params)  # Test mode: use dataset = df
    features.show(5)
    # Save the features as a table
    saveResult(features, saveAsTable_Name)
3. Data Storage
# SaveTableAs
def saveResult(res, saveAsTable_Name='dota_tmp.dota_features_tool_save_result', saveFormat="orc", saveMode="overwrite"):
    res.write.saveAsTable(name=saveAsTable_Name, format=saveFormat, mode=saveMode)
4. Feature Functions
def featureTool(df, params):
    dataCols, targetCols = df.columns, params.keys()
    exeColumns = list(params.keys())[0]
    exeDefFunction = params[exeColumns]
    outputCol = exeDefFunction + '_' + exeColumns
    print(exeColumns + " --> " + exeDefFunction + "(df, {exeColumns})".format(exeColumns=exeColumns))
    exeOrder = "feat = {exeDef}(df, '{exeCols}', '{outputCol}')".format(
        exeCols=exeColumns, exeDef=exeDefFunction, outputCol=outputCol)
    print("exeOrder : " + exeOrder)
    # In Python 3, variables assigned inside exec() do not leak into the enclosing local scope,
    # so collect the result in an explicit namespace instead of reading `feat` directly.
    local_ns = {'df': df}
    exec(exeOrder, globals(), local_ns)
    return local_ns['feat']
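As a usage sketch (assuming the test df defined above), passing the column/function mapping to featureTool yields a new column named <Function>_<column>:
# Usage sketch: apply TFIDF to the "sentence" column of the test DataFrame
features = featureTool(df, {'sentence': "TFIDF"})
features.show(truncate=False)   # the result contains a new "TFIDF_sentence" column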
01
TFIDF
def TFIDF(df, inputCol="sentence", outputCol="tfidf", numFeatures=20):
    """
    Term frequency-inverse document frequency (TF-IDF) is a feature-vectorization method widely used
    in text mining; it reflects how important a term in a document is with respect to the whole corpus.
    # In short: the more often a term appears in one document, and the less often it appears in all
    # other documents, the better it represents that document.
    """
    from pyspark.ml.feature import HashingTF, IDF, Tokenizer
    tokenizerX = Tokenizer(inputCol=inputCol, outputCol="words")
    wordsDataX = tokenizerX.transform(df)
    hashingTFX = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=numFeatures)
    featurizedData = hashingTFX.transform(wordsDataX)
    idfX = IDF(inputCol="rawFeatures", outputCol=outputCol)
    idfModel = idfX.fit(featurizedData)
    tfidfRes = idfModel.transform(featurizedData).drop('words', 'rawFeatures')
    return tfidfRes
TF-IDF weights are often combined with cosine similarity in the vector space model to judge how similar two documents are. Today the TF-IDF model is what search engines and other real-world applications actually rely on. Its core idea: if a term w appears frequently in document d but rarely in other documents, then w has strong discriminating power and is well suited to separating d from the rest of the corpus.
The code above produces the following output:
TFIDF Output
+-----+------------------------------------+----------------------------------------------------------------------------------------+
|id |sentence |tfidf |
+-----+------------------------------------+----------------------------------------------------------------------------------------+
|zhu |Hi I heard about pySpark |(20,[0,9,17],[0.6931471805599453,0.5753641449035617,1.3862943611198906]) |
|xiang|I wish python could use case classes|(20,[2,9,13,15],[0.6931471853,1.15072071234,0.285178085,0.28768207245178085]) |
|yu |Logistic regression models are neat |(20,[4,6,13,15,18],[0.69314718053,0.693147453,0.287682078085,0.2876085,0.693149453]) |
+-----+------------------------------------+----------------------------------------------------------------------------------------+
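As mentioned above, these TF-IDF vectors are typically compared with cosine similarity. A minimal sketch, assuming tfidfRes is the DataFrame returned by TFIDF(df, 'sentence', 'tfidf'):
import numpy as np

# Collect the TF-IDF vectors of the first two documents and compute their cosine similarity
vecs = [row['tfidf'].toArray() for row in tfidfRes.select('tfidf').collect()]
v1, v2 = vecs[0], vecs[1]
cosine = float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))
print(cosine)   # closer to 1.0 means the two documents are more similar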
02
Word2Vec
The word2vec model is essentially a simplified neural network that turns text into vectors. Word vectors have good semantic properties and are a common way of representing word features; each dimension of a word vector carries a value with some semantic or syntactic interpretation.
Each dimension of the vector can therefore be regarded as one word feature. Word vectors come in several forms, and the distributed representation is one of them: a dense, low-dimensional, real-valued vector in which every dimension represents a latent feature of the word, capturing useful syntactic and semantic properties. The term "distributed" reflects exactly this trait: the different syntactic and semantic features of a word are spread across all dimensions of its vector.
def Word2Vec(df, inputCol="sentence", outputCol="w2v", vectorSize=100, minCount=5, numPartitions=1,
             stepSize=0.025, maxIter=1, seed=None, windowSize=5, maxSentenceLength=1000):
    """
    Word2vec maps a word to a vector, where a "word" is any entity whose order matters, such as the
    words in a document or the items a user clicks on in sequence.
    The resulting entity vectors can be used to measure similarity between entities and, on top of that,
    for classification, clustering, recommendation, sentence vectors and short-text classification.
    #
    # Two training schemes
    # Skip-gram: use a word as input to predict its surrounding context.
    # CBOW     : use the context of a word as input to predict the word itself.
    #
    Spark's Word2Vec is an Estimator that trains a Word2VecModel on sequences of words representing
    documents. (Spark implements the Skip-gram model.)
    The model maps each word to a fixed-size vector and converts a document into a vector by averaging
    the vectors of its words; that vector can then be used as a feature for prediction,
    document-similarity computation, and so on.
    """
    from pyspark.ml.feature import Word2Vec
    from pyspark.sql.functions import split
    # Input data: each row is a bag of words from a sentence or document.
    df = df.withColumn("words", split(df[inputCol], ' '))
    word2VecX = Word2Vec(
        vectorSize = vectorSize,
        minCount = minCount,
        inputCol = "words",
        outputCol = outputCol,
        numPartitions = numPartitions,
        stepSize = stepSize,
        maxIter = maxIter,
        seed = seed,
        windowSize = windowSize,
        maxSentenceLength = maxSentenceLength
    )
    w2vModel = word2VecX.fit(df)
    w2vRes = w2vModel.transform(df).drop('words')
    return w2vRes
The code above produces the following output:
Word2Vec Output
+-----+--------------------+--------------------+
| id| sentence| w2v|
+-----+--------------------+--------------------+
| zhu|Hi I heard about ...|[0.08936496693640...|
|xiang|I wish python cou...|[7.36715538161141...|
| yu|Logistic regressi...|[-0.0063562680035...|
+-----+--------------------+--------------------+
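Beyond document vectors, the fitted Word2VecModel can be queried for word-level similarity. A minimal sketch that fits a model directly on the test df (minCount=1 so the tiny corpus is not filtered out; "python" is just an example query word; the estimator is aliased to avoid shadowing the wrapper above):
from pyspark.ml.feature import Word2Vec as Word2VecEstimator
from pyspark.sql.functions import split

# Fit a small Word2Vec model so the word vectors themselves can be queried
words_df = df.withColumn("words", split(df["sentence"], ' '))
w2v_model = Word2VecEstimator(vectorSize=10, minCount=1, inputCol="words", outputCol="w2v").fit(words_df)
# Top-3 words closest to "python" by cosine similarity of their word vectors
w2v_model.findSynonyms("python", 3).show()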
03
CountVectorizer
CountVectorizer converts a document into a vector of term counts.
def CountVectorizer(df, inputCol="sentence", outputCol="cv", vectorSize=200000, minCount=1.0):
    """
    CountVectorizer converts a document into a vector of term counts.
    When no a-priori dictionary is available, CountVectorizer can be used as an Estimator to extract
    the vocabulary and produce a CountVectorizerModel.
    The model yields a sparse term-count representation of the documents, which can be passed on to
    other algorithms such as LDA.
    #
    During fitting, CountVectorizer keeps the top vocabSize terms ordered by term frequency across the corpus.
    The optional parameter minDF also affects fitting: it is the minimum number of documents a term must
    appear in to be included in the vocabulary.
    Another optional binary parameter controls the output vector: if set to True, all non-zero counts
    become 1, which is useful for discrete probabilistic models that expect binary counts.
    """
    from pyspark.ml.feature import CountVectorizer
    from pyspark.sql.functions import split
    df = df.withColumn("words", split(df[inputCol], ' '))
    CountVectorizerX = CountVectorizer(inputCol="words", outputCol=outputCol, vocabSize=vectorSize, minDF=minCount)
    cvModelX = CountVectorizerX.fit(df)
    cvRes = cvModelX.transform(df).drop('words')
    return cvRes
The code above produces the following output:
CountVectorizer Output
+-----+------------------------------------+----------------------------------------------------+
|id |sentence |cv |
+-----+------------------------------------+----------------------------------------------------+
|zhu |Hi I heard about pySpark |(16,[0,2,4,12,13],[1.0,1.0,1.0,1.0,1.0]) |
|xiang|I wish python could use case classes|(16,[0,3,5,6,8,10,14],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|yu |Logistic regression models are neat |(16,[1,7,9,11,15],[1.0,1.0,1.0,1.0,1.0]) |
+-----+------------------------------------+----------------------------------------------------+
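The binary option mentioned in the docstring, and the learned vocabulary, can be inspected like this (a sketch on the test df; the estimator is aliased to avoid shadowing the wrapper above):
from pyspark.ml.feature import CountVectorizer as CountVectorizerEstimator
from pyspark.sql.functions import split

words_df = df.withColumn("words", split(df["sentence"], ' '))
# binary=True clips every non-zero count to 1.0, which suits binary discrete probability models
cv_model = CountVectorizerEstimator(inputCol="words", outputCol="cv",
                                    vocabSize=1 << 18, minDF=1.0, binary=True).fit(words_df)
print(cv_model.vocabulary[:5])              # vocabulary ordered by corpus term frequency
cv_model.transform(words_df).show(truncate=False)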
04
OneHotEncoder
Maps a categorical feature to a binary vector with a single active position (1, all other positions 0).
def OneHotEncoder(df, inputCol="category", outputCol="categoryVec"):
    """
    Maps a categorical feature to a binary vector with a single active position (1, all other positions 0).
    """
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    stringIndexerX = StringIndexer(inputCol=inputCol, outputCol="categoryIndex")
    modelX = stringIndexerX.fit(df)
    indexed = modelX.transform(df)
    encoderX = OneHotEncoder(inputCol="categoryIndex", outputCol=outputCol)
    # Note: since Spark 3.0 OneHotEncoder is an Estimator, so there you would call
    # encoderX.fit(indexed).transform(indexed) instead of transform() directly.
    encodedX = encoderX.transform(indexed).drop("categoryIndex")
    return encodedX
The code above produces the following output:
OneHotEncoder Output
+-------+--------+-------------+-------------+
| id|category|categoryIndex| categoryVec|
+-------+--------+-------------+-------------+
| zhu| a| 0.0|(2,[0],[1.0])|
|xiangyu| b| 2.0| (2,[],[])|
| yu| c| 1.0|(2,[1],[1.0])|
| is| a| 0.0|(2,[0],[1.0])|
| coming| a| 0.0|(2,[0],[1.0])|
| now| c| 1.0|(2,[1],[1.0])|
+-------+--------+-------------+-------------+
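The table above was produced from a small categorical DataFrame, reconstructed here from the output as a sketch:
# Reconstructed input for the OneHotEncoder example above
cat_df = spark.createDataFrame([
    ('zhu', 'a'), ('xiangyu', 'b'), ('yu', 'c'),
    ('is', 'a'), ('coming', 'a'), ('now', 'c')
], ["id", "category"])
OneHotEncoder(cat_df, inputCol="category", outputCol="categoryVec").show()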
05
StringIndexer
Encodes string labels as numeric indices, where the indices are assigned by label frequency (the most frequent label gets index 0.0).
def StringIndexer(df, inputCol="category", outputCol="categoryVec"):
    """
    Encodes string labels as numeric indices, assigned by label frequency (the most frequent label gets 0.0).
    """
    from pyspark.ml.feature import StringIndexer
    indexerX = StringIndexer(inputCol=inputCol, outputCol=outputCol)
    indexedX = indexerX.fit(df).transform(df)
    return indexedX
The code above produces the following output:
StringIndexer Output
+-----+--------------------+-----------+
| id| sentence|categoryVec|
+-----+--------------------+-----------+
| zhu|Hi I heard about ...| 2.0|
|xiang|I wish python cou...| 0.0|
| yu|Logistic regressi...| 1.0|
+-----+--------------------+-----------+
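By default the fitted indexer raises an error on labels it has not seen during fit; the handleInvalid parameter can keep or skip such rows instead (a sketch, with the estimator aliased to avoid shadowing the wrapper above):
from pyspark.ml.feature import StringIndexer as StringIndexerEstimator

# handleInvalid="keep" maps unseen labels to an extra index instead of raising an error
indexer = StringIndexerEstimator(inputCol="sentence", outputCol="categoryVec", handleInvalid="keep")
indexer.fit(df).transform(df).show()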
06
IndexToString
The counterpart of StringIndexer: IndexToString maps a column of label indices back to the original label strings.
def IndexToString(df, inputCol="categoryVec", outputCol="category"):
    """
    The counterpart of StringIndexer: IndexToString maps a column of label indices back to the original label strings.
    """
    from pyspark.ml.feature import IndexToString
    converterX = IndexToString(inputCol=inputCol, outputCol=outputCol)
    convertedX = converterX.transform(df)
    return convertedX
The code above produces the following output:
IndexToString Output
IndexToString(StringIndexer(df,"sentence"))
+-----+--------------------+-----------+--------------------+
| id| sentence|categoryVec| category|
+-----+--------------------+-----------+--------------------+
| zhu|Hi I heard about ...| 2.0|Hi I heard about ...|
|xiang|I wish python cou...| 0.0|I wish python cou...|
| yu|Logistic regressi...| 1.0|Logistic regressi...|
+-----+--------------------+-----------+--------------------+
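If the index column carries no ML attribute metadata (for example because it was computed by hand), the label list can be supplied explicitly (a sketch with made-up indices and labels):
from pyspark.ml.feature import IndexToString as IndexToStringTransformer

# Map raw numeric indices back to strings via an explicit label list instead of column metadata
idx_df = spark.createDataFrame([(0, 0.0), (1, 2.0), (2, 1.0)], ["id", "categoryVec"])
converter = IndexToStringTransformer(inputCol="categoryVec", outputCol="category", labels=["a", "b", "c"])
converter.transform(idx_df).show()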
07
PCA
Principal component analysis (PCA) is a statistical method that applies a rotation to the data. In essence it performs a change of basis in a linear space so that the variance of the data projected onto the new set of "axes" is maximized; the axes whose variance after the transformation is very small are then discarded, and the remaining new axes, called the principal components, represent the properties of the original data as well as possible in a lower-dimensional subspace.
PCA Input
+--------------------+
| features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+
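The features column above holds Spark ML vectors; such an input DataFrame can be assembled with Vectors.dense / Vectors.sparse. The sketch below uses the values of the standard Spark PCA example, which the (truncated) rows above appear to follow, so treat the numbers as an assumption:
from pyspark.ml.linalg import Vectors

# Assumed input, matching the shape of the truncated table above
pca_input = spark.createDataFrame([
    (Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)
], ["features"])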
def PCA(df, vectorSize=3, inputCol="features", outputCol="pcaFeatures"):
    """
    Principal component analysis applies a rotation to the data: it performs a change of basis in a
    linear space so that the variance of the data projected onto the new set of "axes" is maximized.
    The axes with very small variance after the transformation are then discarded, and the remaining
    new axes, the principal components, represent the properties of the original data as well as
    possible in a lower-dimensional subspace.
    """
    from pyspark.ml.feature import PCA
    pcaX = PCA(k=vectorSize, inputCol=inputCol, outputCol=outputCol)
    modelX = pcaX.fit(df)
    pcaRes = modelX.transform(df)
    return pcaRes
The code above produces the following output:
PCA Output
+-----------------------------------------------------------+
|pcaFeatures |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+
08
Binarizer
Converts a numeric feature into a binary (0/1) output given a threshold: values greater than the threshold map to 1.0, the rest to 0.0.
def Binarizer(df, threshold=0.5, inputCol="feature", outputCol="binarized_feature"):
    """
    Converts a numeric feature into a binary (0/1) output given a threshold:
    values greater than the threshold map to 1.0, the rest to 0.0.
    """
    from pyspark.ml.feature import Binarizer
    binarizerX = Binarizer(threshold=threshold, inputCol=inputCol, outputCol=outputCol)
    binarizedX = binarizerX.transform(df)
    return binarizedX
The code above produces the following output:
Binarizer Output
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
| 0| 0.1| 0.0|
| 1| 0.8| 1.0|
| 2| 0.2| 0.0|
+---+-------+-----------------+
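The table above corresponds to an input reconstructed as follows (a sketch):
# Input reconstructed from the output table above
bin_df = spark.createDataFrame([(0, 0.1), (1, 0.8), (2, 0.2)], ["id", "feature"])
Binarizer(bin_df, threshold=0.5, inputCol="feature", outputCol="binarized_feature").show()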
09
Tokenizer
Tokenizer: Spark offers both plain whitespace tokenization and regular-expression-based tokenization.
def Tokenizer(df, inputCol="sentence", outputCol="words", pattern="\\W"):
    """
    Tokenizer: Spark offers both plain tokenization and regular-expression tokenization;
    this wrapper uses RegexTokenizer.
    """
    from pyspark.ml.feature import RegexTokenizer
    regexTokenizer = RegexTokenizer(inputCol=inputCol, outputCol=outputCol, pattern=pattern)
    regexTokenized = regexTokenizer.transform(df)
    return regexTokenized
The code above produces the following output:
Tokenizer Output
+-----------------------------------+------------------------------------------+------+
|sentence |words |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark |[hi, i, heard, about, spark] |5 |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7 |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5 |
+-----------------------------------+------------------------------------------+------+
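The tokens column in the table above is just a word count added for display; it is not produced by the function itself and can be derived with a small UDF (a sketch on the test df):
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Count the words produced by the tokenizer to build a "tokens" column like the one shown above
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = Tokenizer(df, inputCol="sentence", outputCol="words")
tokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(truncate=False)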
10
StopWordsRemover
Removes stop words.
def StopWordsRemover(df, inputCol="words", outputCol="words2", add_stopwords=[]):
    """
    Filters stop words out of a tokenized column.
    """
    from pyspark.ml.feature import StopWordsRemover
    remover = StopWordsRemover(inputCol=inputCol, outputCol=outputCol)
    # Extend the default (English) stop-word list instead of replacing it;
    # calling setStopWords(add_stopwords) on its own would drop the defaults entirely.
    if add_stopwords:
        remover = remover.setStopWords(remover.getStopWords() + add_stopwords)
    removed = remover.transform(df)
    return removed
The code above produces the following output:
StopWordsRemover Output
+---+----------------------------+--------------------+
|id |words |words2 |
+---+----------------------------+--------------------+
|0 |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1 |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
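The table above corresponds to an already-tokenized input along these lines; with no extra stop words supplied, the default English stop-word list is applied (a sketch):
# Input reconstructed from the output table above
sw_df = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "words"])
StopWordsRemover(sw_df, inputCol="words", outputCol="words2").show(truncate=False)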
11
NGram
Turns a sequence of words into a sequence of consecutive n-grams.
def NGram(df, n=2, inputCol="words", outputCol="ngrams"):
    """
    Turns a sequence of words into a sequence of consecutive n-grams.
    """
    from pyspark.ml.feature import NGram
    ngram = NGram(n=n, inputCol=inputCol, outputCol=outputCol)
    ngramDF = ngram.transform(df)
    return ngramDF
The code above produces the following output:
NGram Output
+---+--------------------------------------------+----------------------------------------------------------------------+
|id |words |ngrams |
+---+--------------------------------------------+----------------------------------------------------------------------+
|0 |[Hi, I, heard, about, Spark] |[Hi I, I heard, heard about, about Spark] |
|1 |[I, wish, python, could, use, case, classes]|[I wish, wish python, python could, could use, use case, case classes]|
|2 |[Logistic, regression, models, are, neat] |[Logistic regression, regression models, models are, are neat] |
+---+--------------------------------------------+----------------------------------------------------------------------+
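The table above corresponds to a tokenized input reconstructed as follows (a sketch; n=2 produces bigrams):
# Input reconstructed from the output table above
ng_df = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "python", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
NGram(ng_df, n=2, inputCol="words", outputCol="ngrams").show(truncate=False)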
12
DCT
The discrete cosine transform (DCT) converts an N-dimensional real-valued sequence in the time domain into an N-dimensional real-valued sequence in the frequency domain (similar in spirit to the discrete Fourier transform).
def DCT(df, inverse=False, inputCol="features", outputCol="featuresDCT"):
    """
    The discrete cosine transform converts an N-dimensional real-valued sequence in the time domain
    into an N-dimensional real-valued sequence in the frequency domain (similar to the discrete Fourier transform).
    """
    from pyspark.ml.feature import DCT
    dct = DCT(inverse=inverse, inputCol=inputCol, outputCol=outputCol)
    dctDf = dct.transform(df)
    return dctDf
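No example output is shown for DCT above; a minimal usage sketch with made-up vector values:
from pyspark.ml.linalg import Vectors

# Made-up input vectors; the DCT turns each time-domain sequence into a frequency-domain sequence
dct_input = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),)
], ["features"])
DCT(dct_input, inverse=False, inputCol="features", outputCol="featuresDCT").show(truncate=False)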
13
ChiSqSelector
ChiSqSelector implements chi-squared feature selection: based on the chi-squared test of independence, it picks the features that the class label depends on most. Supported selectorType options are numTopFeatures (default), percentile and fpr:
- numTopFeatures: keep the num features with the strongest predictive power according to the chi-squared test.
- percentile: like the above, but keep a fraction of the features rather than a fixed number.
- fpr: keep the features whose p-value is below a threshold, thereby controlling the false positive rate of the selection.
def ChiSqSelector(df, featuresCol='features', labelCol='label', numTopFeatures=50, outputCol="selectedFeatures",
                  selectorType='numTopFeatures', percentile=0.1, fpr=0.05):
    """
    ChiSqSelector implements chi-squared feature selection: based on the chi-squared test of
    independence, it picks the features that the class label depends on most.
    """
    # selectorType supported options: numTopFeatures (default), percentile and fpr.
    # 1. numTopFeatures: keep the num features with the strongest predictive power.
    # 2. percentile    : like the above, but keep a fraction of the features rather than a fixed number.
    # 3. fpr           : keep features whose p-value is below a threshold, controlling the false positive rate.
    from pyspark.ml.feature import ChiSqSelector
    selector = ChiSqSelector(
        numTopFeatures = numTopFeatures,
        featuresCol = featuresCol,
        outputCol = outputCol,
        labelCol = labelCol,
        selectorType = selectorType,
        percentile = percentile,
        fpr = fpr
    )
    result = selector.fit(df).transform(df)
    print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
    return result
The code above produces the following output:
# ChiSqSelector Output with top 1 features selected
+---+------------------+-------+----------------+
| id| features| label|selectedFeatures|
+---+------------------+-------+----------------+
| 7|[0.0,0.0,18.0,1.0]| 1.0| [18.0]|
| 8|[0.0,1.0,12.0,0.0]| 0.0| [12.0]|
| 9|[1.0,0.0,15.0,0.1]| 0.0| [15.0]|
+---+------------------+-------+----------------+
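The table above corresponds to an input reconstructed as follows, keeping only the single most predictive feature (a sketch):
from pyspark.ml.linalg import Vectors

# Input reconstructed from the output table above
chisq_df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)
], ["id", "features", "label"])
ChiSqSelector(chisq_df, featuresCol="features", labelCol="label", numTopFeatures=1).show()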
14
PearsonCorr
The Pearson correlation coefficient measures the (linear) correlation between two variables X and Y; its value lies between -1 and 1.
def PearsonCorr(df, featureCol='feature', labelCol='label'):
    """
    The Pearson correlation coefficient measures the linear correlation between two variables
    X and Y; its value lies between -1 and 1.
    """
    # DataFrame.corr currently only supports the Pearson method
    return df.corr(featureCol, labelCol, method=None)
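A minimal usage sketch with made-up numeric columns:
# Made-up numeric columns; a value close to 1.0 indicates a strong positive linear correlation
num_df = spark.createDataFrame([(1.0, 2.0), (2.0, 4.1), (3.0, 6.2), (4.0, 7.9)], ["feature", "label"])
print(PearsonCorr(num_df, featureCol="feature", labelCol="label"))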