脫離JVM?Hadoop生態圈的掙扎與演化
新世紀以來,互聯網及個人終端的普及,傳統行業的信息化及物聯網的發展等 產業變化產生了大量的數據,遠遠超出了單臺機器能夠處理的范圍,分布式存儲與處理成為唯一的選項。從2005年開始,Hadoop從最初Nutch項目的 一部分,逐步發展成為目前最流行的大數據處理平臺。Hadoop生態圈的各個項目,圍繞著大數據的存儲,計算,分析,展示,安全等各個方面,構建了一個完 整的大數據生態系統,并有Cloudera,HortonWorks,MapR等數十家公司基于開源的Hadoop平臺構建自己的商業模式,可以認為是最 近十年來最成功的開源社區。
Hadoop的成功固然是由于其順應了新世紀以來互聯網技術的發展趨勢, 同時其基于JVM的平臺開發也為Hadoop的快速發展起到了促進作用。Hadoop生態圈的項目大都基于Java,Scala,Clojure等JVM 語言開發,這些語言良好的語法規范,豐富的第三方類庫以及完善的工具支持,為Hadoop這樣的超大型項目提供了基礎支撐。同時,作為在程序員中普及率最 高的語言之一,它也降低了更多程序員使用,或是參與開發Hadoop項目的門檻。同時,基于Scala開發的Spark,甚至因為項目的火熱反過來極大的 促進了Scala語言的推廣。但是隨著Hadoop平臺的逐步發展,Hadoop生態圈的項目之間的競爭加劇,越來越多的Hadoop項目注意到了這些 JVM語言的一些不足之處,希望通過更有效率的處理方式,提升分布式系統的執行效率與健壯性。本文主要以Spark和Flink項目為例,介紹 Hadoop社區觀察到的一些因為JVM語言的不足導致的問題,以及相應的解決方案與未來可能的發展方向。
注:本文假設讀者對Java和Hadoop系統有基本了解。
背景
目前Hadoop生態圈共有MapReduce,Tez,Spark及 Flink等分布式計算引擎,分布式計算引擎項目之間的競爭也相當激烈。MapReduce作為Hadoop平臺的第一個分布式計算引擎,具有非常良好的 可擴展性,Yahoo曾成功的搭建了上萬臺節點的MapReduce系統。但是MapReduce只支持Map和Reduce編程范式,使得復雜數據計算 邏輯需要分割為多個Hadoop Job,而每個Hadoop Job都需要從HDFS讀取數據,并將Job執行結果寫回HDFS,所以會產生大量額外的IO開銷,目前MapReduce正在逐漸被其他三個分布式計算 引擎替代。Tez,Spark和Flink都支持圖結構的分布式計算流,可在同一Job內支持任意復雜邏輯的計算流。Tez的抽象層次較低,用戶不易直接 使用,Spark與Flink都提供了抽象的分布式數據集以及可在數據集上使用的操作符,用戶可以像操作Scala數據集合類似的方式在 Spark/FLink中的操作分布式數據集,非常的容易上手,同時,Spark與Flink都在分布式計算引擎之上,提供了針對SQL,流處理,機器學 習和圖計算等特定數據處理領域的庫。
隨著各個項目的發展與日益成熟,通過改進分布式計算框架本身大幅提高性能 的機會越來越少。同時,在當前數據中心的硬件配置中,采用了越來越多更先進的IO設備,例如SSD存儲,10G甚至是40Gbps網絡,IO帶寬的提升非 常明顯,許多計算密集類型的工作負載的瓶頸已經取決于底層硬件系統的吞吐量,而不是傳統上人們認為的IO帶寬,而CPU和內存的利用效率,則很大程度上決 定了底層硬件系統的吞吐量。所以越來越多的項目將眼光投向了JVM本身,希望通過解決JVM本身帶來的一些問題,提高分布式系統的性能或是健壯性,從而增 強自身的競爭力。
JVM本身作為一個各種類型應用執行的平臺,其對Java對象的管理也是 基于通用的處理策略,其垃圾回收器通過估算Java對象的生命周期對Java對象進行有效率的管理。針對不同類型的應用,用戶可能需要針對該類型應用的特 點,配置針對性的JVM參數更有效率的管理Java對象,從而提高性能。這種JVM調優的黑魔法需要用戶對應用本身以及JVM的各參數有深入的了解,極大 的提高了分布式計算平臺的調優門檻(例如這篇文章中對Spark的調優 Tuning Java Garbage Collection for Spark Applications )。然而類似Spark或是Flink的分布式計算框架,框架本身了解計算邏輯每個步驟的數據傳輸,相比于JVM垃圾回收器,其了解更多的Java對象生 命周期,從而為更有效率的管理Java對象提供了可能。
JVM存在的問題
1. Java對象開銷
相對于c/c++等更加接近底層的語言,Java對象的存儲密度相對偏 低,例如【1】,“abcd”這樣簡單的字符串在UTF-8編碼中需要4個字節存儲,但Java采用UTF-16編碼存儲字符串,需要8個字節存儲 “abcd”,同時Java對象還對象header等其他額外信息,一個4字節字符串對象,在Java中需要48字節的空間來存儲。對于大部分的大數據應 用,內存都是稀缺資源,更有效率的內存存儲,則意味著CPU數據訪問吞吐量更高,以及更少的磁盤落地可能。
2. 對象存儲結構引發的cache miss
為了緩解CPU處理速度與內存訪問速度的差距【2】,現代CPU數據訪問一般都會有多級緩存。當從內存加載數據到緩存時,一般是以cache line為單位加載數據,所以當CPU訪問的數據如果是在內存中連續存儲的話,訪問的效率會非常高。如果CPU要訪問的數據不在當前緩存所有的cache line中,則需要從內存中加載對應的數據,這被稱為一次cache miss。當cache miss非常高的時候,CPU大部分的時間都在等待數據加載,而不是真正的處理數據。Java對象并不是連續的存儲在內存上,同時很多的Java數據結構的數據聚集性也不好,在Spark的性能調優中,經常能夠觀測到大量的cache miss。Java社區有個項目叫做Project Valhalla,可能會部分的解決這個問題,有興趣的可以看看這兒 OpenJDK: Valhalla 。
3. 大數據的垃圾回收
Java的垃圾回收機制,一直讓Java開發者又愛又恨,一方面它免去了 開發者自己回收資源的步驟,提高了開發效率,減少了內存泄漏的可能,另一方面,垃圾回收也是Java應用的一顆不定時炸彈,有時秒級甚至是分鐘級的垃圾回 收極大的影響了Java應用的性能和可用性。在當前的數據中心中,大容量的內存得到了廣泛的應用,甚至出現了單臺機器配置TB內存的情況,同時,大數據分 析通常會遍歷整個源數據集,對數據進行轉換,清洗,處理等步驟。在這個過程中,會產生海量的Java對象,JVM的垃圾回收執行效率對性能有很大影響。通 過JVM參數調優提高垃圾回收效率需要用戶對應用和分布式計算框架以及JVM的各參數有深入的了解,而且有時候這也遠遠不夠。
4. OOM問題
OutOfMemoryError是分布式計算框架經常會遇到的問題,當 JVM中所有對象大小超過分配給JVM的內存大小時,就會fOutOfMemoryError錯誤,JVM崩潰,分布式框架的健壯性和性能都會受到影響。 通過JVM管理內存,同時試圖解決OOM問題的應用,通常都需要檢查Java對象的大小,并在某些存儲Java對象特別多的數據結構中設置閾值進行控制。 但是JVM并沒有提供官方的檢查Java對象大小的工具,第三方的工具類庫可能無法準確通用的確定Java對象的大小【6】。侵入式的閾值檢查也會為分布 式計算框架的實現增加很多額外的業務邏輯無關的代碼。
解決方案
為了解決以上提到的問題,高性能分布式計算框架通常需要以下技術:
1. 定制的序列化工具。顯式內存管理的前提步驟就是序列化,將Java對象序列化成二進制數據存儲在內存上(on heap或是off-heap)。通用的序列化框架,如Java默認的java.io.Serializable將Java對象以及其成員變量的所有元信 息作為其序列化數據的一部分,序列化后的數據包含了所有反序列化所需的信息。這在某些場景中十分必要,但是對于Spark或是Flink這樣的分布式計算 框架來說,這些元數據信息可能是冗余數據。定制的序列化框架,如Hadoop的org.apache.hadoop.io.Writable,需要用戶實 現該接口,并自定義類的序列化和反序列化方法。這種方式效率最高,但需要用戶額外的工作,不夠友好。
2. 顯式的內存管理。一般通用的做法是批量申請和釋放內存,每個JVM實例有一個統一的內存管理器,所有的內存的申請和釋放都通過該內存管理器進行。這可以避免常見的內存碎片問題,同時由于數據以二進制的方式存儲,可以大大減輕垃圾回收的壓力。
3. 緩存友好的數據結構和算法。只將操作相關的數據連續存儲,可以最大化的利用L1/L2/L3緩存,減少Cache miss的概率,提升CPU計算的吞吐量。以排序為例,由于排序的主要操作是對Key進行對比,如果將所有排序數據的Key與Value分開,對Key連續存儲,則訪問Key時的Cache命中率會大大提高。
定制的序列化工具
分布式計算框架可以使用定制序列化工具的前提是要處理的數據流通常是同一 類型,由于數據集對象的類型固定,對于數據集可以只保存一份對象Schema信息,節省大量的存儲空間。同時,對于固定大小的類型,也可通過固定的偏移位 置存取。當我們需要訪問某個對象成員變量的時候,通過定制的序列化工具,并不需要反序列化整個Java對象,而是可以直接通過偏移量,只是反序列化特定的 對象成員變量。如果對象的成員變量較多時,能夠大大減少Java對象的創建開銷,以及內存數據的拷貝大小。Spark與Flink數據集都支持任意 Java或是Scala類型,通過自動生成定制序列化工具,Spark與Flink既保證了API接口對用戶的友好度(不用像Hadoop那樣數據類型需 要繼承實現org.apache.hadoop.io.Writable接口),同時也達到了和Hadoop類似的序列化效率。
Spark的序列化框架
Spark支持通用的計算框架,如Java Serialization和Kryo。其缺點之前也略有論述,總結如下:
占用較多內存。Kryo相對于Java Serialization更高,它支持一種類型到Integer的映射機制,序列化時用Integer代替類型信息,但還不及定制的序列化工具效率。
反序列化時,必須反序列化整個Java對象。
無法直接操作序列化后的二進制數據。
Project Tungsten 提供了一種更好的解決方式,針對于DataFrame API(Spark針對結構化數據的類SQL分析API,參考 Spark DataFrame Blog ),由于其數據集是有固定Schema的Tuple(可大概類比為數據庫中的行),序列化是針對每個Tuple存儲其類型信息以及其成員的類型信息是非常 浪費內存的,對于Spark來說,Tuple類型信息是全局可知的,所以其定制的序列化工具只存儲Tuple的數據,如下圖所示

圖1 Spark off-heap object layout
對于固定大小的成員,如int,long等,其按照偏移量直接內聯存儲。 對于變長的成員,如String,其存儲一個指針,指向真正的數據存儲位置,并在數據存儲開始處存儲其長度。通過這種存儲方式,保證了在反序列化時,當只 需訪問某一個成員時,只需根據偏移量反序列化這個成員,并不需要反序列化整個Tuple。
Project Tungsten的定制序列化工具應用在Sort,HashTable,Shuffle等很多對Spark性能影響最大的地方。比如在Shuffle階 段,定制序列化工具不僅提升了序列化的性能,而且減少了網絡傳輸的數據量,根據DataBricks的Blog介紹,相對于 Kryo,Shuffle800萬復雜Tuple數據時,其性能至少提高2倍以上。此外,Project Tungsten也計劃通過Code generation技術,自動生成序列化代碼,將定制序列化工具推廣到Spark Core層,從而使得更多的Spark應用受惠于此優化。
Flink的序列化框架
Flink在系統設計之初,就借鑒了很多傳統RDBMS的設計,其中之一 就是對數據集的類型信息進行分析,對于特定Schema的數據集的處理過程,進行類似RDBMS執行計劃優化的優化。同時,數據集的類型信息也可以用來設 計定制的序列化工具。和Spark類似,Flink支持任意的Java或是Scala類型,Flink通過Java Reflection框架分析基于Java的Flink程序UDF(User Define Function)的返回類型的類型信息,通過Scala Compiler分析基于Scala的Flink程序UDF的返回類型的類型信息。類型信息由TypeInformation類表示,這個類有諸多具體實 現類,例如(更多詳情參考Flink官方博客 Apache Flink: Juggling with Bits and Bytes ):
1. BasicTypeInfo: 任意Java基本類型(裝包或未裝包)和String類型。
2. BasicArrayTypeInfo: 任意Java基本類型數組(裝包或未裝包)和String數組。
3. WritableTypeInfo: 任意Hadoop’s Writable接口的實現類.
4. TupleTypeInfo: 任意的Flink tuple類型(支持Tuple1 to Tuple25). Flink tuples是固定長度固定類型的Java Tuple實現。
5. CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples).
6. PojoTypeInfo: 任意的POJO (Java or Scala),例如,Java對象的所有成員變量,要么是public修飾符定義,要么有getter/setter方法。
7. GenericTypeInfo: 任意無法匹配之前幾種類型的類。)
前6種類型數據集幾乎覆蓋了絕大部分的Flink程序,針對前6種類型數 據集,Flink皆可以自動生成對應的TypeSerializer定制序列化工具,非常有效率的對數據集進行序列化和反序列化。對于第7中類 型,Flink使用Kryo進行序列化和反序列化。此外,對于可被用作Key的類型,Flink還同時自動生成TypeComparator,用來輔助直 接對序列化后的二進制數據直接進行compare,hash等之類的操作。對于Tuple,CaseClass,Pojo等組合類型,Flink自動生成 的TypeSerializer,TypeComparator同樣是組合的,并把其成員的序列化/反序列化代理給其成員對應的 TypeSerializer,TypeComparator,如下圖所示:

圖2 Flink組合類型序列化
此外,如有需要,用戶可通過集成TypeInformation接口,定制實現自己的序列化工具。
顯式的內存管理
垃圾回收的JVM內存管理回避不了的問題,JDK8的G1算法改善了 JVM垃圾回收的效率和可用范圍,但對于大數據處理的實際環境中,還是遠遠不夠。這也和現在分布式框架的發展趨勢有沖突,越來越多的分布式計算框架希望盡 可能多的將待處理的數據集放在內存中,而對于JVM垃圾回收來說,內存中Java對象越少,存活時間越短,其效率越高。通過JVM進行內存管理的 話,OutOfMemoryError也是一個很難解決的問題。同時,在JVM內存管理中,Java對象有潛在的碎片化存儲問題(Java對象所有信息可 能不是在內存中連續存儲),也有可能在所有Java對象大小沒有超過JVM分配內存時,出現OutOfMemoryError問題。
Flink的內存管理
Flink將內存分為三個部分,每個部分都有不同的用途:
1. Network buffers: 一些以32KB Byte數組為單位的buffer,主要被網絡模塊用于數據的網絡傳輸。
2. Memory Manager pool: 大量以32KB Byte數組為單位的內存池,所有的運行時算法(例如Sort/Shuffle/Join)都從這個內存池申請內存,并將序列化后的數據存儲其中,結束后釋放回內存池。
3. Remaining (Free) Heap: 主要留給UDF中用戶自己創建的Java對象,由JVM管理。
Network buffers在Flink中主要基于Netty的網絡傳輸,無需多講。Remaining Heap用于UDF中用戶自己創建的Java對象,在UDF中,用戶通常是流式的處理數據,并不需要很多內存,同時Flink也不鼓勵用戶在UDF中緩存很多數據,因為這會引起前面提到的諸多問題。Memory Manager pool(以后以內存池代指)通常會配置為最大的一塊內存,接下來會詳細介紹。
在Flink中,內存池由多個MemorySegment組成,每個 MemorySegment代表一塊連續的內存,底層存儲是byte[],默認32KB大小。MemorySegment提供了根據偏移量訪問數據的各種 方法,如get/put int,long,float,double等,MemorySegment之間數據拷貝等方法,和java.nio.ByteBuffer類似。對于 Flink的數據結構,通常包括多個向內存池申請的MemeorySegment,所有要存入的對象,通過TypeSerializer序列化之后,將二 進制數據存儲在MemorySegment中,在取出時,通過TypeSerializer反序列化。數據結構通過MemorySegment提供的 set/get方法訪問具體的二進制數據。
Flink這種看起來比較復雜的內存管理方式帶來的好處主要有:
1. 二進制的數據存儲大大提高了數據存儲密度,節省了存儲空間。
2. 所有的運行時數據結構和算法只能通過內存池申請內存,保證了其使用的內存大小是固定的,不會因為運行時數據結構和算法而發生OOM。而對于大部分的分布式計算框架來說,這部分由于要緩存大量數據,是最有可能導致OOM的地方。
3. 內存池雖然占據了大部分內存,但其中的MemorySegment容量較大(默認32KB),所以內存池中的Java對象其實很少,而且一直被內存池引用,所有在垃圾回收時很快進入持久代,大大減輕了JVM垃圾回收的壓力。
4. Remaining Heap的內存雖然由JVM管理,但是由于其主要用來存儲用戶處理的流式數據,生命周期非常短,速度很快的Minor GC就會全部回收掉,一般不會觸發Full GC。
Flink當前的內存管理在最底層是基于byte[],所以數據最終還是on-heap,最近Flink增加了off-heap的內存管理支持,將會在下一個release中正式出現。Flink off-heap的內存管理相對于on-heap的優點主要在于(更多細節,請參考 Apache Flink: Off-heap Memory in Apache Flink and the curious JIT compiler ):
1. 啟動分配了大內存(例如100G)的JVM很耗費時間,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都會很小,垃圾回收也不用考慮MemorySegment中的Java對象了。
2. 更有效率的IO操作。在off-heap下,將MemorySegment寫到磁盤或是網絡,可以支持zeor-copy技術,而on-heap的話,則至少需要一次內存拷貝。
3. off-heap可用于錯誤恢復,比如JVM崩潰,在on-heap時,數據也隨之丟失,但在off-heap下,off-heap的數據可能還在。此外,off-heap上的數據還可以和其他程序共享。
Spark的內存管理
Spark的off-heap內存管理與Flink off-heap模式比較相似,也是通過Java UnSafe API直接訪問off-heap內存,通過定制的序列化工具將序列化后的二進制數據存儲與off-heap上,Spark的數據結構和算法直接訪問和操作在off-heap上的二進制數據。Project Tungsten是一個正在進行中的項目,想了解具體進展可以訪問: [SPARK-7075] Project Tungsten (Spark 1.5 Phase 1) , [SPARK-9697] Project Tungsten (Spark 1.6)。
緩存友好的計算
磁盤IO和網絡IO之前一直被認為是Hadoop系統的瓶頸,但是隨著Spark,Flink等新一代的分布式計算框架的發展,越來越多的趨勢使得CPU/Memory逐漸成為瓶頸,這些趨勢包括:
1. 更先進的IO硬件逐漸普及。10GB網絡和SSD硬盤等已經被越來越多的數據中心使用。
2. 更高效的存儲格式。Parquet,ORC等列式存儲被越來越多的Hadoop項目支持,其非常高效的壓縮性能大大減少了落地存儲的數據量。
3. 更高效的執行計劃。例如Spark DataFrame的執行計劃優化器的Fliter-Push-Down優化會將過濾條件盡可能的提前,甚至提前到Parquet的數據訪問層,使得在很多實際的工作負載中,并不需要很多的磁盤IO。
由于CPU處理速度和內存訪問速度的差距,提升CPU的處理效率的關鍵在 于最大化的利用L1/L2/L3/Memory,減少任何不必要的Cache miss。定制的序列化工具給Spark和Flink提供了可能,通過定制的序列化工具,Spark和Flink訪問的二進制數據本身,因為占用內存較 小,存儲密度比較大,而且還可以在設計數據結構和算法時,盡量連續存儲,減少內存碎片化對Cache命中率的影響,甚至更進一步,Spark與Flink 可以將需要操作的部分數據(如排序時的Key)連續存儲,而將其他部分的數據存儲在其他地方,從而最大可能的提升Cache命中的概率。
Flink中的數據結構
以Flink中的排序為例,排序通常是分布式計算框架中一個非常重的操作,Flink通過特殊設計的排序算法,獲得了非常好了性能,其排序算法的實現如下:
1. 將待排序的數據經過序列化后存儲在兩個不同的MemorySegment集中。數據全部的序列化值存放于其中一個MemorySegment集中。數據序列化后的Key和指向第一個MemorySegment集中其值的指針存放于第二個MemorySegment集中。
2. 對第二個MemorySegment集中的Key進行排序,如需交換Key位置,只需交換對應的Key+Pointer的位置,第一個MemorySegment集中的數據無需改變。 當比較兩個Key大小時,TypeComparator提供了直接基于二進制數據的對比方法,無需反序列化任何數據。
3. 排序完成后,訪問數據時,按照第二個MemorySegment集中Key的順序訪問,并通過Pinter值找到數據在第一個MemorySegment集中的位置,通過TypeSerializer反序列化成Java對象返回。

圖3 Flink排序算法
這樣實現的好處有:
1. 通過Key和Full data分離存儲的方式,盡量將被操作的數據最小化,提高Cache命中的概率,從而提高CPU的吞吐量。
2. 移動數據時,只需移動Key+Pointer,而無須移動數據本身,大大減少了內存拷貝的數據量。
3. TypeComparator直接基于二進制數據進行操作,節省了反序列化的時間。
Spark的數據結構
Spark中基于off-heap的排序與Flink幾乎一模一樣,在這里就不多做介紹了,感興趣的話,請參考: https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
總結
本文主要介紹了Hadoop生態圈的一些項目遇到的一些因為JVM內存管 理導致的問題,以及社區是如何應對的。基本上,以內存為中心的分布式計算框架,大都開始了部分脫離JVM,走上了自己管理內存的路線,Project Tungsten甚至更進一步,提出了通過LLVM,將部分邏輯編譯成本地代碼,從而更加深入的挖掘SIMD等CPU潛力。此外,除了 Spark,Flink這樣的分布式計算框架,HBase(HBASE-11425),HDFS(HDFS-7844)等項目也在部分性能相關的模塊通過 自己管理內存來規避JVM的一些缺陷,同時提升性能。
來源:36大數據 作者:李呈祥