使用文件模式,实现多文件上传至HDFS

举报
Smy1121 发表于 2019/06/20 14:22:52 2019/06/20
【摘要】 前面有讲如何上传单个本地文件到HDFS文件系统,那么当一个目录下包含很多个文件,我们该如何做呢?这时我们就需要用到文件模式。 所以,在项目开始之前,先来学习一下什么是文件模式?文件模式 在某个单一操作中处理一系列文件是很常见的。例如一个日志处理的MapReduce作业可能要分析一个月的日志量。如果一个文件一个文件或者一个目录一个目录的声明那就太麻烦了,我们可以使...

        前面有讲如何上传单个本地文件到HDFS文件系统,那么当一个目录下包含很多个文件,我们该如何做呢?这时我们就需要用到文件模式。 所以,在项目开始之前,先来学习一下什么是文件模式?


文件模式


        在某个单一操作中处理一系列文件是很常见的。例如一个日志处理的MapReduce作业可能要分析一个月的日志量。如果一个文件一个文件或者一个目录一个目录的声明那就太麻烦了,我们可以使用通配符(wild card)来匹配多个文件(这个操作也叫做globbing)。

        Hadoop提供了两种方法来处理文件组:


public FileStatus[] globStatus(Path pathPattern) throws IOException;

public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException;

PathFilter


        使用文件模式有时候并不能有效的描述你想要的一系列文件,例如如果你想排除某个特定文件就很难。所以FileSystem的listStatus()和globStatus()方法就提供了一个可选参数:PathFilter——它允许你一些更细化的控制匹配:


package org.apache.hadoop.fs;

public interface PathFilter 

{

boolean accept(Path path);

}

        Hadoop中的匹配符与Unix中bash相同,如下图所示:

image.png

则对应于该组织结构有如下图表示:

image.png


上面我们已经掌握了文件模式如何使用,现在让我们一起开始项目吧,小讲已经迫不及待。


项目介绍

我们利用通配符和PathFilter 对象,将本地多种格式的文件上传至 HDFS文件系统,并过滤掉 txt文本格式以外的文件。


项目数据

如下图所示:

image.png


项目分析

        基于项目的需求,我们通过以下两步完成:


        1、首先使用globStatus(Path pathPattern, PathFilter filter),完成文件格式过滤,获取所有 txt 格式的文件。


        2、然后使用 Java API 接口 copyFromLocalFile,将所有 txt 格式的文件上传至 HDFS。


项目程序

猛戳这里进行完整代码下载 核心代码如下图所示


        首先定义一个类 RegexAcceptPathFilter实现 PathFilter,过滤掉 txt 文本格式以外的文件。


// 过滤文件

public static class RegexAcceptPathFilter implements PathFilter {

private final String regex;

public RegexAcceptPathFilter(String regex) {

this.regex = regex;

}

@Override

public boolean accept(Path path) {

// TODO Auto-generated method stub

boolean flag = path.toString().matches(regex);

return flag;

}

}

        如果要接收 regex 格式的文件,则accept()方法就return flag; 如果想要过滤掉regex格式的文件,则accept()方法就return !flag。

        接下来在 list 方法中,使用 globStatus 方法获取所有 txt 文件,然后通过 copyFromLocalFile 方法将文件上传至 HDFS。


/**

* 过滤文件格式   将多个文件上传至 HDFS

* @param dstPath 目的路径

* @throws IOException

* @throws URISyntaxException

*/

public static void list(String srcPath,String dstPath) throws IOException, URISyntaxException {

//读取配置文件

Configuration conf = new Configuration(); 


//获取默认文件系统  在Hadoop 环境下运行,也可以使用此种方法获取文件系统

//fs = FileSystem.get(conf);

//获取指定文件系统  本地环境运行,需要使用此种方法获取文件系统

URI uri = new URI("hdfs://xxx:9000");//HDFS 地址

fs = FileSystem.get(uri,conf);

// 获取本地文件系统

local = FileSystem.getLocal(conf);

//获取文件目录

FileStatus[] localStatus = local.globStatus(new Path(srcPath),new RegexAcceptPathFilter("^.*txt$"));

//获取所有文件路径

Path[] listedPaths = FileUtil.stat2Paths(localStatus);

//输出路径

Path out = new Path(dstPath);

//循环所有文件

        for(Path p:listedPaths){

        //将本地文件上传到HDFS

        fs.copyFromLocalFile(p, out);

        }

}

        在 main() 方法在调用 list,执行多文件上传至 HDFS。


package com.dajiangtai.hadoop.test2;

import java.io.IOException;

import java.net.URI;

import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.FileUtil;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.PathFilter;

/**

 * @function 将指定格式的多个文件上传至 HDFS

 * @author cs

 *

 */public class CopyManyFilesToHDFS {

private static FileSystem fs = null;

private static FileSystem local = null;


/**

* @function Main 方法

* @param args

* @throws IOException

* @throws URISyntaxException

*/

public static void main(String[] args) throws IOException,URISyntaxException {

//文件上传路径

Path dstPath = new Path("hdfs://xxx:9000/middle/filter/");

//调用文件上传 list 方法

list(dstPath);

}


/**

* function 过滤文件格式   将多个文件上传至 HDFS

* @param dstPath 目的路径

* @throws IOException

* @throws URISyntaxException

*/

public static void list(Path dstPath) throws IOException, URISyntaxException {

//读取hadoop文件系统的配置

Configuration conf = new Configuration();

//HDFS 接口

URI uri = new URI("hdfs://xxx:9000");

//获取文件系统对象

fs = FileSystem.get(uri, conf);

// 获得本地文件系统

local = FileSystem.getLocal(conf);

//只上传data/testdata 目录下 txt 格式的文件

FileStatus[] localStatus = local.globStatus(new Path("E:/DHDemo/data/*"),new RegexAcceptPathFilter("^.*txt$"));

// 获得所有文件路径

Path[] listedPaths = FileUtil.stat2Paths(localStatus);

        for(Path p:listedPaths){

        //将本地文件上传到HDFS

        fs.copyFromLocalFile(p, dstPath);

        }

}

}


项目运行

1、保证Hadoop集群正常启动

2、保证数据集下载到本地目录下(如上面所述)

3、将代码拷贝到eclipse工具对应的包中

image.png

4、修改程序路径代码和自己的一致

image.png


5、在DFS文件系统里创建filter目录。具体操作:


(1)鼠标在middle目录上右键(因为代码中的路径是"hdfs://xxx:9000/middle/filter/")

(2)点击Cerate new directory选项,输入filter

(3)右键middle,点击Refresh即可看到filter目录

image.png

6、运行程序


运行结果

右键filter,点击Refresh即可


你可以在filter目录下看到如下结果

image.png


本地文件已经上传至 HDFS,希望大家动手试一试。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。