【详解】HadoopMapReduceJob的几种启动方式

举报
皮牙子抓饭 发表于 2025/09/28 21:57:04 2025/09/28
【摘要】 Hadoop MapReduce Job的几种启动方式在大数据处理领域,Hadoop是一个广泛使用的开源框架,它支持数据密集型分布式应用程序。MapReduce是Hadoop的核心组件之一,用于并行处理大规模数据集。本文将介绍如何通过不同的方法启动Hadoop MapReduce Job。1. 使用Hadoop命令行工具最直接和常用的方法是通过Hadoop提供的命令行工具来提交MapRedu...

Hadoop MapReduce Job的几种启动方式

在大数据处理领域,Hadoop是一个广泛使用的开源框架,它支持数据密集型分布式应用程序。MapReduce是Hadoop的核心组件之一,用于并行处理大规模数据集。本文将介绍如何通过不同的方法启动Hadoop MapReduce Job。

1. 使用Hadoop命令行工具

最直接和常用的方法是通过Hadoop提供的命令行工具来提交MapReduce任务。这种方式适用于大多数情况,并且配置简单。

基本命令格式

hadoop jar <path-to-jar> [main-class] [job-args]
  • ​<path-to-jar>​​: 指向包含MapReduce程序的JAR文件的路径。
  • ​[main-class]​​: 可选参数,指定主类名,如果JAR文件中已定义了主类,则可以省略。
  • ​[job-args]​​: 提供给MapReduce作业的参数列表。

示例

假设你有一个名为​​wordcount.jar​​的JAR文件,其中包含一个名为​​WordCount​​的主类,可以通过以下命令启动:

hadoop jar wordcount.jar WordCount /input /output

这里,​​/input​​是输入目录,​​/output​​是输出目录。

2. 使用Java API

对于需要更灵活控制MapReduce作业的情况,可以直接使用Hadoop的Java API编写代码来提交作业。这种方式提供了更多的配置选项,适合复杂的业务场景。

示例代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

运行步骤

  1. 编译上述代码并打包成JAR文件。
  2. 使用Hadoop命令行工具提交作业,如前文所述。

3. 使用Oozie工作流引擎

对于复杂的多步骤数据处理流程,可以使用Oozie工作流引擎来管理和调度MapReduce作业。Oozie允许用户定义一个工作流,该工作流可以包含多个MapReduce作业以及其他类型的作业(如Pig、Hive等)。

定义工作流

<workflow-app name="wordcount-wf" xmlns="uri:oozie:workflow:0.4">
    <start to="wordcount"/>
    <action name="wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.job.jar</name>
                    <value>/path/to/wordcount.jar</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>com.example.WordCount$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>com.example.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>/input</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>/output</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

提交工作流

使用Oozie客户端提交工作流:

oozie job -config job.properties -run

其中,​​job.properties​​文件包含必要的配置属性,如工作流XML文件的位置、Hadoop集群信息等。



这篇文章详细介绍了Hadoop MapReduce Job的几种启动方式,包括使用命令行工具、Java API以及Oozie工作流引擎的方法,每种方法都有相应的示例代码或配置说明,帮助读者更好地理解和应用这些技术。当然可以!Hadoop MapReduce 是一个强大的框架,用于处理和分析分布在大型集群上的大数据集。下面我将介绍几种常见的启动 Hadoop MapReduce Job 的方式,并提供相应的示例代码。

1. 使用命令行启动

最简单的方式是通过命令行提交 MapReduce 作业。假设你已经编写了一个 MapReduce 程序并打包成 JAR 文件,可以通过以下命令提交作业:

hadoop jar /path/to/your-job.jar com.yourcompany.yourapp.YourDriverClass /input/path /output/path

2. 使用 Java API 启动

你也可以在 Java 代码中直接启动 MapReduce 作业。以下是一个简单的示例:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        // 创建配置对象
        Configuration conf = new Configuration();
        
        // 创建 Job 对象
        Job job = Job.getInstance(conf, "word count");
        
        // 设置主类
        job.setJarByClass(WordCountDriver.class);
        
        // 设置 Mapper 和 Reducer 类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        
        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // 设置输入和输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // 提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. 使用 Oozie 工作流引擎启动

Oozie 是一个工作流调度系统,可以用来管理和调度 Hadoop 作业。以下是一个简单的 Oozie 工作流 XML 配置文件示例:

<workflow-app name="wordcount-wf" xmlns="uri:oozie:workflow:0.5">
    <start to="wordcount"/>
    <action name="wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.job.jar</name>
                    <value>/path/to/your-job.jar</value>
                </property>
                <property>
                    <name>mapreduce.job.classname</name>
                    <value>com.yourcompany.yourapp.YourDriverClass</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>/input/path</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>/output/path</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

4. 使用 Apache Livy REST API 启动

Livy 是一个用于与 Apache Spark 交互的 REST 服务,但也可以用于提交 Hadoop MapReduce 作业。以下是一个使用 Livy 提交 MapReduce 作业的 Python 示例:

import requests
import json

# Livy 服务器地址
livy_url = "http://<livy-server>:8998"

# 创建会话
response = requests.post(
    f"{livy_url}/sessions",
    headers={"Content-Type": "application/json"},
    data=json.dumps({
        "kind": "spark",
        "conf": {
            "spark.master": "yarn",
            "spark.submit.deployMode": "cluster"
        }
    })
)

session_url = f"{livy_url}/sessions/{response.json()['id']}"

# 提交 MapReduce 作业
response = requests.post(
    f"{session_url}/statements",
    headers={"Content-Type": "application/json"},
    data=json.dumps({
        "code": """
            import org.apache.hadoop.conf.Configuration
            import org.apache.hadoop.fs.Path
            import org.apache.hadoop.mapreduce.Job
            import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
            import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

            val conf = new Configuration()
            val job = Job.getInstance(conf, "word count")
            job.setJarByClass(classOf[WordCountDriver])
            job.setMapperClass(classOf[WordCountMapper])
            job.setReducerClass(classOf[WordCountReducer])
            job.setOutputKeyClass(classOf[Text])
            job.setOutputValueClass(classOf[IntWritable])
            FileInputFormat.addInputPath(job, new Path("/input/path"))
            FileOutputFormat.setOutputPath(job, new Path("/output/path"))
            job.waitForCompletion(true)
        """
    })
)

print(response.json())

Hadoop MapReduce Job可以通过多种方式启动,包括通过命令行、编写Java程序直接调用Hadoop API、使用Hadoop Streaming等。下面详细介绍这几种方式及其代码示例:

1. 命令行启动

通过命令行启动MapReduce作业是最简单的方法之一。假设你已经有一个打包好的JAR文件,并且你的MapReduce类名为​​MyMapReduce​​,输入和输出路径分别为​​input​​和​​output​​,你可以使用以下命令来启动作业:

hadoop jar myjob.jar MyMapReduce input output

2. Java程序直接调用Hadoop API

通过编写Java程序直接调用Hadoop API来启动MapReduce作业,这种方式提供了更多的灵活性和控制能力。以下是一个简单的示例:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyMapReduce {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MyMapReduce <input path> <output path>");
            System.exit(-1);
        }

        String inputPath = args[0];
        String outputPath = args[1];

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "My MapReduce Job");

        // 设置主类
        job.setJarByClass(MyMapReduce.class);

        // 设置Mapper和Reducer类
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // 设置输出键值对类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入和输出路径
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 提交作业并等待完成
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}

在这个示例中,​​MyMapper​​和​​MyReducer​​是自定义的Mapper和Reducer类,你需要根据具体需求实现它们。

3. 使用Hadoop Streaming

Hadoop Streaming允许你使用任何可执行脚本或程序作为MapReduce的Mapper和Reducer。这对于使用非Java语言(如Python、Perl等)编写MapReduce作业非常有用。以下是一个使用Python脚本的示例:

假设你有两个Python脚本,​​mapper.py​​和​​reducer.py​​,你可以使用以下命令启动MapReduce作业:

hadoop jar /path/to/hadoop-streaming.jar \
    -file /path/to/mapper.py -mapper /path/to/mapper.py \
    -file /path/to/reducer.py -reducer /path/to/reducer.py \
    -input /path/to/input -output /path/to/output

4. 使用Hadoop提供的工具类

Hadoop提供了一些工具类,如​​ToolRunner​​,可以简化MapReduce作业的启动过程。以下是一个使用​​ToolRunner​​的示例:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyMapReduce extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MyMapReduce <input path> <output path>");
            return -1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        Job job = Job.getInstance(getConf(), "My MapReduce Job");
        job.setJarByClass(MyMapReduce.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MyMapReduce(), args);
        System.exit(exitCode);
    }
}

在这个示例中,​​MyMapReduce​​类实现了​​Tool​​接口,并重写了​​run​​方法。​​main​​方法中使用了​​ToolRunner​​来运行作业。

以上是Hadoop MapReduce Job的几种启动方式及其代码示例。希望这些信息对你有所帮助!

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。