hadoop壓縮與解壓

jopen 10年前發布 | 55K 次閱讀 分布式/云計算/大數據 Hadoop

1 壓縮

一 般來說,計算機處理的數據都存在一些冗余度,同時數據中間,尤其是相鄰數據間存在著相關性,所以可以通過一些有別于原始編碼的特殊編碼方式來保存數據, 使數據占用的存儲空間比較小,這個過程一般叫壓縮。和壓縮對應的概念是解壓縮,就是將被壓縮的數據從特殊編碼方式還原為原始數據的過程。

壓縮廣泛應用于海量數據處理中,對數據文件進行壓縮,可以有效減少存儲文件所需的空間,并加快數據在網絡上或者到磁盤上的傳輸速度。在Hadoop中,壓縮應用于文件存儲、Map階段到Reduce階段的數據交換(需要打開相關的選項)等情景。

數 據壓縮的方式非常多,不同特點的數據有不同的數據壓縮方式:如對聲音和圖像等特殊數據的壓縮,就可以采用有損的壓縮方法,允許壓縮過程中損失一定的信 息,換取比較大的壓縮比;而對音樂數據的壓縮,由于數據有自己比較特殊的編碼方式,因此也可以采用一些針對這些特殊編碼的專用數據壓縮算法。

2 Hadoop壓縮簡介

Hadoop作為一個較通用的海量數據處理平臺,在使用壓縮方式方面,主要考慮壓縮速度和壓縮文件的可分割性。

所 有的壓縮算法都會考慮時間和空間的權衡,更快的壓縮和解壓縮速度通常會耗費更多的空間(壓縮比較低)。例如,通過gzip命令壓縮數據時,用戶可以設置 不同的選項來選擇速度優先或空間優先,選項–1表示優先考慮速度,選項–9表示空間最優,可以獲得最大的壓縮比。需要注意的是,有些壓縮算法的壓縮和解壓 縮速度會有比較大的差別:gzip和zip是通用的壓縮工具,在時間/空間處理上相對平衡,gzip2壓縮比gzip和zip更有效,但速度較慢,而且 bzip2的解壓縮速度快于它的壓縮速度。

當 使用MapReduce處理壓縮文件時,需要考慮壓縮文件的可分割性。考慮我們需要對保持在HDFS上的一個大小為1GB的文本文件進行處理,當前 HDFS的數據塊大小為64MB的情況下,該文件被存儲為16塊,對應的MapReduce作業將會將該文件分為16個輸入分片,提供給16個獨立的 Map任務進行處理。但如果該文件是一個gzip格式的壓縮文件(大小不變),這時,MapReduce作業不能夠將該文件分為16個分片,因為不可能從 gzip數據流中的某個點開始,進行數據解壓。但是,如果該文件是一個bzip2格式的壓縮文件,那么,MapReduce作業可以通過bzip2格式壓 縮文件中的塊,將輸入劃分為若干輸入分片,并從塊開始處開始解壓縮數據。bzip2格式壓縮文件中,塊與塊間提供了一個48位的同步標記,因 此,bzip2支持數據分割。

表3-2列出了一些可以用于Hadoop的常見壓縮格式以及特性。

表3-2    Hadoop支持的壓縮格式

hadoop壓縮與解壓

為了支持多種壓縮解壓縮算法,Hadoop引入了編碼/解碼器。與Hadoop序列化框架類似,編碼/解碼器也是使用抽象工廠的設計模式。目前,Hadoop支持的編碼/解碼器如表3-3所示。

表3-3    壓縮算法及其編碼/解碼器

hadoop壓縮與解壓

同一個壓縮方法對應的壓縮、解壓縮相關工具,都可以通過相應的編碼/解碼器獲得。

3 Hadoop壓縮API應用實例
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecTest {
    public static void main(String[] args) throws Exception {
        compress("org.apache.hadoop.io.compress.BZip2Codec");
//        compress("org.apache.hadoop.io.compress.GzipCodec");
//        compress("org.apache.hadoop.io.compress.Lz4Codec");
//        compress("org.apache.hadoop.io.compress.SnappyCodec");
        // uncompress("text");
        // uncompress1("hdfs://master:9000/user/hadoop/text.gz");
    }

    // 壓縮文件
    public static void compress(String codecClassName) throws Exception {
        Class<?> codecClass = Class.forName(codecClassName);
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        //輸入和輸出均為hdfs路徑
        FSDataInputStream in = fs.open(new Path("/test.log"));
        FSDataOutputStream outputStream = fs.create(new Path("/test1.bz2"));

        System.out.println("compress start !");

        // 創建壓縮輸出流
        CompressionOutputStream out = codec.createOutputStream(outputStream);
        IOUtils.copyBytes(in, out, conf);
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
        System.out.println("compress ok !");
    }

    // 解壓縮
    public static void uncompress(String fileName) throws Exception {
        Class<?> codecClass = Class
                .forName("org.apache.hadoop.io.compress.GzipCodec");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils
                .newInstance(codecClass, conf);
        FSDataInputStream inputStream = fs
                .open(new Path("/user/hadoop/text.gz"));
        // 把text文件里到數據解壓,然后輸出到控制臺
        InputStream in = codec.createInputStream(inputStream);
        IOUtils.copyBytes(in, System.out, conf);
        IOUtils.closeStream(in);
    }

    // 使用文件擴展名來推斷二來的codec來對文件進行解壓縮
    public static void uncompress1(String uri) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            System.out.println("no codec found for " + uri);
            System.exit(1);
        }
        String outputUri = CompressionCodecFactory.removeSuffix(uri,
                codec.getDefaultExtension());
        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
        }
    }

}

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!