面试题:MapReduce黑名单过滤问题
【摘要】 题目:MapReduce 黑名单过滤问题
问题描述:在HDFS中有两个名单:人员名单,人员黑名单。要求:对该人员名单进行过滤,除去黑名单中的人员,并且统计人员名单中人员出现的频次。将结果生成文件,保存在HDFS中。
题目:
MapReduce 黑名单过滤问题
问题描述:
在HDFS中有两个名单:人员名单,人员黑名单。
要求:对该人员名单进行过滤,除去黑名单中的人员,并且统计人员名单中人员出现的频次。
将结果生成文件,保存在HDFS中。
核心业务代码:
package com.miracle.tears.detail;
import com.miracle.tears.exception.FilePathError;
import com.miracle.tears.util.HdfsOperatorUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;
/**
*
* 题目:
* 在 hdfs 目录/tmp/tianliangedu/input/wordcount 目录中有一系列文件,内容为","号分
* 隔,同时在 hdfs 路径/tmp/tianliangedu/black.txt 黑名单文件,一行一个单词用于存放不记入统
* 计的单词列表。求按","号分隔的各个元素去除掉黑名单后的出现频率,输出到目录
* /tmp/tianliangedu/output/个人用户名的 hdfs 目录中。
*
* 黑名单过滤问题
* 通过 Configuration 配置传参的方式解决
* map reduce driver 三合一的写法
*
* 分析:
* 三个路径
* 输入路径
* HDFS中一个人员名单 和 一个 人员黑名单
* 输入路径
* HDFS路径
* 人员名单
* 按 逗号 分割
* 过滤掉黑名单中人员
* 计数剩余人员频次
*/
public class BlackList4Conf {
/**
* map 方法
*/
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{
// 4.1 定义用于去重的数据结构 (存放了黑名单集合)
Set<String> blackUniqSet = new HashSet<>();
/**
* 拿到黑名单数据,方便 map 对人员进行判断
* @param context 上下文
*/
@Override
protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context) {
// 1. 获取hadoop配置对象
Configuration conf = new Configuration();
// 2. 通过配置对象得到黑名单内容
String configContent = conf.get("blacklist");
// 3. 将字符串以空白分割,转变为数组存储 (多一个/ 用于转义)
String [] blackListArray = configContent.split("//s");
// 4. 将黑名单数组存入集合 (数组转集合)
blackUniqSet.addAll(Arrays.asList(blackListArray));
}
// 定义一个String对象的mr包装类Text变量
Text wordText = new Text();
// 定义一个值为1的IntWritable对象,记录频次,方便kv输出
IntWritable one = new IntWritable(1);
/**
* 对上层传递过来的数据
* 依据 value 值,一一处理
* @param key 在此阶段进行key的定义
* @param value 传递过来的数据
* @param context 上下文环境 用于承接参数的传递、环境配置等
* @throws IOException 异常
* @throws InterruptedException 异常
*/
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 1. 用字符串分割工具类,处理行数据,按逗号分割
StringTokenizer st = new StringTokenizer(value.toString(),",");
// 2. 对分割出的数据进行处理
while(st.hasMoreTokens()){
// 2.1 接收传递来的数据
String word = st.nextToken();
// 2.2 判断这个名字是否在黑名单中
if(!blackUniqSet.contains(word)){
/**
* 2.3 不在黑名单中
* 2.4 新建 hadoop 数据类型
* 2.5 设置key 和 value
*/
wordText.set(word);
context.write(wordText,one);
}
}
}
}
/**
* reduce 方法
*/
public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable >{
// 定义用于转换类型输出的sum包装类
IntWritable sumWritable = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 1. 相同的key抵达,定义要频次的和
int sum = 0;
// 2. 遍历每一个同key的值
for (IntWritable temp : values){
// 2.1 求和
sum += temp.get();
}
// 3. 将频次和转换类型输出
sumWritable.set(sum);
// 4. 通过 context对象 输出
context.write(key,sumWritable);
}
}
/**
* main 方法
* 驱动类
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, FilePathError {
// 1. 获取hadoop配置环境变量
Configuration conf = new Configuration();
// 2.1 添加 GOP (通用选项解析器)
GenericOptionsParser gop = new GenericOptionsParser(conf, args);
// 2.2 读取客户端输入的参数,存入数组中
String [] remainingArgs = gop.getRemainingArgs();
// 2.3 定义黑名单的路径参数 为用户输入的第三个参数
String hdfsConfigPath = remainingArgs[2];
// 2.4 使用自定义工具类读取文件内容,并存储在字符串中
String configContent = HdfsOperatorUtil.readContentFormFile(hdfsConfigPath);
// 2.5 设置在全局中
conf.set("blacklist",configContent);
// 3. 获取任务实例
Job job = Job.getInstance(conf,"黑名单过滤");
// 3.1 设置主类
job.setJarByClass(BlackList4Conf.class);
// 3.2 设置Mapper
job.setMapperClass(MyMapper.class);
// 3.3 设置Combiner
job.setCombinerClass(MyReduce.class);
// 3.4 设置Reducer
job.setReducerClass(MyReduce.class);
// 4.1 设置key的输出类型
job.setOutputKeyClass(Text.class);
// 4.2 设置value的输出类型
job.setOutputKeyClass(IntWritable.class);
// 5. 将输入的路径进行多源化处理
// 5.1 传入一个目录,获取的是一个应用,以逗号分割取出第一个(一个文件定位到文件或者目录都可以)
String [] filePathArray = remainingArgs[0].split(",");
// 5.2 遍历添加
for (String filePath : filePathArray){
FileInputFormat.addInputPath(job, new Path(filePath));
}
// 6. 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
// 7. 系统退出JVM
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
工具类:
package com.miracle.tears.util;
import com.miracle.tears.exception.FilePathError;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class HdfsOperatorUtil {
/**
* 给定一个HDFS文件,获取该文件对应的文本文件内容
* @param txtFilePath 文件路径
* @return 文件内容
*/
public static String readContentFormFile(String txtFilePath) throws FilePathError, IOException {
// 1. 边界检查
if (txtFilePath == null || txtFilePath.trim().length() == 0){
throw new FilePathError("文件路径不存在或者路径为空,请检查文件路径!");
}
// 2. 加速读取效率,抽象给定一个HDFS文件路径拿到其对应的字节数组
byte[] byteArray = readFromHdfsFileToByteArray(txtFilePath);
// 3. 返回字节数组,并设置编码格式
return new String(byteArray,"utf-8");
}
/**
* 获取 hadoopConf 对象
*/
static Configuration hadoopConf = new Configuration();
/**
* 读取hdfs中文件转换为字节数组
*
* @param resourceFilePath 本地文件路径
* @return 本地文件内容 字节数组形式
* @throws IOException 异常
*/
public static byte[] readFromHdfsFileToByteArray(String resourceFilePath) throws IOException, FilePathError {
if (resourceFilePath == null || resourceFilePath.trim().length() == 0) {
throw new FilePathError("文件路径不存在或者路径为空,请检查文件路径!");
}
// 获取 hadoopConf 对应的集群的对象引用
FileSystem fs = FileSystem.get(hadoopConf);
// 将传入的源文件路径转换为一个hdfs文件路径
Path hdfsPath = new Path(resourceFilePath);
// 开启数据输入流,向内存输入数据
FSDataInputStream inputStream = fs.open(hdfsPath);
// 初始化java字节数组 1024*64
byte[] byteArray = new byte[65536];
// 创建输出缓冲流,提升效率
ByteArrayOutputStream bos = new ByteArrayOutputStream();
// 定义每次读取的数据量
int readLength = 0;
// 循环读取
while ((readLength = inputStream.read(byteArray)) > 0) {
bos.write(byteArray);
byteArray = new byte[65536];
}
// 读取完毕后,关闭hdfs输入流
inputStream.close();
// 将之前写到的字节输出流中的字节,转换为一个整体的字节数组
byte[] resultByteArray = bos.toByteArray();
// 关闭输出缓冲流
bos.close();
// 返回读取的数据集
return resultByteArray;
}
}
自定义异常类:
package com.miracle.tears.exception;
/**
* 自定义异常类
* 1. 继承一个已有的异常
*/
public class FilePathError extends Exception {
/**
* 2. 公共的无参构造
*/
public FilePathError() {
super();
}
/**
* 3. 调用父类的有参构造
* @param message 抛出传入的异常信息
*/
public FilePathError(String message) {
super(message);
}
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)