前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据量大了跑不动?PySpark特征工程总结

数据量大了跑不动?PySpark特征工程总结

作者头像
炼丹笔记
发布2022-04-06 19:10:45
3.2K0
发布2022-04-06 19:10:45
举报
文章被收录于专栏:炼丹笔记

PySpark Feature Tool

1. 数据准备

我们定义了一些测试数据,方便验证函数的有效性;同时对于大多数初学者来说,明白函数的输入是什么,输出是什么,才能更好的理解特征函数和使用特征:

代码语言:javascript
复制
df = spark.createDataFrame([
    ('zhu', "Hi I heard about pySpark"),
    ('xiang', "I wish python could use case classes"),
    ('yu', "Logistic regression models are neat")
], ["id", "sentence"])
# functionTestData
+-----+------------------------------------+
|id   |sentence                            |
+-----+------------------------------------+
|zhu  |Hi I heard about pySpark.           |
|xiang|I wish python could use case classes|
|yu   |Logistic regression models are neat |
+-----+------------------------------------+

2.数据读取

代码语言:javascript
复制
# !/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################################################################
#  Creater        : Zhu Xiangyu.DOTA
#  Creation Time  : 2022-2-22 22:22:22
#  Description    : PySpark 特征工程工具集
#  Script Version : 2.0.0.9
########################################################################################################################

import math
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DOTA_Features_Tool').enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1000)
spark.conf.set("spark.default.parallelism", 2000)

def get_params():
    return {
    # Function Can be Used
    'column1' : "TFIDF",            # 词频-逆向文件频率
    'column2' : "Word2Vec",
    'column3' : "CountVectorizer",
    'column4' : "OneHotEncoder",
    'column5' : "StringIndexer",
    'column6' : "IndexToString",
    'column7' : "PCA",
    'column8' : "Binarizer",
    'column9' : "Tokenizer",
    'column10': "StopWordsRemover", #
    'column11': "NGram",            #
    'column12': "DCT",              # 离散余弦变换
    'column13': "ChiSqSelector",    # 卡方校验
    'column14': "PearsonCorr",      # 皮尔逊系数
    }

def main():
    # Reset params
    ######################################################################################
    #
    # 库名.表名
    dataset_Name = ""
    dataset = spark.sql("select * from {dataset_Name}".format(dataset_Name = dataset_Name)).fillna(0)
    #
    # 结果存储目标 库名.表名
    saveAsTable_Name = ""
    #
    # 指定对列col进行function操作 {col:function}
    params = {'sentence': "TFIDF"}
    #
    ######################################################################################
    #
    # functionTestData
    df = spark.createDataFrame([
        ('zhu', "Hi I heard about pySpark"),
        ('xiang', "I wish python could use case classes"),
        ('yu', "Logistic regression models are neat")
    ], ["id", "sentence"])
    # Feature Transform
    features = featureTool(dataset,params) # Test-Model : dataset = df
    features.show(5)
    # Save Feature as table
    saveResult(features,saveAsTable_Name)

3.数据存储

代码语言:javascript
复制
# SaveTableAs
def saveResult(res,saveAsTable_Name='dota_tmp.dota_features_tool_save_result', saveFormat="orc",saveMode="overwrite"):
    res.write.saveAsTable(name=saveAsTable_Name, format=saveFormat,mode=saveMode)

4.特征函数

代码语言:javascript
复制
def featureTool(df,params):
    dataCols,targetCols = df.columns,params.keys()
    exeColumns = list(params.keys())[0]
    exeDefFunction = params[exeColumns]
    print(exeColumns+"-->"+exeDefFunction+"(df,{exeColumns})".format(exeColumns=exeColumns))
    exeOrder = "feat={exeDef}(df,'{exeCols}','{outputCol}')".format(exeCols=exeColumns,exeDef=exeDefFunction,outputCol=exeDefFunction+'_'+exeColumns)
    print("exeOrder : "+exeOrder)
    exec(exeOrder)
    return feat

01

TFIDF

代码语言:javascript
复制
def TFIDF(df,inputCol="sentence",outputCol="tfidf", numFeatures=20):
    """
    词频-逆向文件频率(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,它可以体现一个文档中词语在语料库中的重要程度。
    # 总结:一个词语在一篇文章中出现次数越多, 同时在所有文档中出现次数越少, 越能够代表该文章.
    """
    from pyspark.ml.feature import HashingTF, IDF, Tokenizer
    tokenizerX = Tokenizer(inputCol=inputCol, outputCol="words")
    wordsDataX = tokenizerX.transform(df)
    hashingTFX = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=numFeatures)
    featurizedData = hashingTFX.transform(wordsDataX)
    idfX = IDF(inputCol="rawFeatures", outputCol=outputCol)
    idfModel = idfX.fit(featurizedData)
    tfidfRes = idfModel.transform(featurizedData).drop('words','rawFeatures')
    return tfidfRes

权重计算方法经常会和余弦相似度(cosine similarity)一同使用于向量空间模型中,用以判断两份文件之间的相似性。当前,真正在搜索引擎等实际应用中广泛使用的是Tf-idf 模型。Tf-idf 模型的主要思想是:如果词w在一篇文档d中出现的频率高,并且在其他文档中很少出现,则认为词w具有很好的区分能力,适合用来把文章d和其他文章区分开来。

上述代码输出结果如下:

代码语言:javascript
复制
TFIDF Output
+-----+------------------------------------+----------------------------------------------------------------------------------------+
|id   |sentence                            |tfidf                                                                                   |
+-----+------------------------------------+----------------------------------------------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(20,[0,9,17],[0.6931471805599453,0.5753641449035617,1.3862943611198906])                |
|xiang|I wish python could use case classes|(20,[2,9,13,15],[0.6931471853,1.15072071234,0.285178085,0.28768207245178085])           |
|yu   |Logistic regression models are neat |(20,[4,6,13,15,18],[0.69314718053,0.693147453,0.287682078085,0.2876085,0.693149453])    |
+-----+------------------------------------+----------------------------------------------------------------------------------------+

02

Word2Vec

word2vec模型其实就是简单化的神经网络,它可以将文本数据向量化。词向量具有良好的语义特性,是表示词语特征的常用方式。词向量每一维的值代表一个具有一定的语义和语法上解释的特征。

所以,可以将词向量的每一维称为一个词语特征。词向量具有多种形式,distributed representation 是其中一种。一个 distributed representation 是一个稠密、低维的实值向量。distributed representation 的每一维表示词语的一个潜在特征,该特 征捕获了有用的句法和语义特性。可见,distributed representation 中的 distributed 一词体现了词向量这样一个特点:将词语的不同句法和语义特征分布到它的每一个维度去表示。

代码语言:javascript
复制
def Word2Vec(df,inputCol="sentence",outputCol="w2v",vectorSize=100, minCount=5, numPartitions=1,
    stepSize=0.025, maxIter=1, seed=None, windowSize=5, maxSentenceLength=1000):
    """
    Word2vec:将word转化为vector,word是顺序有意义的实体,比如文档中单词、用户依次点击的商品。
    Word2vec 得到实体向量,可以用来度量实体间相似度,在此基础上,以下方向都可以应用:分类,聚类,推荐,句子向量,短文本分类。
    #
    # 两种实现方式
    # Skip-gram:用一个词语作为输入,来预测它周围的上下文。
    # CBOW :用一个词语的上下文作为输入,来预测这个词语本身。
    #
    Spark 的 Word2vec 是一个Estimator,它采用一系列代表文档的词语来训练word2vecmodel。[Spark实现的是Skip-gram模型]
    该模型将每个词语映射到一个固定大小的向量。
    word2vecmodel使用文档中每个词语的平均数来将文档转换为向量,
    然后这个向量可以作为预测的特征,来计算文档相似度计算等等。
    """
    from pyspark.ml.feature import Word2Vec
    from pyspark.sql.functions import split
    # Input data: Each row is a bag of words from a sentence or document.
    df = df.withColumn("words",split(df[inputCol],' '))
    word2VecX = Word2Vec(
                        vectorSize = vectorSize,
                        minCount = minCount,
                        inputCol = "words",
                        outputCol = outputCol,
                        numPartitions = numPartitions,
                        stepSize = stepSize,
                        maxIter = maxIter,
                        seed = seed,
                        windowSize = windowSize,
                        maxSentenceLength = maxSentenceLength
                        )
    w2vModel = word2VecX.fit(df)
    w2vRes = w2vModel.transform(df).drop('words')
    return w2vRes

上述代码输出结果如下:

代码语言:javascript
复制
Word2Vec Output
+-----+--------------------+--------------------+
|   id|            sentence|                 w2v|
+-----+--------------------+--------------------+
|  zhu|Hi I heard about ...|[0.08936496693640...|
|xiang|I wish python cou...|[7.36715538161141...|
|   yu|Logistic regressi...|[-0.0063562680035...|
+-----+--------------------+--------------------+

03

Countvectorizer

Countvectorizer旨在通过计数来将一个文档转换为向量。

代码语言:javascript
复制
def CountVectorizer(df,inputCol="sentence",outputCol="cv",vectorSize=200000, minCount=1.0):
    """
    Countvectorizer旨在通过计数来将一个文档转换为向量。
    当不存在先验字典时,Countvectorizer可作为Estimator来提取词汇,并生成一个Countvectorizermodel。
    该模型产生文档关于词语的稀疏表示,其表示可以传递给其他算法如LDA。
    #
    在fitting过程中,countvectorizer将根据语料库中的词频排序选出前vocabsize个词。
    一个可选的参数minDF也影响fitting过程中,它指定词汇表中的词语在文档中最少出现的次数。
    另一个可选的二值参数控制输出向量,如果设置为真那么所有非零的计数为1。这对于二值型离散概率模型非常有用。
    """
    from pyspark.ml.feature import CountVectorizer
    from pyspark.sql.functions import split
    df = df.withColumn("words",split(df[inputCol],' '))
    CountVectorizerX = CountVectorizer(inputCol="words", outputCol=outputCol, vocabSize=vectorSize, minDF=minCount)
    cvModelX = CountVectorizerX.fit(df)
    cvRes = cvModelX.transform(df).drop('words')
    return cvRes

上述代码输出结果如下:

代码语言:javascript
复制
CountVectorizer Output
+-----+------------------------------------+----------------------------------------------------+
|id   |sentence                            |cv                                                  |
+-----+------------------------------------+----------------------------------------------------+
|zhu  |Hi I heard about pySpark            |(16,[0,2,4,12,13],[1.0,1.0,1.0,1.0,1.0])            |
|xiang|I wish python could use case classes|(16,[0,3,5,6,8,10,14],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|yu   |Logistic regression models are neat |(16,[1,7,9,11,15],[1.0,1.0,1.0,1.0,1.0])            |
+-----+------------------------------------+----------------------------------------------------+

04

OneHotEncoder

将类别特征映射为二进制向量,其中只有一个有效值(为1,其余为0)。

代码语言:javascript
复制
def OneHotEncoder(df,inputCol="category",outputCol="categoryVec"):
    """
    将类别特征映射为二进制向量,其中只有一个有效值(为1,其余为0)。
    """
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    stringIndexerX = StringIndexer(inputCol=inputCol, outputCol="categoryIndex")
    modelX = stringIndexerX.fit(df)
    indexed = modelX.transform(df)
    encoderX = OneHotEncoder(inputCol="categoryIndex", outputCol=outputCol)
    encodedX = encoderX.transform(indexed).drop("categoryIndex")
    return encodedX

上述代码输出结果如下:

代码语言:javascript
复制
OneHotEncoder Output
+-------+--------+-------------+-------------+
|     id|category|categoryIndex|  categoryVec|
+-------+--------+-------------+-------------+
|    zhu|       a|          0.0|(2,[0],[1.0])|
|xiangyu|       b|          2.0|    (2,[],[])|
|     yu|       c|          1.0|(2,[1],[1.0])|
|     is|       a|          0.0|(2,[0],[1.0])|
| coming|       a|          0.0|(2,[0],[1.0])|
|    now|       c|          1.0|(2,[1],[1.0])|
+-------+--------+-------------+-------------+

05

StringIndexer

将标签索引化,然后索引数值根据标签出现的频率进行排序。

代码语言:javascript
复制
def StringIndexer(df,inputCol="category",outputCol="categoryVec"):
    """
    将标签索引化,然后索引数值根据标签出现的频率进行排序。
    """
    from pyspark.ml.feature import StringIndexer
    indexerX = StringIndexer(inputCol=inputCol, outputCol=outputCol)
    indexedX = indexerX.fit(df).transform(df)
    return indexedX

上述代码输出结果如下:

代码语言:javascript
复制
StringIndexer Output
+-----+--------------------+-----------+
|   id|            sentence|categoryVec|
+-----+--------------------+-----------+
|  zhu|Hi I heard about ...|        2.0|
|xiang|I wish python cou...|        0.0|
|   yu|Logistic regressi...|        1.0|
+-----+--------------------+-----------+

06

IndexToString

与StringIndexer对应,IndexToString将索引化标签还原成原始字符串。

代码语言:javascript
复制
def IndexToString(df,inputCol="categoryVec",outputCol="category"):
    """
    与StringIndexer对应,IndexToString将索引化标签还原成原始字符串。
    """
    from pyspark.ml.feature import IndexToString
    converterX = IndexToString(inputCol=inputCol, outputCol=outputCol)
    convertedX = converterX.transform(df)
    return convertedX

上述代码输出结果如下:

代码语言:javascript
复制
IndexToString Output
IndexToString(StringIndexer(df,"sentence"))
+-----+--------------------+-----------+--------------------+
|   id|            sentence|categoryVec|            category|
+-----+--------------------+-----------+--------------------+
|  zhu|Hi I heard about ...|        2.0|Hi I heard about ...|
|xiang|I wish python cou...|        0.0|I wish python cou...|
|   yu|Logistic regressi...|        1.0|Logistic regressi...|
+-----+--------------------+-----------+--------------------+

07

PCA

主成分分析是一种对数据进行旋转变换的统计学方法,其本质是在线性空间中进行一个基变换,使得变换后的数据投影在一组新的"坐标轴"上的方差最大化,随后,裁剪掉变换后方差很小的"坐标轴",剩下的新的"坐标轴"即被称为主成分,它们可以再一个较低维度的子空间中尽可能地表示原有数据的性质。

代码语言:javascript
复制
PCA Input
+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+

上述代码输出结果如下:

代码语言:javascript
复制
def PCA(df,vectorSize=3, inputCol="features", outputCol="pcaFeatures"):
    """
    主成分分析是一种对数据进行旋转变换的统计学方法,其本质是在线性空间中进行一个基变换,
    使得变换后的数据投影在一组新的"坐标轴"上的方差最大化,
    随后,裁剪掉变换后方差很小的"坐标轴",剩下的新的"坐标轴"即被称为主成分,
    它们可以再一个较低维度的子空间中尽可能地表示原有数据的性质。
    """
    from pyspark.ml.feature import PCA
    pcaX = PCA(k=vectorSize, inputCol=inputCol, outputCol=outputCol)
    modelX = pcaX.fit(df)
    pcaRes = modelX.transform(df)
    return pcaRes

上述代码输出结果如下:

代码语言:javascript
复制
PCA Output
+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+

0808

Binarizer

把数值型特征值转化成二进制(0/1)输出,设置一个阈值,大于阈值的输出1,小于阈值的输出0。

代码语言:javascript
复制
def Binarizer(df,threshold=0.5, inputCol="feature", outputCol="binarized_feature"):
    """
    把数值型特征值转化成二进制(0/1)输出,设置一个阈值,大于阈值的输出1,小于阈值的输出0
    """
    from pyspark.ml.feature import Binarizer
    binarizerX = Binarizer(threshold=threshold, inputCol=inputCol, outputCol=outputCol)
    binarizedX = binarizerX.transform(df)
    return binarizedX

上述代码输出结果如下:

代码语言:javascript
复制
Binarizer Output
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+

09

Tokenizer

分词器:提供默认分词,也提供正则表达式分词

代码语言:javascript
复制
def Tokenizer(df,inputCol="sentence", outputCol="words", pattern="\\W"):
    """
    分词器:提供默认分词,也提供正则表达式分词
    """
    from pyspark.ml.feature import RegexTokenizer
    regexTokenizer = RegexTokenizer(inputCol=inputCol, outputCol=outputCol, pattern=pattern)
    regexTokenized = regexTokenizer.transform(df)
    return regexTokenized

上述代码输出结果如下:

代码语言:javascript
复制
Tokenizer Output
+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+

10

StopWordsRemover

停用词过滤

代码语言:javascript
复制
def StopWordsRemover(df,inputCol="words", outputCol="words2",add_stopwords=[]):
    """
    停用词过滤
    """
    from pyspark.ml.feature import StopWordsRemover
    remover = StopWordsRemover(inputCol=inputCol, outputCol=outputCol).setStopWords(add_stopwords)
    # 添加停用词
    # remover = remover.setStopWords(Array("saw","Mary"))
    removed = remover.transform(df)
    return removed

上述代码输出结果如下:

代码语言:javascript
复制
StopWordsRemover Output
+---+----------------------------+--------------------+
|id |words                       |words2              |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+

11

NGram

把单词转成一个个连续词输出。

代码语言:javascript
复制
def NGram(df,n=2, inputCol="words", outputCol="ngrams"):
    """
    把单词转成一个个连续词输出
    """
    from pyspark.ml.feature import NGram
    ngram = NGram(n=2, inputCol=inputCol, outputCol=outputCol)
    ngramDF = ngram.transform(df)
    return ngramDF

上述代码输出结果如下:

代码语言:javascript
复制
NGram Output
+---+--------------------------------------------+----------------------------------------------------------------------+
|id |words                                       |ngrams                                                                |
+---+--------------------------------------------+----------------------------------------------------------------------+
|0  |[Hi, I, heard, about, Spark]                |[Hi I, I heard, heard about, about Spark]                             |
|1  |[I, wish, python, could, use, case, classes]|[I wish, wish python, python could, could use, use case, case classes]|
|2  |[Logistic, regression, models, are, neat]   |[Logistic regression, regression models, models are, are neat]        |
+---+--------------------------------------------+----------------------------------------------------------------------+

12

DCT

离散余弦变换是将时域的N维实数序列转换成频域的N维实数序列的过程(有点类似离散傅里叶变换)。

代码语言:javascript
复制
def DCT(df, inverse=False, inputCol="features", outputCol="featuresDCT"):
    """
    离散余弦变换是将时域的N维实数序列转换成频域的N维实数序列的过程(有点类似离散傅里叶变换)。
    """
    from pyspark.ml.feature import DCT
    dct = DCT(inverse=inverse, inputCol=inputCol, outputCol=outputCol)
    dctDf = dct.transform(df)
    return dctDf

13

ChiSqSelector

ChiSqSelector代表卡方特征选择。ChiSqSelector根据独立卡方检验,然后选取类别标签主要依赖的特征。 selectorType Supported options: numTopFeatures (default), percentile and fpr. - 1、numTopFeatures:通过卡方检验选取最具有预测能力的Top(num)个特征 - 2、percentile:类似于上一种方法,但是选取一小部分特征而不是固定(num)个特征 - 3、fpr:选择P值低于门限值的特征,这样就可以控制false positive rate来进行特征选择。

代码语言:javascript
复制
def ChiSqSelector(df, featuresCol='features', labelCol='label',numTopFeatures=50,outputCol="selectedFeatures",
    selectorType='numTopFeatures', percentile=0.1, fpr=0.05):
    """
    ChiSqSelector代表卡方特征选择。ChiSqSelector根据独立卡方检验,然后选取类别标签主要依赖的特征。
    """
    # selectorType Supported options: numTopFeatures (default), percentile and fpr.
    # 1、numTopFeatures:通过卡方检验选取最具有预测能力的Top(num)个特征
    # 2、percentile:类似于上一种方法,但是选取一小部分特征而不是固定(num)个特征
    # 3、fpr:选择P值低于门限值的特征,这样就可以控制false positive rate来进行特征选择
    from pyspark.ml.feature import ChiSqSelector
    selector = ChiSqSelector(
                            numTopFeatures = numTopFeatures,
                            featuresCol = featuresCol,
                            outputCol = outputCol,
                            labelCol = labelCol,
                            selectorType = selectorType,
                            percentile = percentile,
                            fpr = fpr
                            )
    result = selector.fit(df).transform(df)
    print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
    return result

上述代码输出结果如下:

代码语言:javascript
复制
# ChiSqSelector Output with top 1 features selected
+---+------------------+-------+----------------+
| id|          features|  label|selectedFeatures|
+---+------------------+-------+----------------+
|  7|[0.0,0.0,18.0,1.0]|    1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|    0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|    0.0|          [15.0]|
+---+------------------+-------+----------------+

14

PearsonCorr

皮尔逊相关系数( Pearson correlation coefficient) 用于度量两个变量X和Y之间的相关(线性相关),其值介于-1与1之间。

代码语言:javascript
复制
def PearsonCorr(df,featureCol='feature',labelCol='label'):
    """
    皮尔逊相关系数( Pearson correlation coefficient)
    用于度量两个变量X和Y之间的相关(线性相关),其值介于-1与1之间。
    """
    return df.corr(featureCol,labelCol,method=None)

参考资料

  • https://zhuanlan.zhihu.com/p/112137809
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 炼丹笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档