2015年2月12日 星期四

HDFS SequenceFile



SequenceFile是Hadoop API提供的一種二進制儲存文件,可以將它看成是一個大容器,裡面包含許多小文件,只是它像資料結構中的鏈結串列將小文件串接,而每個節點都是一個key/value的形式,通常key為fileName,value為文件內容,另外sequenceFile也支持壓縮,壓縮有利於節省空間及傳輸時間。
一、由於SequenceFile並沒有建立小文件對應到大文件的索引,所以在查詢小文件時就必須進行遍歷,降低讀取效率。
二、具有壓縮功能,包含三種壓縮方式,可以透過API也可以透過Config來設定
     (一)NONE:無壓縮
     (二)RECORD:僅壓縮value部分
     (三)BLOCK:key/value皆壓縮

SequenceFile.setDefaultCompressionType(conf, SequenceFile.CompressionType.BLOCK);//JAVA API
conf.set("io.seqfile.compression.type", "BLOCK");//Config

三、使用SequenceFile來實際撰寫「某一目錄下所有檔案」以seq方式上傳到HDFS。
     (一)首先撰寫一個SequenceFileOperation的類,並包含建構子,設定HDFS的路徑

public class SequenceFileOperation {
    private Configuration conf = new Configuration();
    public SequenceFileOperation(String HDFS_PATH){
        conf.set("fs.default.name",HDFS_PATH);
    }

}

      (二)寫一個writeTo方法,使用輸入串流將數據導到應用程序中,在使用Sequence.Writer將key、value以append方式追加,其中key為字串型態也就是Hadoop中的Text,而value為BytesWritable

public void writeTo(String srcPath, SequenceFile.Writer writer, Text key, BytesWritable value){
    InputStream in = null;
    try {
        in = new BufferedInputStream(new FileInputStream(srcPath));
        String fileName = srcPath.substring(srcPath.lastIndexOf("\\") + 1);
        key.set(fileName);
        int len = 0;
        byte[] buff = new byte[1024];
        while ((len = in.read(buff))!= -1) {
            value.set(buff, 0, len);
            writer.append(key, value);//將每筆資訊追加到SequenceFile.Writer的尾端
        }
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }finally {
        IOUtils.closeStream(in);
    }
}

在這裡並沒有一次寫入而是每次都寫1024個bytes,這樣的話讀取會較無效率,故可以改良成一次寫入全部。

byte[] buff = new byte[in.available()];

      (三)撰寫uploadTo方法將指定目錄下所有檔案以打包形式上傳到HDFS,writer的部分使用SequenceFile.Writer

public void uploadTo(String srcDir,String desc){
    SequenceFile.Writer writer = null;
    try {
        FileSystem fileSystem = FileSystem.get(conf);
        SequenceFile.setDefaultCompressionType(conf, SequenceFile.CompressionType.BLOCK);
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        writer = SequenceFile.createWriter(fileSystem,conf,new Path(desc),key.getClass(),value.getClass());
        File folder = new File(srcDir);
        String[] list = folder.list();
        for (int i = 0; i < list.length; i++) {
            String filePath = srcDir + "\\" + list[i];
            writeTo(filePath, writer, key, value);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }finally {
        IOUtils.closeStream(writer);
    }
}

      (四)以SequenceFile.Reader來下載,下載的部分給予該檔案位於哪個.seq檔,及檔案名稱來進行循序搜尋,因為在上傳時是以檔案名稱當作key故需給予檔案名稱。

public void downloadFromSeq(String src, String desc, String fileName){
    SequenceFile.Reader reader = null;
    OutputStream out = null;
    try {
        FileSystem fileSystem = FileSystem.get(conf);
        reader = new SequenceFile.Reader(fileSystem,new Path(src),conf);
        Text key = new Text();
        Text compareKey = new Text(fileName);
        BytesWritable value = new BytesWritable();
        out = new BufferedOutputStream(new FileOutputStream(desc));
        while (reader.next(key,value)){
            if (!key.equals(compareKey)) continue;
            out.write(value.getBytes(),0,value.getLength());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }finally {
        IOUtils.closeStream(out);
        IOUtils.closeStream(reader);
    }
}

      (五)最後實際來使用上傳及下載的方法

public static void main(String[] args) {
        final String HDFS_PATH = "hdfs://192.168.121.130:9000";
        SequenceFileOperation sequenceFileOperation = new SequenceFileOperation(HDFS_PATH);
        sequenceFileOperation.uploadTo("C:\\Users\\hduser\\Downloads\\myTestFiles","/testFile/test.seq");
//      /testFile includes...
//        amb.exe
//        1.pdf
//        2.pdf
//        3.pdf
//        4.pdf
//        test.txt        
        sequenceFileOperation.downloadFromSeq("/testFile/test.seq","C:\\Users\\hduser\\Downloads\\1.pdf","1.pdf");
    }

沒有留言:

張貼留言