《Hadoop权威指南:大数据的存储与分析》—3.5.2 通过FileSystem API读取数据

举报
清华大学出版社 发表于 2019/10/12 13:55:59 2019/10/12
【摘要】 本节书摘来自清华大学出版社《Hadoop权威指南:大数据的存储与分析》一书中第三章,第3.5.2节,作者是Tom White , 王 海 华 东 刘 喻 吕粤海 译。

3.5.2  通过FileSystem API读取数据

正如前一小节所解释的,有时根本不可能在应用中设置URLStreamHandlerFactory实例。在这种情况下,我们需要用FileSystem API来打开一个文件的输入流。

Hadoop文件系统中通过Hadoop Path对象(而非java.io.File对象,因为它的语义与本地文件系统联系太紧密)来代表文件。可以将路径视为一个Hadoop文件系统URI,如hdfs://localhost/user/tom/quangle.txt

FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS。获取FileSystem实例有下面这几个静态工厂方法:

public static FileSystem get(Configuration conf) throws IOException 

Public static FileSystem get(URI uri, Configuration conf) throws IOException

public static FileSystem get(URI uri, Configuration conf, String user)

  throws IOException

 

Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(etc/hadoop/core-site.xml)。第一个方法返回的是默认文件系统(core-site.xml中指定的,如果没有指定,则使用默认的本地文件系统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。第三,作为给定用户来访问文件系统,对安全来说是至关重要。详情可以参见10.4节。

在某些情况下,你可能希望获取本地文件系统的运行实例,此时你可以使用的getLocal()方法很方便地获取。

public static LocalFileSystem getLocal(Configuration conf) throws IOException

 

有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:

Public FSDataInputStream open(Path f) throws IOException 

Public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

 

第一个方法使用默认的缓冲区大小4 KB

最后,我们重写范例3-1,得到范例3-2

范例3-2. 直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件

public class FileSystemCat { 

  public static void main(String[] args) throws Exception { 

    String uri = args[0]; 

    Configuration conf = new Configuration(); 

    FileSystem fs = FileSystem.get(URI.create(uri), conf); 

    InputStream in = null; 

    try { 

      in = fs.open(new Path(uri)); 

      IOUtils.copyBytes(in, System.out, 4096, false); 

    } finally { 

      IOUtils.closeStream(in); 

    } 

  } 

}

 

程序运行结果如下:

% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt 

On the top of the Crumpetty Tree 

The Quangle Wangle sat, 

But his face you could not see, 

On account of his Beaver Hat. 

FSDataInputStream对象

实际上,FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。

package org.apache.hadoop.fs; 

 

public class FSDataInputStream extends DataInputStream 

      implements Seekable, PositionedReadable { 

      // implementation elided 

 }

Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的查询方法:

 public interface Seekable { 

   void seek(long pos) throws IOException; 

   long getPos() throws IOException; 

 }

 

调用seek()来定位大于文件长度的位置会引发IOException异常。与java.io.InputStreamskip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。

范例3-3是对范例3-2的简单扩展,它将一个文件写入标准输出两次:在一次写完之后,定位到文件的起始位置再次以流方式读取该文件并输出。

范例3-3. 使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两

public class FileSystemDoubleCat { 

 

   public static void main(String[] args) throws Exception { 

     String uri = args[0]; 

     Configuration conf = new Configuration(); 

     FileSystem fs = FileSystem.get(URI.create(uri), conf); 

     FSDataInputStream in = null; 

     try { 

       in = fs.open(new Path(uri)); 

       IOUtils.copyBytes(in, System.out, 4096, false); 

       in.seek(0); // go back to the start of the file 

       IOUtils.copyBytes(in, System.out, 4096, false); 

     } finally { 

       IOUtils.closeStream(in); 

     } 

   } 

 }

 

在一个小文件上运行的结果如下:

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt 

On the top of the Crumpetty Tree 

The Quangle Wangle sat, 

But his face you could not see, 

On account of his Beaver Hat. 

On the top of the Crumpetty Tree 

The Quangle Wangle sat, 

But his face you could not see, 

On account of his Beaver Hat.

 

FSDataInputStream类也实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分:

public interface PositionedReadable { 

 

   public int read(long position, byte[] buffer, int offset, int length

         throws IOException; 

 

   public void readFully(long position, byte[] buffer, int offset, int length)
   throws IOException; 

 

   public void readFully(long position, byte[] buffer) throws IOException; 

 }

 

read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处。返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于指定的length长度。readFully()方法将指定length长度的字节数数据读取到buffer(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常。

所有这些方法会保留文件当前偏移量,并且是线程安全的(FSDataInputStrean并不是为并发访问设计的,因此最好为此新建多个实例),因此它们提供了在读取文件的主体时,访问文件其他部分(可能是元数据)的便利方法。

最后务必牢记,seek()方法是一个相对高开销的操作,需要慎重使用。建议用流数据来构建应用的访问模式(比如使用MapReduce),而非执行大量seek()方法。


【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。