面试题:MapReduce黑名单过滤问题

举报
RupertTears 发表于 2021/07/29 13:58:20 2021/07/29
【摘要】 题目:MapReduce 黑名单过滤问题 问题描述:在HDFS中有两个名单:人员名单,人员黑名单。要求:对该人员名单进行过滤,除去黑名单中的人员,并且统计人员名单中人员出现的频次。将结果生成文件,保存在HDFS中。

题目:

           MapReduce 黑名单过滤问题

问题描述:

           在HDFS中有两个名单:人员名单,人员黑名单。

           要求:对该人员名单进行过滤,除去黑名单中的人员,并且统计人员名单中人员出现的频次。

           将结果生成文件,保存在HDFS中。

核心业务代码:

package com.miracle.tears.detail;

import com.miracle.tears.exception.FilePathError;
import com.miracle.tears.util.HdfsOperatorUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

/**
 *
 * 题目:
 *      在 hdfs 目录/tmp/tianliangedu/input/wordcount 目录中有一系列文件,内容为","号分
 *  隔,同时在 hdfs 路径/tmp/tianliangedu/black.txt 黑名单文件,一行一个单词用于存放不记入统
 *  计的单词列表。求按","号分隔的各个元素去除掉黑名单后的出现频率,输出到目录
 *  /tmp/tianliangedu/output/个人用户名的 hdfs 目录中。
 *
 *  黑名单过滤问题
 *        通过 Configuration 配置传参的方式解决
 *        map reduce driver 三合一的写法
 *
 *  分析:
 *    三个路径
 *      输入路径
 *          HDFS中一个人员名单 和 一个 人员黑名单
 *      输入路径
 *          HDFS路径
 *    人员名单
 *      按 逗号 分割
 *      过滤掉黑名单中人员
 *      计数剩余人员频次
 */
public class BlackList4Conf {

    /**
     * map 方法
     */
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>{

        // 4.1 定义用于去重的数据结构 (存放了黑名单集合)
        Set<String> blackUniqSet = new HashSet<>();



        /**
         * 拿到黑名单数据,方便 map 对人员进行判断
         * @param context 上下文
         */
        @Override
        protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context) {
            // 1. 获取hadoop配置对象
            Configuration conf = new Configuration();
            // 2. 通过配置对象得到黑名单内容
            String configContent = conf.get("blacklist");
            // 3. 将字符串以空白分割,转变为数组存储 (多一个/ 用于转义)
            String [] blackListArray = configContent.split("//s");
            // 4. 将黑名单数组存入集合 (数组转集合)
            blackUniqSet.addAll(Arrays.asList(blackListArray));
        }

        // 定义一个String对象的mr包装类Text变量
        Text wordText = new Text();
        // 定义一个值为1的IntWritable对象,记录频次,方便kv输出
        IntWritable one = new IntWritable(1);


        /**
         * 对上层传递过来的数据
         *      依据 value 值,一一处理
         * @param key 在此阶段进行key的定义
         * @param value 传递过来的数据
         * @param context 上下文环境 用于承接参数的传递、环境配置等
         * @throws IOException 异常
         * @throws InterruptedException 异常
         */

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // 1. 用字符串分割工具类,处理行数据,按逗号分割
            StringTokenizer st = new StringTokenizer(value.toString(),",");
            // 2. 对分割出的数据进行处理
            while(st.hasMoreTokens()){
                // 2.1 接收传递来的数据
                String word = st.nextToken();
                // 2.2 判断这个名字是否在黑名单中
                if(!blackUniqSet.contains(word)){
                    /**
                     *  2.3 不在黑名单中
                     *  2.4 新建 hadoop 数据类型
                     *  2.5 设置key 和 value
                     */
                    wordText.set(word);
                    context.write(wordText,one);
                }
            }
        }
    }


    /**
     * reduce 方法
     */
    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable >{

        // 定义用于转换类型输出的sum包装类
        IntWritable sumWritable = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // 1. 相同的key抵达,定义要频次的和
            int sum = 0;
            // 2. 遍历每一个同key的值
            for (IntWritable temp : values){
                // 2.1 求和
                sum += temp.get();
            }
            // 3. 将频次和转换类型输出
            sumWritable.set(sum);
            // 4. 通过 context对象 输出
            context.write(key,sumWritable);
        }
    }

    /**
     * main 方法
     * 驱动类
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, FilePathError {
        // 1. 获取hadoop配置环境变量
        Configuration conf = new Configuration();

        // 2.1 添加 GOP (通用选项解析器)
        GenericOptionsParser gop = new GenericOptionsParser(conf, args);
        // 2.2 读取客户端输入的参数,存入数组中
        String [] remainingArgs = gop.getRemainingArgs();
        // 2.3 定义黑名单的路径参数 为用户输入的第三个参数
        String hdfsConfigPath = remainingArgs[2];
        // 2.4 使用自定义工具类读取文件内容,并存储在字符串中
        String configContent = HdfsOperatorUtil.readContentFormFile(hdfsConfigPath);
        // 2.5 设置在全局中
        conf.set("blacklist",configContent);

        // 3. 获取任务实例
        Job job = Job.getInstance(conf,"黑名单过滤");
        // 3.1 设置主类
        job.setJarByClass(BlackList4Conf.class);
        // 3.2 设置Mapper
        job.setMapperClass(MyMapper.class);
        // 3.3 设置Combiner
        job.setCombinerClass(MyReduce.class);
        // 3.4 设置Reducer
        job.setReducerClass(MyReduce.class);

        // 4.1 设置key的输出类型
        job.setOutputKeyClass(Text.class);
        // 4.2 设置value的输出类型
        job.setOutputKeyClass(IntWritable.class);

        // 5. 将输入的路径进行多源化处理
        // 5.1 传入一个目录,获取的是一个应用,以逗号分割取出第一个(一个文件定位到文件或者目录都可以)
        String [] filePathArray = remainingArgs[0].split(",");
        // 5.2 遍历添加
        for (String filePath : filePathArray){
            FileInputFormat.addInputPath(job, new Path(filePath));
        }

        // 6. 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));

        // 7. 系统退出JVM
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

工具类:

package com.miracle.tears.util;

import com.miracle.tears.exception.FilePathError;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class HdfsOperatorUtil {
    /**
     * 给定一个HDFS文件,获取该文件对应的文本文件内容
     * @param txtFilePath 文件路径
     * @return 文件内容
     */
    public static String readContentFormFile(String txtFilePath) throws FilePathError, IOException {
        // 1. 边界检查
        if (txtFilePath == null || txtFilePath.trim().length() == 0){
            throw new FilePathError("文件路径不存在或者路径为空,请检查文件路径!");
        }
        // 2. 加速读取效率,抽象给定一个HDFS文件路径拿到其对应的字节数组
        byte[] byteArray = readFromHdfsFileToByteArray(txtFilePath);
        // 3. 返回字节数组,并设置编码格式
        return new String(byteArray,"utf-8");
    }

    /**
     * 获取 hadoopConf 对象
     */
    static Configuration hadoopConf = new Configuration();

    /**
     * 读取hdfs中文件转换为字节数组
     *
     * @param resourceFilePath 本地文件路径
     * @return 本地文件内容 字节数组形式
     * @throws IOException 异常
     */
    public static byte[] readFromHdfsFileToByteArray(String resourceFilePath) throws IOException, FilePathError {
        if (resourceFilePath == null || resourceFilePath.trim().length() == 0) {
            throw new FilePathError("文件路径不存在或者路径为空,请检查文件路径!");
        }
        // 获取 hadoopConf 对应的集群的对象引用
        FileSystem fs = FileSystem.get(hadoopConf);
        // 将传入的源文件路径转换为一个hdfs文件路径
        Path hdfsPath = new Path(resourceFilePath);
        // 开启数据输入流,向内存输入数据
        FSDataInputStream inputStream = fs.open(hdfsPath);
        // 初始化java字节数组 1024*64
        byte[] byteArray = new byte[65536];
        // 创建输出缓冲流,提升效率
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // 定义每次读取的数据量
        int readLength = 0;
        // 循环读取
        while ((readLength = inputStream.read(byteArray)) > 0) {
            bos.write(byteArray);
            byteArray = new byte[65536];
        }
        // 读取完毕后,关闭hdfs输入流
        inputStream.close();
        // 将之前写到的字节输出流中的字节,转换为一个整体的字节数组
        byte[] resultByteArray = bos.toByteArray();
        // 关闭输出缓冲流
        bos.close();
        // 返回读取的数据集
        return resultByteArray;
    }

}

自定义异常类:

package com.miracle.tears.exception;

/**
 * 自定义异常类
 *      1. 继承一个已有的异常
 */
public class FilePathError extends Exception {
    /**
     * 2. 公共的无参构造
     */
    public FilePathError() {
        super();
    }

    /**
     * 3. 调用父类的有参构造
     * @param message 抛出传入的异常信息
     */
    public FilePathError(String message) {
        super(message);
    }
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。