在ModelArts Notebook中连接DLI Spark集群并进行机器学习建模
一、背景介绍
本样例基于sklearn中内置的iris数据集(鸢尾花分类数据集)介绍如何将本地notebook中的数据发送到DLI spark集群,并进行简单的特征工程和分类预测建模。
ma-spark魔法命令
%reload_ext ma_magic : 加载ModelArts内置的自研魔法命令ma_magic
%ma_spark query cluster : 使用ma_spark魔法命令查询已注册的DLI队列,执行该命令之前需要先加载ma_magic命令
%ma_spark connect --cluster_name --ak --sk : 连接指定队列,执行该命令之前需要先加载ma_magic命令
sparkmagic魔法命令
sparkmagic魔法命令仅在Sparkmagic(DLI-PySpark)类型的Kernel中可用,详情见sparkmagic
%%local : 当前cell中的代码在notebook本地执行。
%%sql : 在当前cell中仅可执行sql命令,并将提交到DLI Spark集群中执行
sparkmagic其余魔法命令可通过 %%help 查看,若不加 %% 默认支持spark语法且会将代码提交到DLI远程Spark集群执行,执行完成后会将执行结果返回至notebook进行展示。
二、Notebook实践
本样例代码结构如下
- 1 基于ma-spark魔法命令连接DLI Spark分布式集群
- 2 数据准备
- a. 加载iris数据集
- b. 本地数据集发送至远程集群
- 3 读取集群中的数据
- a. 查看数据
- b. 划分训练集和验证集
- c. 基于训练集进行简单的可视化分析
- 4 特征工程
- a. 特征交叉
- 5 构建模型
- a. 基于spark.ml构建随机森林
- b. 训练模型和预测
- c. 保存预测结果至notebook本地并计算AUC和f1-score
1. 基于ma-spark魔法命令连接DLI spark集群
在同一个实例中开启多个jupyter notebook执行远程Spark作业时,仅需连接DLI spark集群一次;
在Sparkmagic(DLI-PySpark)类型的Kernel中连接DLI/MRS集群,需要添加 %%local 命令;
在PySpark和python类型的kernel中连接DLI/MRS集群,请去除 %%local 命令;
查询已注册的队列
%%local
%reload_ext ma_magic
%ma_spark query cluster
连接队列
连接成功后,请先执行restart kernel,然后再执行下述spark相关操作
%%local
%reload_ext ma_magic
%ma_spark connect --cluster_name <dli-queue-name-placeholder> --ak <access-key-placeholder> --sk <secret-access-key-placeholder>
2. 数据准备
a. 加载iris数据集,并将列名转换为DLI Spark支持的格式
'''
e.g. columns: ['sepal length (cm)', ...] -> ['sepal_length', ...]
'''
%%local
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
dataset = load_iris()
data = dataset.data
target = dataset.target
columns = [col_name.split('(cm)')[0].strip().replace(' ', '_') for col_name in dataset.feature_names]
data_pd = pd.DataFrame(np.hstack([data, target.reshape(-1, 1)]), columns = columns+['target'])
data_pd.head()
b. 本地数据集发送至远程集群,并将DataFrame命名为spark_df, 可在cell中执行 %%help 查看%%send_to_spark命令详情
%%send_to_spark -i data_pd -t df -n spark_df
3. 读取集群中的数据
a. 查看数据
查看数据前5行
%%pretty
spark_df.show(5)
查看各列数据类型
spark_df.printSchema()
查看各列缺失值比率,若存在缺失值则需要以众数或者中位数等进行缺失值填充
def check_na_ratio(df, cols):
if isinstance(cols, str):
cols = [cols]
if isinstance(cols, list):
na_ratio = df.agg(*[(1-F.count(col)/F.count('*')).alias(f'{col}_na_ratio') for col in cols])
else:
raise TypeError(f"cols should be [str, list], but got type {type(cols)}")
return na_ratio
from pyspark.sql import functions as F
na_ratio = check_na_ratio(spark_df, spark_df.columns)
na_ratio.show()
b. 划分训练集和验证集
将全部数据按照8比2的比例划分为训练集和验证集,随机种子为2021
df_train, df_val = spark_df.randomSplit([0.8, 0.2], seed=2021)
print(f'训练集样本数:{df_train.count()}, 验证集样本数:{df_val.count()}')
df_val.show()
c. 基于训练集进行简单的可视化分析
注意,在DLI远程Spark集群中仅安装了pyspark, pandas和numpy, 不支持pip install安装python库,因此无法基于matplotlib和seaborn等工具进行可视化,可以通过魔法命令将数据导入到notebook本地进行可视化操作。
此外,尽量避免将大量数据导入notebook,避免因内存或网络等原因导致失败或者其他错误。当需要将数据导入notebook本地进行可视化时,可以先在远程spark集群进行groupBy等操作,减少数据量。
### 构建各种鸢尾花类别下长度等指标的统计值:最大值,最小值和均值
df_train_agg_target = df_train.groupBy('target').agg(
F.max('petal_length').alias('petal_length_max'),
F.min('petal_length').alias('petal_length_min'),
F.mean('petal_length').alias('petal_length_mean'),
F.max('petal_width').alias('petal_width_max'),
F.min('petal_width').alias('petal_width_min'),
F.mean('petal_width').alias('petal_width_mean'),
F.max('sepal_length').alias('sepal_length_max'),
F.min('sepal_length').alias('sepal_length_min'),
F.mean('sepal_length').alias('sepal_length_mean'),
F.max('sepal_width').alias('sepal_width_max'),
F.min('sepal_width').alias('sepal_width_min'),
F.mean('sepal_width').alias('sepal_width_mean'),
)
df_train_agg_target.show()
基于sparkmagic魔法命令 %%spark 将远程spark集群中的数据加载到notebook中, 具体可在当前ipynb中执行%%help查看
%%spark -o df_train_agg_target
df_train_agg_target.select(df_train_agg_target.columns)
在notebook中进行可视化操作,画出不同鸢尾花类型下各个指标的统计值分布情况。从下图可以看出,不同鸢尾花类别的各指标差异较大,可知能够较为简单的区分三种鸢尾花类型
%%local
import warnings
import logging
import matplotlib.pyplot as plt
logging.getLogger('matplotlib.font_manager').disabled = True
warnings.filterwarnings('ignore')
x = range(1, df_train_agg_target.shape[-1])
bar_width = 0.2
color_map = dict(zip([0, 1, 2], ['red', 'orange', 'blue']))
plot_columns = [col for col in df_train_agg_target.columns if col != 'target']
for tar in df_train_agg_target.target.unique():
tmp_df = df_train_agg_target[df_train_agg_target.target==tar]
plt.bar(x, tmp_df[plot_columns].squeeze(), width=bar_width, label = target_name_map.get(tar), fc=color_map.get(tar))
x = [i+bar_width for i in x]
plt.xticks(x, plot_columns, rotation=-60, fontsize=10)
plt.legend()
plt.show()
4. 特征工程
a. 特征交叉,对任意两列特征之间进行减和乘操作
原始数据有4列特征,分别是petal_length,petal_width,sepal_length和sepal_width,对任意两列特征进行减法和乘法的特征交叉操作,特征维度从4维扩展到16维
feat_cols = list(set(spark_df.columns) -set(['target']))
print(feat_cols)
for i in range(len(feat_cols)):
for j in range(i+1, len(feat_cols)):
df_train = df_train.withColumn(f"{feat_cols[j]}_sub_{feat_cols[i]}", F.col(feat_cols[j]) - F.col(feat_cols[i]))
df_train = df_train.withColumn(f"{feat_cols[j]}_mul_{feat_cols[i]}", F.col(feat_cols[j]) * F.col(feat_cols[i]))
df_val = df_val.withColumn(f"{feat_cols[j]}_sub_{feat_cols[i]}", F.col(feat_cols[j]) - F.col(feat_cols[i]))
df_val = df_val.withColumn(f"{feat_cols[j]}_mul_{feat_cols[i]}", F.col(feat_cols[j]) * F.col(feat_cols[i]))
%%pretty
df_val.show()
5. 构建模型
a. 基于spark.ml构建随机森林
构建随机森林分类器,并指定输入特征的列名为features(向量化后的特征列),标签列的列名为target,预测输出列的列名为prediction,其余参数为训练参数,具体可查看PySpark文档 PySpark
from pyspark.ml.classification import RandomForestClassifier
rf_cls = RandomForestClassifier(
featuresCol='features',
labelCol='target',
predictionCol='prediction',
subsamplingRate=0.8,
seed=2021
)
b. 训练模型和预测
在Spark中需要先基于VectorAssembler将指定的各列特征向量化,并在spark.DataFrame中新增指定名称的列如features,才能进一步输入到模型中进行训练和推理。
'''
e.g. inputCols = ['x', 'y', 'z']
----------------------------- ----------------------------------------
col_name 'x' 'y' 'z' 'target' col_name 'x' 'y' 'z' 'target' 'features'
row1 1 2 3 0 -> row1 1 2 3 0 [1,2,3]
row2 2 3 4 1 row2 2 3 4 1 [2,3,4]
----------------------------- ----------------------------------------
'''
from pyspark.ml.feature import VectorAssembler
feat_cols = df_train.columns
feat_cols.remove('target')
print(feat_cols)
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df_train = assembler.transform(df_train)
df_val = assembler.transform(df_val)
model = rf_cls.fit(df_train.select('features', 'target'))
pred = model.transform(df_val.select('features', 'target'))
c. 保存预测结果至notebook本地并计算AUC和f1-score
创建pred的副本label_and_preds,并通过魔法命令 %%sql 将其保存为名为label_and_preds且类型为pandas.DataFrame的notebook本地变量
pred.createOrReplaceTempView('label_and_preds')
%%sql -o label_and_preds
select target, prediction from label_and_preds
在notebook本地计算auc和f1值,并画出ROC曲线
%%local
import pandas as pd
from sklearn import metrics
import matplotlib.pyplot as plt
label_and_preds['target'] = label_and_preds['target'].map(int)
probs = label_and_preds['pred_probability'].map(lambda x: x['values']).explode()
real_label = pd.get_dummies(label_and_preds, columns=['target'])[['target_0','target_1','target_2']].values.reshape(-1,1)
fpr, tpr, thresholds = metrics.roc_curve(real_label,probs)
auc = metrics.auc(fpr, tpr)
print(f"the auc of val dataset is {auc}, cause iris is a very simple dataset.")
#绘图
#FPR就是横坐标,TPR就是纵坐标
plt.plot(fpr, tpr, c = 'r', lw = 2, alpha = 0.7, label = u'AUC=%.3f' % auc)
plt.plot((0, 1), (0, 1), c = '#808080', lw = 1, ls = '--', alpha = 0.7)
plt.xlim((-0.01, 1.02))
plt.ylim((-0.01, 1.02))
plt.xticks(np.arange(0, 1.1, 0.1))
plt.yticks(np.arange(0, 1.1, 0.1))
plt.xlabel('False Positive Rate', fontsize=13)
plt.ylabel('True Positive Rate', fontsize=13)
plt.legend(loc='lower right', fancybox=True, framealpha=0.8, fontsize=12)
plt.title(u'Roc Curve for RandomForest-iris', fontsize=17)
plt.show()
%%local
from sklearn.metrics import f1_score
f1 = f1_score( label_and_preds.target, label_and_preds.prediction, average='micro')
print(f"the f1 score of val dataset is {f1}, cause iris is a very simple dataset.")
三、附件
可执行ipynb文件见附件
- 点赞
- 收藏
- 关注作者
评论(0)