在ModelArts Notebook中连接DLI Spark集群并进行机器学习建模

举报
wyhhyw123 发表于 2022/02/21 14:33:27 2022/02/21
【摘要】 本文将介绍如何基于ModelArts Notebook连接DLI远程Spark集群,并基于Sparkmagic魔法命令将本地的iris数据集发送至Spark集群,进而基于PySpark进行简单的数据探索性分析、特征工程和构建随机森林模型进行预测。

一、背景介绍

本样例基于sklearn中内置的iris数据集(鸢尾花分类数据集)介绍如何将本地notebook中的数据发送到DLI spark集群,并进行简单的特征工程和分类预测建模。

ma-spark魔法命令
%reload_ext ma_magic : 加载ModelArts内置的自研魔法命令ma_magic

%ma_spark query cluster : 使用ma_spark魔法命令查询已注册的DLI队列,执行该命令之前需要先加载ma_magic命令

%ma_spark connect --cluster_name --ak --sk : 连接指定队列,执行该命令之前需要先加载ma_magic命令

sparkmagic魔法命令

sparkmagic魔法命令仅在Sparkmagic(DLI-PySpark)类型的Kernel中可用,详情见sparkmagic

%%local : 当前cell中的代码在notebook本地执行。

%%sql : 在当前cell中仅可执行sql命令,并将提交到DLI Spark集群中执行

sparkmagic其余魔法命令可通过 %%help 查看,若不加 %% 默认支持spark语法且会将代码提交到DLI远程Spark集群执行,执行完成后会将执行结果返回至notebook进行展示。


二、Notebook实践

本样例代码结构如下

  • 1 基于ma-spark魔法命令连接DLI Spark分布式集群
  • 2 数据准备
    • a. 加载iris数据集
    • b. 本地数据集发送至远程集群
  • 3 读取集群中的数据
    • a. 查看数据
    • b. 划分训练集和验证集
    • c. 基于训练集进行简单的可视化分析
  • 4 特征工程
    • a. 特征交叉
  • 5 构建模型
    • a. 基于spark.ml构建随机森林
    • b. 训练模型和预测
    • c. 保存预测结果至notebook本地并计算AUC和f1-score

1. 基于ma-spark魔法命令连接DLI spark集群

在同一个实例中开启多个jupyter notebook执行远程Spark作业时,仅需连接DLI spark集群一次;
在Sparkmagic(DLI-PySpark)类型的Kernel中连接DLI/MRS集群,需要添加 %%local 命令;
在PySpark和python类型的kernel中连接DLI/MRS集群,请去除 %%local 命令;

查询已注册的队列

%%local
%reload_ext ma_magic
%ma_spark query cluster

连接队列

连接成功后,请先执行restart kernel,然后再执行下述spark相关操作

%%local
%reload_ext ma_magic
%ma_spark connect --cluster_name <dli-queue-name-placeholder> --ak <access-key-placeholder> --sk <secret-access-key-placeholder>

2. 数据准备

a. 加载iris数据集,并将列名转换为DLI Spark支持的格式

'''
e.g. columns: ['sepal length (cm)', ...] -> ['sepal_length', ...]
'''

%%local
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
dataset = load_iris()
data = dataset.data
target = dataset.target
columns = [col_name.split('(cm)')[0].strip().replace(' ', '_') for col_name in dataset.feature_names]
data_pd = pd.DataFrame(np.hstack([data, target.reshape(-1, 1)]), columns = columns+['target'])
data_pd.head()

b. 本地数据集发送至远程集群,并将DataFrame命名为spark_df, 可在cell中执行 %%help 查看%%send_to_spark命令详情

%%send_to_spark -i data_pd -t df -n spark_df

3. 读取集群中的数据

a. 查看数据

查看数据前5行

%%pretty
spark_df.show(5)

查看各列数据类型

spark_df.printSchema()

查看各列缺失值比率,若存在缺失值则需要以众数或者中位数等进行缺失值填充

def check_na_ratio(df, cols):
    if isinstance(cols, str):
        cols = [cols]
    if isinstance(cols, list):
        na_ratio = df.agg(*[(1-F.count(col)/F.count('*')).alias(f'{col}_na_ratio') for col in cols])
    else:
        raise TypeError(f"cols should be [str, list], but got type {type(cols)}")
    return na_ratio
  
from pyspark.sql import functions as F
na_ratio = check_na_ratio(spark_df, spark_df.columns)
na_ratio.show()

b. 划分训练集和验证集

将全部数据按照8比2的比例划分为训练集和验证集,随机种子为2021

df_train, df_val = spark_df.randomSplit([0.8, 0.2], seed=2021)
print(f'训练集样本数:{df_train.count()}, 验证集样本数:{df_val.count()}')
df_val.show()

c. 基于训练集进行简单的可视化分析

注意,在DLI远程Spark集群中仅安装了pyspark, pandas和numpy, 不支持pip install安装python库,因此无法基于matplotlib和seaborn等工具进行可视化,可以通过魔法命令将数据导入到notebook本地进行可视化操作。
此外,尽量避免将大量数据导入notebook,避免因内存或网络等原因导致失败或者其他错误。当需要将数据导入notebook本地进行可视化时,可以先在远程spark集群进行groupBy等操作,减少数据量。

### 构建各种鸢尾花类别下长度等指标的统计值:最大值,最小值和均值
df_train_agg_target = df_train.groupBy('target').agg(
    F.max('petal_length').alias('petal_length_max'),
    F.min('petal_length').alias('petal_length_min'),
    F.mean('petal_length').alias('petal_length_mean'),
    F.max('petal_width').alias('petal_width_max'),
    F.min('petal_width').alias('petal_width_min'),
    F.mean('petal_width').alias('petal_width_mean'),
    F.max('sepal_length').alias('sepal_length_max'),
    F.min('sepal_length').alias('sepal_length_min'),
    F.mean('sepal_length').alias('sepal_length_mean'),
    F.max('sepal_width').alias('sepal_width_max'),
    F.min('sepal_width').alias('sepal_width_min'),
    F.mean('sepal_width').alias('sepal_width_mean'),
)
df_train_agg_target.show()

基于sparkmagic魔法命令 %%spark 将远程spark集群中的数据加载到notebook中, 具体可在当前ipynb中执行%%help查看

%%spark -o df_train_agg_target
df_train_agg_target.select(df_train_agg_target.columns)

在notebook中进行可视化操作,画出不同鸢尾花类型下各个指标的统计值分布情况。从下图可以看出,不同鸢尾花类别的各指标差异较大,可知能够较为简单的区分三种鸢尾花类型

%%local
import warnings
import logging
import matplotlib.pyplot as plt
logging.getLogger('matplotlib.font_manager').disabled = True
warnings.filterwarnings('ignore')

x = range(1, df_train_agg_target.shape[-1])
bar_width = 0.2
color_map = dict(zip([0, 1, 2], ['red', 'orange', 'blue']))
plot_columns = [col for col in df_train_agg_target.columns if col != 'target']
for tar in df_train_agg_target.target.unique():
    tmp_df = df_train_agg_target[df_train_agg_target.target==tar]
    plt.bar(x, tmp_df[plot_columns].squeeze(), width=bar_width, label = target_name_map.get(tar), fc=color_map.get(tar))
    x = [i+bar_width for i in x]

plt.xticks(x, plot_columns, rotation=-60, fontsize=10)
plt.legend()
plt.show()

image.png

4. 特征工程

a. 特征交叉,对任意两列特征之间进行减和乘操作

原始数据有4列特征,分别是petal_length,petal_width,sepal_length和sepal_width,对任意两列特征进行减法和乘法的特征交叉操作,特征维度从4维扩展到16维

feat_cols = list(set(spark_df.columns) -set(['target']))
print(feat_cols)
for i in range(len(feat_cols)):
    for j in range(i+1, len(feat_cols)):
        df_train = df_train.withColumn(f"{feat_cols[j]}_sub_{feat_cols[i]}", F.col(feat_cols[j]) - F.col(feat_cols[i]))
        df_train = df_train.withColumn(f"{feat_cols[j]}_mul_{feat_cols[i]}", F.col(feat_cols[j]) * F.col(feat_cols[i]))

        df_val = df_val.withColumn(f"{feat_cols[j]}_sub_{feat_cols[i]}", F.col(feat_cols[j]) - F.col(feat_cols[i]))
        df_val = df_val.withColumn(f"{feat_cols[j]}_mul_{feat_cols[i]}", F.col(feat_cols[j]) * F.col(feat_cols[i]))

%%pretty
df_val.show()

5. 构建模型

a. 基于spark.ml构建随机森林

构建随机森林分类器,并指定输入特征的列名为features(向量化后的特征列),标签列的列名为target,预测输出列的列名为prediction,其余参数为训练参数,具体可查看PySpark文档 PySpark

from pyspark.ml.classification import RandomForestClassifier
rf_cls = RandomForestClassifier(
    featuresCol='features',
    labelCol='target',
    predictionCol='prediction',
    subsamplingRate=0.8,
    seed=2021
)

b. 训练模型和预测

在Spark中需要先基于VectorAssembler将指定的各列特征向量化,并在spark.DataFrame中新增指定名称的列如features,才能进一步输入到模型中进行训练和推理。

'''
e.g. inputCols = ['x', 'y', 'z']
-----------------------------              ----------------------------------------
col_name 'x' 'y' 'z' 'target'              col_name 'x' 'y' 'z' 'target' 'features'
row1      1   2   3     0          ->      row1      1   2   3      0     [1,2,3]
row2      2   3   4     1                  row2      2   3   4      1     [2,3,4]
-----------------------------              ----------------------------------------
'''

from pyspark.ml.feature import VectorAssembler
feat_cols = df_train.columns
feat_cols.remove('target')
print(feat_cols)
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)

model = rf_cls.fit(df_train.select('features', 'target'))
pred = model.transform(df_val.select('features', 'target'))

c. 保存预测结果至notebook本地并计算AUC和f1-score

创建pred的副本label_and_preds,并通过魔法命令 %%sql 将其保存为名为label_and_preds且类型为pandas.DataFrame的notebook本地变量

pred.createOrReplaceTempView('label_and_preds')
%%sql -o label_and_preds
select target, prediction from label_and_preds

在notebook本地计算auc和f1值,并画出ROC曲线

%%local
import pandas as pd
from sklearn import metrics
import matplotlib.pyplot as plt


label_and_preds['target'] = label_and_preds['target'].map(int)
probs = label_and_preds['pred_probability'].map(lambda x:  x['values']).explode()
real_label = pd.get_dummies(label_and_preds, columns=['target'])[['target_0','target_1','target_2']].values.reshape(-1,1)
fpr, tpr, thresholds = metrics.roc_curve(real_label,probs)
auc = metrics.auc(fpr, tpr)
print(f"the auc of val dataset is {auc}, cause iris is a very simple dataset.")
#绘图

#FPR就是横坐标,TPR就是纵坐标
plt.plot(fpr, tpr, c = 'r', lw = 2, alpha = 0.7, label = u'AUC=%.3f' % auc)
plt.plot((0, 1), (0, 1), c = '#808080', lw = 1, ls = '--', alpha = 0.7)
plt.xlim((-0.01, 1.02))
plt.ylim((-0.01, 1.02))
plt.xticks(np.arange(0, 1.1, 0.1))
plt.yticks(np.arange(0, 1.1, 0.1))
plt.xlabel('False Positive Rate', fontsize=13)
plt.ylabel('True Positive Rate', fontsize=13)

plt.legend(loc='lower right', fancybox=True, framealpha=0.8, fontsize=12)
plt.title(u'Roc Curve for RandomForest-iris', fontsize=17)
plt.show()

image.png

%%local
from sklearn.metrics import f1_score
f1 = f1_score( label_and_preds.target, label_and_preds.prediction, average='micro')
print(f"the f1 score of val dataset is {f1}, cause iris is a very simple dataset.")

三、附件

可执行ipynb文件见附件

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200