【Hadoop】【Mapreduce】Map数量到底由什么决定
Job.waitForCompletion()
---job.submit()
------JobSubmitter.submitJobInternal()
submitJobInternal()中处理切片的核心代码如下:
int maps = writeSplits(job, submitJobDir);
//mapreduce.job.maps
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
//mapreduce.job.max.map, 默认为-1,表示不做限制
int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP, MRJobConfig.DEFAULT_JOB_MAX_MAP);
if (maxMaps >= 0 && maxMaps < maps) {
throw new IllegalArgumentException("The number of map tasks " + maps +
" exceeded limit " + maxMaps);
}
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
//mapred.mapper.new-api,默认为false,有下面两种切分方式。默认使用老的方式
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
1-新方式
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
//mapreduce.job.inputformat.class, 默认为TextInputFormat
//因此不难看出,新的分片方式是根据具体的输入实现类有关的,
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
2-旧方式
private int writeOldSplits(JobConf job, Path jobSubmitDir)
throws IOException {
//输入格式:mapred.input.format.class,默认为TextInoutFormat;
//传入参数:mapreduce.job.maps
org.apache.hadoop.mapred.InputSplit[] splits =
job.getInputFormat().getSplits(job, job.getNumMapTasks());
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
public int compare(org.apache.hadoop.mapred.InputSplit a,
org.apache.hadoop.mapred.InputSplit b) {
try {
long left = a.getLength();
long right = b.getLength();
if (left == right) {
return 0;
} else if (left < right) {
return 1;
} else {
return -1;
}
} catch (IOException ie) {
throw new RuntimeException("Problem getting input split size", ie);
}
}
});
JobSplitWriter.createSplitFiles(jobSubmitDir, job,
jobSubmitDir.getFileSystem(job), splits);
return splits.length;
}
两种方式处理逻辑大同小异,比较大的差异仅仅是读取作业输入类型实现类。
新的方式为:mapreduce.job.inputformat.class;
老方式读取的配置项为mapred.input.format.class
切分的实现是在输入类中进行的,例如最常用的:FileInputFormat
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
StopWatch sw = new StopWatch().start();
//输入的文件数组,
FileStatus[] stats = listStatus(job);
//设置参数:mapreduce.input.fileinputformat.numinputfiles
job.setLong(NUM_INPUT_FILES, stats.length);
long totalSize = 0; // compute total size
boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
&& job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
List<FileStatus> files = new ArrayList<>(stats.length);
for (FileStatus file: stats) { // check we have valid files
if (file.isDirectory()) {
if (!ignoreDirs) {
throw new IOException("Not a file: "+ file.getPath());
}
} else {
files.add(file);
totalSize += file.getLen();
}
}
//根据客户设置的map数量,算出理想情况下的分片大小
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
//max(mapreduce.input.fileinputformat.split.maxsize, 1)
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), 1);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//有些输入是不能切分的,FileInputFormat是可以切分的
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
//最终切片的大小
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > 1.1) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
}
//如果这个输入不能切分,FileInputFormat的输入都是可以切分的,不会走到这个分支
} else {
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization “+ "is possible: " + file.getPath());
}
}
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
//对于长度为0的文件,创建一个空的split
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits.toArray(new FileSplit[splits.size()]);
}
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
其中:
minSize: Math.max(job.getLong(mapreduce.input.fileinputformat.split.minsize, 1), 1);即,当minsize没有设置或者其值小于1,那么取值1,如果大于1,则取值minsize
goalSize: totalSize / (numSplits == 0 ? 1 : numSplits);按照客户的配置理论上可以划分分片大小;使用参数:mapreduce.job.maps
blockSize: dfs.blocksize;
后面的逻辑就将比较简单了,拿到spliteSize之后,直接使用文件大小去分割,当然如果当前遍历的文件大小小于splitSize,那么也会生成一个splitter来处理。即一个文件至少会被分割成一个split。
因此如果是小文件场景,就不适合用FileInputFormat了,成千上万的小文件,意味着成千上万的split即同等数量的map task
mapreduce计算框架实现了很多InputFormat,上面介绍的FileInputFormat只是其中一种,下面介绍一种适合小文件的分割方式org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
但是CombineFileInputFormat是abstract类型,不能直接实例化,因此需要实现它。mapreduce框架中提供了好几种实现:
CombineTextInputFormat
CombineSequenceFileInputFormat
用户也可以通过实现CombineFileInputFormat自定义实现。例如Hive就通过实现CombineHiveInputFormat
下面分析下org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat是如何分片的。
mapreduce.input.fileinputformat.split.minsize.per.node
mapreduce.input.fileinputformat.split.minsize.per.rack
mapreduce.input.fileinputformat.split.maxsize
public List<InputSplit> getSplits(JobContext job)
throws IOException {
long minSizeNode = 0;
long minSizeRack = 0;
long maxSize = 0;
Configuration conf = job.getConfiguration();
// the values specified by setxxxSplitSize() takes precedence over the
// values that might have been specified in the config
if (minSplitSizeNode != 0) {
minSizeNode = minSplitSizeNode;
} else {
minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
}
if (minSplitSizeRack != 0) {
minSizeRack = minSplitSizeRack;
} else {
minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
}
if (maxSplitSize != 0) {
maxSize = maxSplitSize;
} else {
maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
// If maxSize is not configured, a single split will be generated per
// node.
}
//job里面输入的文件列表
List<FileStatus> stats = listStatus(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
if (stats.size() == 0) {
return splits;
}
// 这里涉及一个pool的概念,这个是对提供的接口,需要实现。
// 这个功能主要是将路径分道不同的pool中,然后遍历这些池子。这样可以确保每一个文件只能属于一个池子。
// 同一个池子里面的file通常具有业务上的相关性。具体实现依业务侧而定,默认啥都没有
for (MultiPathFilter onepool : pools) {
ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
// pick one input path. If it matches all the filters in a pool,
// add it to the output set
for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
FileStatus p = iter.next();
if (onepool.accept(p.getPath())) {
myPaths.add(p); // add it to my output set
iter.remove();
}
}
// create splits for all files in this pool.
getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
}
// create splits for all files that are not in any pool.
getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
rackToNodes.clear();
return splits;
}
CombineFileInputFormat.getMoreSplits()
/**
* Return all the splits in the specified set of paths
*/
private void getMoreSplits(JobContext job, List<FileStatus> stats,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
Configuration conf = job.getConfiguration();
// all blocks for all the files in input set
OneFileInfo[] files;
// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String, Set<OneBlockInfo>> nodeToBlocks = new HashMap<String, Set<OneBlockInfo>>();
files = new OneFileInfo[stats.size()];
if (stats.size() == 0) {
return;
}
// populate all the blocks for all files
long totLength = 0;
int i = 0;
for (FileStatus stat : stats) {
//使用输入文件的Stat信息,构造出上面声明的三种映射关系:
//1-机架-块;
//2-块-DataNode节点
//3-DataNode节点-块
files[i] =new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()), rackToBlocks, blockToNodes, nodeToBlocks,rackToNodes, maxSize);
totLength += files[i].getLength();
}
createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, maxSize, minSizeNode, minSizeRack, splits);
}
CombineFileInputFormat里面有三轮小文件合并,可能会造成这种差异:
第一轮:尝试将同一个datanode上面的block合并;
第二轮:尝试将同一个机架上面的block合并;
第三轮:随意合并,不用考虑block距离;
【OBS场景】
OBS场景下,没有block ,机架这种信息,因此不存在block之间的距离,所以combine的时候会直接跳过上面前两轮过程,直接进入第三轮,因此对于28MB的数据,只能产生一个map
【HDFS场景下】
涉及三个参数:
mapreduce.input.fileinputformat.split.minsize.per.node
mapreduce.input.fileinputformat.split.minsize.per.rack
mapreduce.input.fileinputformat.split.maxsize
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
Map<OneBlockInfo, String[]> blockToNodes,
Map<String, List<OneBlockInfo>> rackToBlocks,
long totLength,
long maxSize,
long minSizeNode,
long minSizeRack,
List<InputSplit> splits
) {
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
long curSplitSize = 0;
int totalNodes = nodeToBlocks.size();
long totalLength = totLength;
Multiset<String> splitsPerNode = HashMultiset.create();
Set<String> completedNodes = new HashSet<String>();
//第一轮
while(true) {
//遍历这些输入涉及到的所有datanode
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
String node = one.getKey();
// 如果一个节点已经被处理完,那么直接跳过
if (completedNodes.contains(node)) {
continue;
}
Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
// 遍历这个节点上面的所有block,由于一个block可以分布在多个节点上面,因此一个block被处理完之后立即将其从block-node映射关系中剔除。
Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
while (oneBlockIter.hasNext()) {
OneBlockInfo oneblock = oneBlockIter.next();
// 当前遍历的节点不在block-node中,说明这个block已经被处理过了,直接跳过。
if(!blockToNodes.containsKey(oneblock)) {
oneBlockIter.remove();
continue;
}
//处理block
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
curSplitSize = 0;
splitsPerNode.add(node);
// Remove entries from blocksInNode so that we don't walk these
// again.
blocksInCurrentNode.removeAll(validBlocks);
validBlocks.clear();
// Done creating a single split for this node. Move on to the next
// node so that splits are distributed across nodes.
//一次遍历的datanode节点的时候,并不是一股脑将所有的数据都分到切片汇总,一个node分配一个切片之后就,遍历下一个节点。因为一个block可能分布在多个节点上,如果只紧这一个节点分split,那么这个节点可能压力会很大。因此当前的datanode节点生成一个split之后,就去遍历下一个节点。,当然有可能遍历完所有的datanode节点之后,block-nodes映射中剩余,即输入的文件中海油block剩余没有分配到split中,这时候就到下一轮分配了:基于机架的分配
break;
}
}
//对于小文件场景,可能遍历完一个节点上面的所有block也没有凑够mapreduce.input.fileinputformat.split.maxsize,
//那么会进去到这里,继续处理。其中validBlock表示本轮预处理的block
if (validBlocks.size() != 0) {
// This implies that the last few blocks (or all in case maxSize=0)
// were not part of a split. The node is complete.
// if there were any blocks left over and their combined size is
// larger than minSplitNode, then combine them into one split.
// Otherwise add them back to the unprocessed pool. It is likely
// that they will be combined with other blocks from the
// same rack later on.
// This condition also kicks in when max split size is not set. All
// blocks on a node will be grouped together into a single split.
//如果当前的split已经达到了mapreduce.input.fileinputformat.split.minsize.per.node,那么创建一个split
if (minSizeNode != 0 && curSplitSize >= minSizeNode && splitsPerNode.count(node) == 0) {
// haven't created any split on this machine. so its ok to add a
// smaller one for parallelism. Otherwise group it in the rack for
// balanced size create an input split and add it to the splits
// array
addCreatedSplit(splits, Collections.singleton(node), validBlocks);
totalLength -= curSplitSize;
splitsPerNode.add(node);
// Remove entries from blocksInNode so that we don't walk this again.
blocksInCurrentNode.removeAll(validBlocks);
// The node is done. This was the last set of blocks for this node.
} else {
// Put the unplaced blocks back into the pool for later rack-allocation.
for (OneBlockInfo oneblock : validBlocks) {
blockToNodes.put(oneblock, oneblock.hosts);
}
}
validBlocks.clear();
curSplitSize = 0;
completedNodes.add(node);
} else { // 如果经过了上面的过程之后当前的节点上面所有的数据块都被添加到split之后,就将该block添加到完成节点列表中,放置重复遍历
if (blocksInCurrentNode.size() == 0) {
// Node is done. All blocks were fit into node-local splits.
completedNodes.add(node);
} // else Run through the node again.
}
}
// 每次处理完一个节点之后都要判断一下是不是所有的节点都已经处理完了
if (completedNodes.size() == totalNodes || totalLength == 0) {
// All nodes have been walked over and marked as completed or all blocks
// have been assigned. The rest should be handled via rackLock assignment.
LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
+ completedNodes.size() + ", size left: " + totalLength);
break;
}
}
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
Set<String> racks = new HashSet<String>();
// 如果经历了上面的过程之后之后还是有block没有处理,那么进行第二轮处理,基于机架进行合并。
while (blockToNodes.size() > 0) {
// Create one split for this rack before moving over to the next rack.
// Come back to this rack after creating a single split for each of the
// remaining racks.
// Process one rack location at a time, Combine all possible blocks that
// reside on this rack as one split. (constrained by minimum and maximum
// split size).
// 遍历所有的rack,以及针对每一个机架遍历其上没有被分到split的block。达到maxSize之后分配一个split。当然有可能遍历完rack上面所有的block之后,其大小之和没有达到可以split的大小。那么会进一步判断是否达到rack.minsize.如果达到,就会创建一个split,然后遍历下一个rack,当然有可能
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
rackToBlocks.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
racks.add(one.getKey());
List<OneBlockInfo> blocks = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
boolean createdSplit = false;
for (OneBlockInfo oneblock : blocks) {
if (blockToNodes.containsKey(oneblock)) {
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
createdSplit = true;
break;
}
}
}
// if we created a split, then just go to the next rack
if (createdSplit) {
curSplitSize = 0;
validBlocks.clear();
racks.clear();
continue;
}
if (!validBlocks.isEmpty()) {
if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
// if there is a minimum size specified, then create a single split
// otherwise, store these blocks into overflow data structure
addCreatedSplit(splits, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that
// remained to be processed. Keep them in 'overflow' block list.
// These will be combined later.
// 如果凑不够minRackSize,那么将这些block添加到overflowBlocks中,放到最后一轮统一处理
overflowBlocks.addAll(validBlocks);
}
}
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
assert blockToNodes.isEmpty();
assert curSplitSize == 0;
assert validBlocks.isEmpty();
assert racks.isEmpty();
// 最后一轮,处理所有添加到overflowBlocks中的数据块
for (OneBlockInfo oneblock : overflowBlocks) {
validBlocks.add(oneblock);
curSplitSize += oneblock.length;
// This might cause an exiting rack location to be re-added,
// but it should be ok.
for (int i = 0; i < oneblock.racks.length; i++) {
racks.add(oneblock.racks[i]);
}
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
addCreatedSplit(splits, getHosts(racks), validBlocks);
}
}
- 点赞
- 收藏
- 关注作者
评论(0)