使用文件模式,实现多文件上传至HDFS
前面有讲如何上传单个本地文件到HDFS文件系统,那么当一个目录下包含很多个文件,我们该如何做呢?这时我们就需要用到文件模式。 所以,在项目开始之前,先来学习一下什么是文件模式?
文件模式
在某个单一操作中处理一系列文件是很常见的。例如一个日志处理的MapReduce作业可能要分析一个月的日志量。如果一个文件一个文件或者一个目录一个目录的声明那就太麻烦了,我们可以使用通配符(wild card)来匹配多个文件(这个操作也叫做globbing)。
Hadoop提供了两种方法来处理文件组:
public FileStatus[] globStatus(Path pathPattern) throws IOException;
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException;
PathFilter
使用文件模式有时候并不能有效的描述你想要的一系列文件,例如如果你想排除某个特定文件就很难。所以FileSystem的listStatus()和globStatus()方法就提供了一个可选参数:PathFilter——它允许你一些更细化的控制匹配:
package org.apache.hadoop.fs;
public interface PathFilter
{
boolean accept(Path path);
}
Hadoop中的匹配符与Unix中bash相同,如下图所示:
则对应于该组织结构有如下图表示:
上面我们已经掌握了文件模式如何使用,现在让我们一起开始项目吧,小讲已经迫不及待。
项目介绍
我们利用通配符和PathFilter 对象,将本地多种格式的文件上传至 HDFS文件系统,并过滤掉 txt文本格式以外的文件。
项目数据
如下图所示:
项目分析
基于项目的需求,我们通过以下两步完成:
1、首先使用globStatus(Path pathPattern, PathFilter filter),完成文件格式过滤,获取所有 txt 格式的文件。
2、然后使用 Java API 接口 copyFromLocalFile,将所有 txt 格式的文件上传至 HDFS。
项目程序
猛戳这里进行完整代码下载 核心代码如下图所示
首先定义一个类 RegexAcceptPathFilter实现 PathFilter,过滤掉 txt 文本格式以外的文件。
// 过滤文件
public static class RegexAcceptPathFilter implements PathFilter {
private final String regex;
public RegexAcceptPathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return flag;
}
}
如果要接收 regex 格式的文件,则accept()方法就return flag; 如果想要过滤掉regex格式的文件,则accept()方法就return !flag。
接下来在 list 方法中,使用 globStatus 方法获取所有 txt 文件,然后通过 copyFromLocalFile 方法将文件上传至 HDFS。
/**
* 过滤文件格式 将多个文件上传至 HDFS
* @param dstPath 目的路径
* @throws IOException
* @throws URISyntaxException
*/
public static void list(String srcPath,String dstPath) throws IOException, URISyntaxException {
//读取配置文件
Configuration conf = new Configuration();
//获取默认文件系统 在Hadoop 环境下运行,也可以使用此种方法获取文件系统
//fs = FileSystem.get(conf);
//获取指定文件系统 本地环境运行,需要使用此种方法获取文件系统
URI uri = new URI("hdfs://xxx:9000");//HDFS 地址
fs = FileSystem.get(uri,conf);
// 获取本地文件系统
local = FileSystem.getLocal(conf);
//获取文件目录
FileStatus[] localStatus = local.globStatus(new Path(srcPath),new RegexAcceptPathFilter("^.*txt$"));
//获取所有文件路径
Path[] listedPaths = FileUtil.stat2Paths(localStatus);
//输出路径
Path out = new Path(dstPath);
//循环所有文件
for(Path p:listedPaths){
//将本地文件上传到HDFS
fs.copyFromLocalFile(p, out);
}
}
在 main() 方法在调用 list,执行多文件上传至 HDFS。
package com.dajiangtai.hadoop.test2;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/**
* @function 将指定格式的多个文件上传至 HDFS
* @author cs
*
*/public class CopyManyFilesToHDFS {
private static FileSystem fs = null;
private static FileSystem local = null;
/**
* @function Main 方法
* @param args
* @throws IOException
* @throws URISyntaxException
*/
public static void main(String[] args) throws IOException,URISyntaxException {
//文件上传路径
Path dstPath = new Path("hdfs://xxx:9000/middle/filter/");
//调用文件上传 list 方法
list(dstPath);
}
/**
* function 过滤文件格式 将多个文件上传至 HDFS
* @param dstPath 目的路径
* @throws IOException
* @throws URISyntaxException
*/
public static void list(Path dstPath) throws IOException, URISyntaxException {
//读取hadoop文件系统的配置
Configuration conf = new Configuration();
//HDFS 接口
URI uri = new URI("hdfs://xxx:9000");
//获取文件系统对象
fs = FileSystem.get(uri, conf);
// 获得本地文件系统
local = FileSystem.getLocal(conf);
//只上传data/testdata 目录下 txt 格式的文件
FileStatus[] localStatus = local.globStatus(new Path("E:/DHDemo/data/*"),new RegexAcceptPathFilter("^.*txt$"));
// 获得所有文件路径
Path[] listedPaths = FileUtil.stat2Paths(localStatus);
for(Path p:listedPaths){
//将本地文件上传到HDFS
fs.copyFromLocalFile(p, dstPath);
}
}
}
项目运行
1、保证Hadoop集群正常启动
2、保证数据集下载到本地目录下(如上面所述)
3、将代码拷贝到eclipse工具对应的包中
4、修改程序路径代码和自己的一致
5、在DFS文件系统里创建filter目录。具体操作:
(1)鼠标在middle目录上右键(因为代码中的路径是"hdfs://xxx:9000/middle/filter/")
(2)点击Cerate new directory选项,输入filter
(3)右键middle,点击Refresh即可看到filter目录
6、运行程序
运行结果
右键filter,点击Refresh即可
你可以在filter目录下看到如下结果
本地文件已经上传至 HDFS,希望大家动手试一试。
- 点赞
- 收藏
- 关注作者
评论(0)