MR单元测试以及DeBug调试

举报
Smy1121 发表于 2019/06/20 13:37:44 2019/06/20
【摘要】 Hadoop的MapReduce程序提交到集群环境中运行,出问题时定位非常麻烦,有时需要一遍遍修改代码和打印日志来排查问题,哪怕是比较小的问题。如果数据量很大的话调试起来就相当耗费时间。

        Hadoop的MapReduce程序提交到集群环境中运行,出问题时定位非常麻烦,有时需要一遍遍修改代码和打印日志来排查问题,哪怕是比较小的问题。如果数据量很大的话调试起来就相当耗费时间。 而且,Map和Reduce的一些参数是Hadoop框架在运行时传入的,比如Context、InputSplit,这进一步增加了调试的难度。如果有一个良好的单元测试框架能帮助尽早发现、清除bug,那就太好了。


MRUnit 框架

        MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用。MRUnit针对不同测试对象使用不同的Driver:


        MapDriver:针对单独的Map测试


        ReduceDriver:针对单独的Reduce测试


        MapReduceDriver:将map和reduce串起来测试


        PipelineMapReduceDriver:将多个MapReduce对串志来测试


        后面就以Temperature程序作为测试案例,说明如何使用MRUnit框架?


   

准备测试案例

        为了理解单元测试框架,我们准备了一个 MapReduce 程序,这里还是以 Temperature 作为测试案例进行演示,只不过 map 方法中的气象站id(key)是从读入文件名称中提取的,为了便于单元测试,这里我们将 key 设置为常量weatherStationId,Temperature 具体代码如下所示。


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

/**

 * 

 * @function 统计美国各个气象站30年来的平均气温

 * @author cs

 *

 */

public class Temperature extends Configured implements Tool {

public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {

/**

* @function Mapper 解析气象站数据

* @input key=偏移量  value=气象站数据

* @output key=weatherStationId value=temperature

*/

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString(); //读取每行数据

int temperature = Integer.parseInt(line.substring(14, 19).trim());//气温


if (temperature != -9999) { //过滤无效数据

String weatherStationId = "weatherStationId";//真实的气象站id是从文件名字中提取,为了便于单元测试,这里key设置为常量weatherStationId

context.write(new Text(weatherStationId), new IntWritable(temperature));

}

}

}

/**

* @function Reducer 统计美国各个州的平均气温

* @input key=weatherStationId  value=temperature

* @output key=weatherStationId value=average(temperature)

*/

public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();


public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {

int sum = 0;

int count = 0;

for (IntWritable val : values) {

sum += val.get();

count++;

}

result.set(sum / count);

context.write(key, result);

}

}


/**

* @function 任务驱动方法

* @param args

* @return

* @throws Exception

*/

@SuppressWarnings("deprecation")

@Override

public int run(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();//读取配置配置


Path mypath = new Path(args[1]);

FileSystem hdfs = mypath.getFileSystem(conf);

if (hdfs.isDirectory(mypath)) {

hdfs.delete(mypath, true);

}


Job job = new Job(conf, "temperature");//新建一个任务

job.setJarByClass(Temperature.class);// 主类

FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径

FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径


job.setMapperClass(TemperatureMapper.class);// Mapper

job.setReducerClass(TemperatureReducer.class);// Reducer

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);


return job.waitForCompletion(true) ? 0 : 1;//提交任务

}


/**

* @function main 方法

* @param args

* @throws Exception

*/

public static void main(String[] args) throws Exception {

String[] args0 = {

"hdfs://xxx002:9000/weather/",

"hdfs://xxx002:9000/weather/out/"};

int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);

System.exit(ec);

}

}



        准备好这个类之后,我们开始对这个类进行单元测试。

        注:本实例需要用到mrunit包,自行下载mrunit-hadoop.jar


Mapper 单元测试

        Mapper 的逻辑就是从读取的气象站数据中,提取气温值。比如读取一行"1985 07 31 02 200 94 10137 220 26 1 0 -9999"气象数据,提取第14位到19位之间的字符即为气温值200。


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mrunit.mapreduce.MapDriver;

import org.junit.Before;

import org.junit.Test;

import com.dajiangtai.hadoop.advance.Temperature;

/**

 * Mapper 端的单元测试

 */

@SuppressWarnings("all")

public class TemperatureMapperTest {

private Mapper mapper;//定义一个Mapper对象

private MapDriver driver;//定义一个MapDriver 对象


@Before

public void init() {

mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象

driver = new MapDriver(mapper);//实例化MapDriver对象

}


@Test

public void test() throws IOException {

//输入一行测试数据

String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";

driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致

.withOutput(new Text("weatherStationId"), new IntWritable(200))//跟TemperatureMapper输出类型一致

.runTest();

}

}


在test()方法中,withInput的key/value参数分别为偏移量和一行气象数据,其类型要与TemperatureMapper的输入类型一致即为LongWritable和Text。 withOutput的key/value参数分别是我们期望输出的new Text("weatherStationId")和new IntWritable(200),我们要达到的测试效果就是我们的期望输出结果与 TemperatureMapper 的实际输出结果一致。


Mapper 端的单元测试,只需要鼠标放在 TemperatureMapperTest 类上右击,选择 Run As ——> JUnit test,运行结果如下图所示。

image.png

测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Mapper 测试成功了。


Reducer 单元测试

Reduce 函数的逻辑就是把key相同的 value 值相加然后取平均值,Reducer 单元测试代码如下所示:


import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;

import org.junit.Before;

import org.junit.Test;

import com.dajiangtai.hadoop.advance.Temperature;

/**

 * Reducer 单元测试

 */

@SuppressWarnings("all")

public class TemperatureReduceTest {

private Reducer reducer;//定义一个Reducer对象

private ReduceDriver driver;//定义一个ReduceDriver对象


@Before

public void init() {

reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象

driver = new ReduceDriver(reducer);//实例化ReduceDriver对象

}


@Test

public void test() throws IOException {

String key = "weatherStationId";//声明一个key值

List values = new ArrayList();

values.add(new IntWritable(200));//添加第一个value值

values.add(new IntWritable(100));//添加第二个value值

driver.withInput(new Text("weatherStationId"), values)//跟TemperatureReducer输入类型一致

  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致

  .runTest();

}

}

在test()方法中,withInput的key/value参数分别为new Text("weatherStationId")和List类型的集合values。withOutput 的key/value参数分别是我们所期望输出的new Text("weatherStationId")和new IntWritable(150),我们要达到的测试效果就是我们的期望输出结果与TemperatureReducer实际输出结果一致。


Reducer 端的单元测试,鼠标放在 TemperatureReduceTest 类上右击,选择 Run As ——> JUnit test,运行结果如下所示:

image.png


测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Reducer 测试成功了。


MapReduce 单元测试

把 Mapper 和 Reducer 集成起来的测试案例代码如下:


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;

import org.junit.Before;

import org.junit.Test;

import com.dajiangtai.hadoop.advance.Temperature;

/**

 * Mapper 和 Reducer 集成起来测试

 */

@SuppressWarnings("all")

public class TemperatureTest {

private Mapper mapper;//定义一个Mapper对象

private Reducer reducer;//定义一个Reducer对象

private MapReduceDriver driver;//定义一个MapReduceDriver 对象


@Before

public void init() {

mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象

reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象

driver = new MapReduceDriver(mapper, reducer);//实例化MapReduceDriver对象

}


@Test

public void test() throws RuntimeException, IOException {

//输入两行行测试数据

String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";

String line2 = "1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999";

driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致

  .withInput(new LongWritable(), new Text(line2))

  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致

  .runTest();

}

}


在 test() 方法中,withInput添加了两行测试数据line和line2,withOutput 的key/value参数分别为我们期望的输出结果new Text("weatherStationId")和new IntWritable(150)。我们要达到的测试效果就是我们期望的输出结果与Temperature实际的输出结果一致。


MapReduce 端的单元测试,鼠标放在 TemperatureTest 类上右击,选择 Run As ——> JUnit test,运行结果如下所示:

image.png


测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 MapReduce 测试成功了。

        写程序几乎一大半的时间是调试,分布式程序调试的成本更高。 那么分布式的代码程序该如何调试呢?一起学习下MapReduce 代码如何使用 Debug 来调试。(hadoop 分布式集群的搭建,后续篇章详细讲解)


MapReduce 的Debug 调试

        这里以 Temperature 为例,右键 Temperature 项目,选择"Debug As" ——> "java Application",在程序中打上断点后直接进入调试模式,如下图所示。


image.png

程序进入debug调试后,后续的调试步骤跟 Eclipse 调试 java程序是一样的,这里就不再赘述。








【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。