2015年1月3日 星期六

Java HDFS 操作



一、當建置好Hadoop開發環境後就可以開始運用Hadoop所提供的Java API來對HDFS進行操作,透過這個API能夠進行HDFS的創建文件、上傳檔案、下載檔案、刪除檔案等,而以下是文件操作所牽涉到的幾個類:
     (一)Configuration類:主要是將客戶端或者服務器的配置導入並使用甚至動態修改。
     (二)FileSystem類:定義了一個hadoop的文件系統接口,該類是一個抽象類,透過以下靜態工廠方法取得實例。

Configuration conf = new Configuration();
FileSystem filesystem = FileSystem.get(conf);
FileSystem filesystem = FileSystem.get(new URI(...),conf);

二、接著再了解API的類之後就是環境的搭建了,以本身的環境為例:
     (一)使用一台虛擬機,內部再用輕量虛擬化技術Docker,虛擬出兩個Container分別為Slave1、Slave2,而該台虛擬機本身就當成master,並將Master、Slave1、Slave2的IP位址暴露出來,讓我們的本機可以透過JAVA API去操作。
     (二)本機就依照前篇所講的配置Java開發環境:Intellij hadoop hdfs Environment Settings
三、接下來就是透過上面幾個類來實作HDFS的操作,因此首先我們可以設計一個HDFS操作類名為HDFSOperation,內部宣告一個Configuration類的變數conf,並以建構子進行配置,這裡主要以HDFS順利操作為原則,因此只要配置master的路徑即可,讓我們順利跟namenode溝通。

/**
 * HDFS操作實例
 */
public class HDFSOperation {
    private Configuration conf = new Configuration();
    /**
     * 建構子,根據(HDFS_PATH)進行配置
     * @param HDFS_PATH
     */
    public HDFSOperation(String HDFS_PATH){
       conf.set("fs.default.name",HDFS_PATH);
    }
}

四、以下撰寫幾個常用的API的method:

 /**
     * 建立資料夾
     * @param dirPath
     */
    public void createDir(String dirPath){
        try {
            FileSystem filesystem = FileSystem.get(conf);
            filesystem.mkdirs(new Path(dirPath));
            System.out.print("創建資料夾成功");
            filesystem.close();
        } catch (IOException ex) {
            Logger.getLogger(HDFSOperation.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
    /**
     * 取得目錄下檔案狀態,e.g path、size等
     * @param dirPath
     * @return
     */
    public String getFileStatus(String dirPath){
        try {
            FileSystem filesystem = FileSystem.get(conf);
            FileStatus[] list = filesystem.listStatus(new Path(dirPath));
            StringBuilder sbMessages = new StringBuilder();
            for (FileStatus f : list){
                String message = String.format("Path:%s,isFolder:%s,Size:%s",f.getPath(),f.isDirectory(),f.getLen());
                sbMessages.append(message).append("\n");
            }
            filesystem.close();
            return sbMessages.toString();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     *刪除遠端文件或者檔案(path),傳入isDir來做區別
     * @param path
     * @param isDir
     */
    public void delete(String path,boolean isDir){
        try {
            FileSystem filesystem = FileSystem.get(conf);
            filesystem.delete(new Path(path),isDir);
            System.out.print("刪除成功");
            filesystem.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 將本地文件(src)上傳到HDFS服務器指定路徑(desc),其中比較特殊的是OutputStream,要根據FileSystem這個類去取得輸出串流,並透過writeStreamingTo這個方法將串流輸出成檔案,而緩衝區大小為4096bytes。
     * @param src 來源端路徑(本地)
     * @param desc 目的端路徑(HDFS)
     */
    public void uploadFileToHDFS(String src,String desc){
        try {
            FileSystem filesystem = FileSystem.get(conf);
            InputStream in = new BufferedInputStream(new FileInputStream(src));
            OutputStream out = filesystem.create(new Path(desc));
            writeStreamingTo(in,out,4096);
            //------------------using copyFromLocalFile------------------------//
            //filesystem.copyFromLocalFile(new Path(src),new Path(desc));
            System.out.print(String.format("Upload from %s to %s",src,conf.get("fs.default.name") + desc));
            filesystem.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 從HDFS上,下載檔案到本地端,其中比較特殊的是InputStream,要根據FileSystem這個類去取得輸入串流,並透過writeStreamingTo這個方法將串流輸出成檔案,而緩衝區大小為4096bytes。
     * @param src 來源端路徑(HDFS)
     * @param desc 目的端路徑(本地)
     */
    public void downloadFileFromHDFS(String src,String desc){
        try {
            FileSystem filesystem = FileSystem.get(conf);
            OutputStream out = new BufferedOutputStream(new FileOutputStream(desc));
            InputStream in = filesystem.open(new Path(src));
            writeStreamingTo(in,out,4096);
            //------------------using copyFromLocalFile------------------------//
            //filesystem.copyFromLocalFile(new Path(src),new Path(desc));
            System.out.print(String.format("Download from %s to %s",src,conf.get("fs.default.name") + desc));
            filesystem.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 將來源數據利用輸入串流取出到應用程序,再利用輸出串流將檔案寫到目的,其中利用buffersize來控制緩衝區大小,並掌握進度。
     * @param in 輸入串流
     * @param out 輸出串流
     * @param bufferSize 緩衝區大小
     */
    private void writeStreamingTo(InputStream in,OutputStream out,int bufferSize){//待修改
        try {
            byte[] buffer = new byte[bufferSize];
            int len = 0;
            int totalBytes = in.available();
            int writedBytes = 0;
            System.out.println("Start Download.....");
            while ((len = in.read(buffer)) > 0) {
                writedBytes += len;
                out.write(buffer,0,len);
                System.out.println(String.format("%.2f%s",writedBytes / (double)totalBytes * 100,"%"));
            }
            in.close();
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

五、最後實際來操作以上幾個方法:

public static void main(String[] args) {
        final String HDFS_PATH = "hdfs://192.168.121.130:9000" ;
        HDFSOperation hdfs = new HDFSOperation(HDFS_PATH);
        System.out.print(hdfs.getFileStatus("/"));
//        hdfs.createDir("/tmp");
//        hdfs.uploadFileToHDFS("C:\\Users\\test\\Downloads\\hadoop-2.6.0.tar.gz","/tmp/hadoop-2.6.0.tar.gz");
//        hdfs.downloadFileFromHDFS("/tmp/hadoop-2.6.0.tar.gz","C:\\Users\\test\\Downloads\\hadoop-2.6.0.tar.gz");
//        hdfs.delete("/tmp/hadoop-2.6.0.tar.gz",false);
//        hdfs.delete("/tmp",true);
    }

沒有留言:

張貼留言