hadoop壓縮與解壓
1 壓縮
一 般來說,計算機處理的數據都存在一些冗余度,同時數據中間,尤其是相鄰數據間存在著相關性,所以可以通過一些有別于原始編碼的特殊編碼方式來保存數據, 使數據占用的存儲空間比較小,這個過程一般叫壓縮。和壓縮對應的概念是解壓縮,就是將被壓縮的數據從特殊編碼方式還原為原始數據的過程。
壓縮廣泛應用于海量數據處理中,對數據文件進行壓縮,可以有效減少存儲文件所需的空間,并加快數據在網絡上或者到磁盤上的傳輸速度。在Hadoop中,壓縮應用于文件存儲、Map階段到Reduce階段的數據交換(需要打開相關的選項)等情景。
數 據壓縮的方式非常多,不同特點的數據有不同的數據壓縮方式:如對聲音和圖像等特殊數據的壓縮,就可以采用有損的壓縮方法,允許壓縮過程中損失一定的信 息,換取比較大的壓縮比;而對音樂數據的壓縮,由于數據有自己比較特殊的編碼方式,因此也可以采用一些針對這些特殊編碼的專用數據壓縮算法。
2 Hadoop壓縮簡介
Hadoop作為一個較通用的海量數據處理平臺,在使用壓縮方式方面,主要考慮壓縮速度和壓縮文件的可分割性。
所 有的壓縮算法都會考慮時間和空間的權衡,更快的壓縮和解壓縮速度通常會耗費更多的空間(壓縮比較低)。例如,通過gzip命令壓縮數據時,用戶可以設置 不同的選項來選擇速度優先或空間優先,選項–1表示優先考慮速度,選項–9表示空間最優,可以獲得最大的壓縮比。需要注意的是,有些壓縮算法的壓縮和解壓 縮速度會有比較大的差別:gzip和zip是通用的壓縮工具,在時間/空間處理上相對平衡,gzip2壓縮比gzip和zip更有效,但速度較慢,而且 bzip2的解壓縮速度快于它的壓縮速度。
當 使用MapReduce處理壓縮文件時,需要考慮壓縮文件的可分割性。考慮我們需要對保持在HDFS上的一個大小為1GB的文本文件進行處理,當前 HDFS的數據塊大小為64MB的情況下,該文件被存儲為16塊,對應的MapReduce作業將會將該文件分為16個輸入分片,提供給16個獨立的 Map任務進行處理。但如果該文件是一個gzip格式的壓縮文件(大小不變),這時,MapReduce作業不能夠將該文件分為16個分片,因為不可能從 gzip數據流中的某個點開始,進行數據解壓。但是,如果該文件是一個bzip2格式的壓縮文件,那么,MapReduce作業可以通過bzip2格式壓 縮文件中的塊,將輸入劃分為若干輸入分片,并從塊開始處開始解壓縮數據。bzip2格式壓縮文件中,塊與塊間提供了一個48位的同步標記,因 此,bzip2支持數據分割。
表3-2列出了一些可以用于Hadoop的常見壓縮格式以及特性。
表3-2 Hadoop支持的壓縮格式

為了支持多種壓縮解壓縮算法,Hadoop引入了編碼/解碼器。與Hadoop序列化框架類似,編碼/解碼器也是使用抽象工廠的設計模式。目前,Hadoop支持的編碼/解碼器如表3-3所示。
表3-3 壓縮算法及其編碼/解碼器

同一個壓縮方法對應的壓縮、解壓縮相關工具,都可以通過相應的編碼/解碼器獲得。
3 Hadoop壓縮API應用實例import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
public class CodecTest {
public static void main(String[] args) throws Exception {
compress("org.apache.hadoop.io.compress.BZip2Codec");
// compress("org.apache.hadoop.io.compress.GzipCodec");
// compress("org.apache.hadoop.io.compress.Lz4Codec");
// compress("org.apache.hadoop.io.compress.SnappyCodec");
// uncompress("text");
// uncompress1("hdfs://master:9000/user/hadoop/text.gz");
}
// 壓縮文件
public static void compress(String codecClassName) throws Exception {
Class<?> codecClass = Class.forName(codecClassName);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
//輸入和輸出均為hdfs路徑
FSDataInputStream in = fs.open(new Path("/test.log"));
FSDataOutputStream outputStream = fs.create(new Path("/test1.bz2"));
System.out.println("compress start !");
// 創建壓縮輸出流
CompressionOutputStream out = codec.createOutputStream(outputStream);
IOUtils.copyBytes(in, out, conf);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
System.out.println("compress ok !");
}
// 解壓縮
public static void uncompress(String fileName) throws Exception {
Class<?> codecClass = Class
.forName("org.apache.hadoop.io.compress.GzipCodec");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
CompressionCodec codec = (CompressionCodec) ReflectionUtils
.newInstance(codecClass, conf);
FSDataInputStream inputStream = fs
.open(new Path("/user/hadoop/text.gz"));
// 把text文件里到數據解壓,然后輸出到控制臺
InputStream in = codec.createInputStream(inputStream);
IOUtils.copyBytes(in, System.out, conf);
IOUtils.closeStream(in);
}
// 使用文件擴展名來推斷二來的codec來對文件進行解壓縮
public static void uncompress1(String uri) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.out.println("no codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri,
codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
}
}
}
本文由用戶
jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!