【详解】HadoopMapReduceJob的几种启动方式
Hadoop MapReduce Job的几种启动方式
在大数据处理领域,Hadoop是一个广泛使用的开源框架,它支持数据密集型分布式应用程序。MapReduce是Hadoop的核心组件之一,用于并行处理大规模数据集。本文将介绍如何通过不同的方法启动Hadoop MapReduce Job。
1. 使用Hadoop命令行工具
最直接和常用的方法是通过Hadoop提供的命令行工具来提交MapReduce任务。这种方式适用于大多数情况,并且配置简单。
基本命令格式
hadoop jar <path-to-jar> [main-class] [job-args]
-
<path-to-jar>
: 指向包含MapReduce程序的JAR文件的路径。 -
[main-class]
: 可选参数,指定主类名,如果JAR文件中已定义了主类,则可以省略。 -
[job-args]
: 提供给MapReduce作业的参数列表。
示例
假设你有一个名为wordcount.jar
的JAR文件,其中包含一个名为WordCount
的主类,可以通过以下命令启动:
hadoop jar wordcount.jar WordCount /input /output
这里,/input
是输入目录,/output
是输出目录。
2. 使用Java API
对于需要更灵活控制MapReduce作业的情况,可以直接使用Hadoop的Java API编写代码来提交作业。这种方式提供了更多的配置选项,适合复杂的业务场景。
示例代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行步骤
- 编译上述代码并打包成JAR文件。
- 使用Hadoop命令行工具提交作业,如前文所述。
3. 使用Oozie工作流引擎
对于复杂的多步骤数据处理流程,可以使用Oozie工作流引擎来管理和调度MapReduce作业。Oozie允许用户定义一个工作流,该工作流可以包含多个MapReduce作业以及其他类型的作业(如Pig、Hive等)。
定义工作流
<workflow-app name="wordcount-wf" xmlns="uri:oozie:workflow:0.4">
<start to="wordcount"/>
<action name="wordcount">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapreduce.job.jar</name>
<value>/path/to/wordcount.jar</value>
</property>
<property>
<name>mapreduce.map.class</name>
<value>com.example.WordCount$TokenizerMapper</value>
</property>
<property>
<name>mapreduce.reduce.class</name>
<value>com.example.WordCount$IntSumReducer</value>
</property>
<property>
<name>mapreduce.job.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapreduce.job.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>/input</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>/output</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
提交工作流
使用Oozie客户端提交工作流:
oozie job -config job.properties -run
其中,job.properties
文件包含必要的配置属性,如工作流XML文件的位置、Hadoop集群信息等。
这篇文章详细介绍了Hadoop MapReduce Job的几种启动方式,包括使用命令行工具、Java API以及Oozie工作流引擎的方法,每种方法都有相应的示例代码或配置说明,帮助读者更好地理解和应用这些技术。当然可以!Hadoop MapReduce 是一个强大的框架,用于处理和分析分布在大型集群上的大数据集。下面我将介绍几种常见的启动 Hadoop MapReduce Job 的方式,并提供相应的示例代码。
1. 使用命令行启动
最简单的方式是通过命令行提交 MapReduce 作业。假设你已经编写了一个 MapReduce 程序并打包成 JAR 文件,可以通过以下命令提交作业:
hadoop jar /path/to/your-job.jar com.yourcompany.yourapp.YourDriverClass /input/path /output/path
2. 使用 Java API 启动
你也可以在 Java 代码中直接启动 MapReduce 作业。以下是一个简单的示例:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// 创建配置对象
Configuration conf = new Configuration();
// 创建 Job 对象
Job job = Job.getInstance(conf, "word count");
// 设置主类
job.setJarByClass(WordCountDriver.class);
// 设置 Mapper 和 Reducer 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3. 使用 Oozie 工作流引擎启动
Oozie 是一个工作流调度系统,可以用来管理和调度 Hadoop 作业。以下是一个简单的 Oozie 工作流 XML 配置文件示例:
<workflow-app name="wordcount-wf" xmlns="uri:oozie:workflow:0.5">
<start to="wordcount"/>
<action name="wordcount">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapreduce.job.jar</name>
<value>/path/to/your-job.jar</value>
</property>
<property>
<name>mapreduce.job.classname</name>
<value>com.yourcompany.yourapp.YourDriverClass</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>/input/path</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>/output/path</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
4. 使用 Apache Livy REST API 启动
Livy 是一个用于与 Apache Spark 交互的 REST 服务,但也可以用于提交 Hadoop MapReduce 作业。以下是一个使用 Livy 提交 MapReduce 作业的 Python 示例:
import requests
import json
# Livy 服务器地址
livy_url = "http://<livy-server>:8998"
# 创建会话
response = requests.post(
f"{livy_url}/sessions",
headers={"Content-Type": "application/json"},
data=json.dumps({
"kind": "spark",
"conf": {
"spark.master": "yarn",
"spark.submit.deployMode": "cluster"
}
})
)
session_url = f"{livy_url}/sessions/{response.json()['id']}"
# 提交 MapReduce 作业
response = requests.post(
f"{session_url}/statements",
headers={"Content-Type": "application/json"},
data=json.dumps({
"code": """
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
val conf = new Configuration()
val job = Job.getInstance(conf, "word count")
job.setJarByClass(classOf[WordCountDriver])
job.setMapperClass(classOf[WordCountMapper])
job.setReducerClass(classOf[WordCountReducer])
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
FileInputFormat.addInputPath(job, new Path("/input/path"))
FileOutputFormat.setOutputPath(job, new Path("/output/path"))
job.waitForCompletion(true)
"""
})
)
print(response.json())
Hadoop MapReduce Job可以通过多种方式启动,包括通过命令行、编写Java程序直接调用Hadoop API、使用Hadoop Streaming等。下面详细介绍这几种方式及其代码示例:
1. 命令行启动
通过命令行启动MapReduce作业是最简单的方法之一。假设你已经有一个打包好的JAR文件,并且你的MapReduce类名为MyMapReduce
,输入和输出路径分别为input
和output
,你可以使用以下命令来启动作业:
hadoop jar myjob.jar MyMapReduce input output
2. Java程序直接调用Hadoop API
通过编写Java程序直接调用Hadoop API来启动MapReduce作业,这种方式提供了更多的灵活性和控制能力。以下是一个简单的示例:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyMapReduce {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MyMapReduce <input path> <output path>");
System.exit(-1);
}
String inputPath = args[0];
String outputPath = args[1];
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "My MapReduce Job");
// 设置主类
job.setJarByClass(MyMapReduce.class);
// 设置Mapper和Reducer类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// 设置输出键值对类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出路径
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// 提交作业并等待完成
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
在这个示例中,MyMapper
和MyReducer
是自定义的Mapper和Reducer类,你需要根据具体需求实现它们。
3. 使用Hadoop Streaming
Hadoop Streaming允许你使用任何可执行脚本或程序作为MapReduce的Mapper和Reducer。这对于使用非Java语言(如Python、Perl等)编写MapReduce作业非常有用。以下是一个使用Python脚本的示例:
假设你有两个Python脚本,mapper.py
和reducer.py
,你可以使用以下命令启动MapReduce作业:
hadoop jar /path/to/hadoop-streaming.jar \
-file /path/to/mapper.py -mapper /path/to/mapper.py \
-file /path/to/reducer.py -reducer /path/to/reducer.py \
-input /path/to/input -output /path/to/output
4. 使用Hadoop提供的工具类
Hadoop提供了一些工具类,如ToolRunner
,可以简化MapReduce作业的启动过程。以下是一个使用ToolRunner
的示例:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyMapReduce extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MyMapReduce <input path> <output path>");
return -1;
}
String inputPath = args[0];
String outputPath = args[1];
Job job = Job.getInstance(getConf(), "My MapReduce Job");
job.setJarByClass(MyMapReduce.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MyMapReduce(), args);
System.exit(exitCode);
}
}
在这个示例中,MyMapReduce
类实现了Tool
接口,并重写了run
方法。main
方法中使用了ToolRunner
来运行作业。
以上是Hadoop MapReduce Job的几种启动方式及其代码示例。希望这些信息对你有所帮助!
- 点赞
- 收藏
- 关注作者
评论(0)