spark建模
Spark Mlib建模流程
- 在Jupyter Notebook中导入pyspark、数据处理numpy等相关包
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import os
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean,col,split,col,regexp_extract,when,lit,max,min
import pymssql
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf , col
- 创建本地spark进程
spark=SparkSession.builder.master("local[2]").appName("Spark_mlib").getOrCreate()
- 读入数据,并根据数据集的字段类型,创建schema
schema = StructType([
StructField("USER_ID",StringType(),nullable=True),
StructField("FLOW",DecimalType(),nullable=True),
StructField("FLOW_LAST_ONE",DecimalType(),nullable=True),
StructField("FLOW_LAST_TWO",DecimalType(),nullable=True),
StructField("MONTH_FEE",DecimalType(),nullable=True),
StructField("MONTHS_3AVG",DecimalType(),nullable=True),
StructField("BINDEXP_DATE",DateType(),nullable=True),
StructField("PHONE_CHANGE",IntegerType(),nullable=True),
StructField("AGE",IntegerType(),nullable=True),
StructField("OPEN_DATE",DateType(),nullable=True),
StructField("REMOVE_TAG",StringType(),nullable=True),
])
- 对已经导入的数据集进行处理,并去除无关字段;
Mlib_data = Mlib_data.drop(‘USER_ID’).drop(‘PHONE_CHANGE’)
- 由于模型的要求,我们需要把类别型数据均转换成数字,采用的是自定义函数分别将A/非A映射成1和0
binary_map = {‘A’:1.0, ‘B’:0.0, ‘C’:0.0, ‘D’:0.0, ‘E’:0.0, ‘F’:0.0, ‘K’:0.0,}
toNum = UserDefinedFunction(lambda k : binary_map[k],DoubleType())
- 数据特征提取
根据业务经验,选取与用户流失可能存在相关性的所有属性,进行数据审查,筛选出存在较大相关性的特征属性。本次建模数据特征主要采用目标用户通信以及消费行为等属性,根据这些维度数据合并汇总成数据挖掘特征宽表。
featureCols=[("FLOW"),("FLOW_LAST_ONE"),
("FLOW_LAST_TWO"),("MONTH_FEE"),
("MONTHS_3AVG"),("REMOVE_TAG")]
Assembler = VectorAssembler(inputCols = featureCols, outputCol = “features”)
- 模型训练
MLib分类器和回归其要求数据集以行标签+特征list的LabeledPoint行形式存储。采用自定义的LabelData()函数展示行处理方式,通过该函数将(MLib_data)转换为符合条件的数据集。为降低模型的复杂度,先利用训练集数据生成一个最大深度为2的浅决策树分类模型。
from pyspark.mlib.regression import LabeledPoint
from pyspark.mlib.tree import DecisionTree
def labelData(data):
return data.map(lambda row : LabeledPoint(row[-1],row[:-1]))
model = DecisionTree.trainClassifier(training_data, numClasses=2,
maxDeapth=2, categoricalFeaturesInfo={1:2,2:2}
impurity = ‘gini’, maxBins = 32
)
- 模型性能评估
测试数据的流失率预测结果包括模型的预测函数和实际标签,本次使用MLib’s MulticlassMetrics()进行模型评估。将预测值和标签以元组形势作为输入,然后输出评估结果。
from pyspark.mlib.evaluation import MulticlassMetrics
def getPredictionsLabels(model, test_data):
predictions = model.predict(test_data.map(lambda r : r.features))
return predictions.zip(test_data.map(lambda r :r.label))
def printMetrics(predictions_and_labels):
metrics = MulticlassMetrics(predictions_and_labels)
print(‘Precision of True ’, metrics.precision(1))
print(‘Precision of False ’, metrics.precision(0))
print(‘Recall of True ’, metrics.recall(1))
print(‘Recall of False ’, metrics.recall(0))
print(‘F-1 Score ’, metrics.fMeasure())
得出整体accuracy,F-1 score 结果不错,但是发现recall率的偏斜。假流失样本recall高,真流失的样本recall低,商业决策关注的核心是最可能流失的客户,因此需要确定模型对于真流失样本的准确性。
- 量化模型
为确保结果的可靠性,本次采用交叉验证的方式进行量化模型,使用的方法是K-折交叉验证,基本原理是将数据集随机拆分成K部分,通过训练集和测试集生成模型,产生K个模型,以平均性能得分作为整体得分。
Pipline = Pipeline(stages = [labellndexer,featureIndexer,dTree])
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth,[2,3,4,5,6,7]).build
evaluator = MulticlassClassificationEvaluator(
labelCol = ‘indexedLabel’,predictionCol = ‘prediction’,metricName= ‘f1’
)
- 预测准确率
结果发现验证性能最好的决策树模型深度是5,这意味着起始层数为2的决策树复杂程度不够,但是深度超过5的过拟合导致测试性能较差,使用MLib_model.transform()函数预测新的数据。
vectorized_test_data = vectorizeData(test_data)
transformed_data = MLib_model. transform(test_data)
print evaluator.getMetricName(),‘accuracy:’,evaluator.evaluate(transformed_data)
- 点赞
- 收藏
- 关注作者
评论(0)