阿里云破紀錄的背后:377秒是如何煉成的?
原文 http://linux.cn/article-6517-1.html
10月28日, Sort Benchmark 官方宣布,阿里云用377秒完成了100TB的數據排序,打破了此前Apache Spark創造的1406秒紀錄。在含金量最高的 GraySort 和 MinuteSort 兩個評測系統中,阿里云分別在通用和專用目的排序類別中創造了4項世界紀錄。
消息一出,整個技術圈都沸騰了,特別是對云計算高度關注的互聯網、計算機行業。阿里云打破世界紀錄,再次點燃了大家對分布式計算的熱情。同時,大數據、云計算的各種圈子里也掀起了討論:這件事情有多難?怎么做到的?對普通人意味著什么等等。
基于這些原因,我們發表此文,希望從阿里云的角度回答大家的疑問。
這件事情有多難?
SortBenchmark的出現,是希望能用最簡單的方法,評估出不同的計算模型,計算平臺的計算能力優劣?而排序是最基礎的計算問題,任何一 本數據結構和算法的計算機教材,首先要講的,就是各種排序算法。所以排序,當之無愧的成為這個簡單,但直接有效的benchmark。
SortBenchmark競賽最早的紀錄追溯到1987年,當時都是單機的比賽。如何造出最強大的機器,如何盡量壓榨單臺機器的性能是大家的主要工作。
但從1998年開始,大家的策略和思路發生了改變,分布式計算開始成為主流。大家的工作重點也轉變為:如何有效調度成百上千乃至幾萬臺機器上的 CPU、內存、網絡、磁盤IO等物理資源,最快完成海量數據的排序。這就像軍隊里,管好幾個人,你可以當班長;管好幾十個人,你可以當排長;但要管好幾萬 人,你才能當將軍。
而且,對大規模集群做線性擴展,遠比大家想象得困難。正如,一個班長說“我只有幾個人,所以我才是班長,但如果你現在給我幾萬人,我馬上就是將軍 了”,大家會覺得好笑一樣。當規模不斷擴大,系統的各種瓶頸都會逐漸出現,原來能處理所有消息,能做出各種調度決定,現在發現忙不過來;如果找出下級代 理,可能又會發現代理做出的決定和處理總不是最好的。
這還只是一種資源的調度,當計算需要多種資源完美配合時,你可能會發現內存是有效調度了,但是會影響網絡的使用;網絡可能用好了,但是又影響了磁盤的有效利用。調度不好時,各個維度可能互相沖突。
當你把資源調度得差不多了,你可能發現其實這個計算任務如果從機器A上換到機器B上運行,時間會短很多。或者機器A本來很適合,但是碰巧機器A壞 了,就像幾千人的軍隊打仗,有人臨陣脫逃很正常。諸如此類的問題,隨著規模的不斷擴大,會急劇復雜化。可以說,規模每增加一個數量級,分布式計算平臺需要 處理問題就會完全不同。而如何利用大量低端機器達到高性能,正是云計算技術的核心挑戰。
阿里云的“飛天”分布式計算平臺于2013年正式上線了5000臺的單集群規模,現在生產線上的規模更大。關于如何支持這么大的規模,可以參考 VLDB 2014上伏羲發表的文章,這不是本文的重點。本文接下來會重點介紹在支持如此大規模計算集群后,我們還做了哪些事情,讓一萬億條記錄,100TB數據的 排序能在不到7分鐘完成。
阿里云如何做到的?
“飛天”是阿里云的分布式計算平臺,不僅承擔著阿里集團內部所有的離線數據處理任務,同時也提供阿里云公共云服務的基礎平臺支撐。“飛天”系統的 關鍵模塊包括:(a)Pangu-分布式文件系統,負責存儲和管理計算中心的數據文件;(b)Fuxi-分布式調度系統,負責管理計算中心的集群資源,調 度分布式系統中運行的在線和離線應用。Fuxi提供了一種名為FuxiJob的大數據批處理框架,能處理任意的基于DAG(有向無環圖)描述的用戶計算任 務。
Fuxi已經部署在了阿里巴巴多個計算中心的數十萬服務器上,單個集群的規模超過5000臺機器。任何可以用DAG描述的離線數據處理作業都可以 用Fuxi Job來執行,包括但不限于MapReduce作業和更加復雜的機器學習作業。Job的輸入輸出文件以及運行過程中的臨時文件都存儲在Pangu中,依賴 Pangu提供的文件副本和locality配置來獲取更好的性能,同時提高數據的可靠性。
接下來我們重點介紹基于“飛天”系統開發的Fuxisort程序。我們在GraySort和MinuteSort兩項比賽中使用相同的程序,程序中的優化將在后續章節中詳細介紹。
概述
首先,程序會對待排序數據進行采樣,以確定數據各分片的范圍。如圖1所示,除了采樣之外,整個數據排序過程分兩大階段:map階段和sort階段。兩個階段都包含多個并行的任務。
圖 1. FuxiSort流程圖
在map階段,map任務通過Pangu的ChunkServer進程從本地磁盤中讀入數據分片,然后對輸入數據進行RangePartition分配給不同的sort,分配后的數據通過網絡直接傳輸給sort任務。
在sort階段,所有的sort任務周期性地將map任務發過來的數據讀入內存,當內存緩沖區滿的時候,進行基于快速排序算法的內存排序,內存排 序的結果數據將會被寫入Pangu的temporary文件(這種文件存放在本地,不會做多份的拷貝)。當sort任務接收完所有的map數據后,會將所 有在內存中排好序的數據以及之前寫入temporary文件中的數據一起做歸并排序,歸并排序的最終結果輸出到Pangu中。當FuxiSort所有的 sort任務都執行完后,會生成多個的Pangu文件,它們在全局也是有序的。
實現和優化
a)輸入數據采樣。為了降低數據傾斜帶來的性能影響,我們對輸入數據做了采樣,根據采樣結果來確定RangePartition的邊界,從而保證每個sort任務處理的數據量盡量接近。
舉例說明,假設輸入數據被分成了X個文件,首先,我們在每個文件里隨機選取Y個位置,從每個位置開始連續讀取Z個數據樣本,最后共得到 X * Y * Z個樣本。然后,我們對這些樣本數據進行排序,排序后樣本數據被均分為S份,這里S為sort任務的個數,這樣就得到每個sort任務待 處理數據的范圍邊界。由于樣本是均分的,可以使得每個sort任務都處理了幾乎相等的數據量。
對于GraySort而言,我們有20000個輸入文件(X),每個輸入文件選取300個位置(Y),每個位置讀取1個樣本(Z),最終我們選取 6000000條樣本進行排序,并均分為20000份(sort任務個數),map任務將根據上述樣本來進行RangePartition,保證 sort任務處理的數據盡量均勻。整個采樣過程大約耗時35秒。對于MinuteSort而言,3350個輸入文件,我們在每個文件里選取900個數據作 為樣本,總的樣本數量為3015000,排序后分成10050份。整個采樣過程耗時4秒。對于IndySort,則不需要這個采樣過程。
b ) IO 雙buffer。 map階段,FuxiSort在一個I/O buffer中處理數據,同時Pangu在另一個buffer中執行數據讀入操作。這兩個buffer的角色會周期性地進行切換,這樣就能保證處理數據操 作和I/O操作能并行起來,從而能夠大幅降低任務的Latency。
圖2. FuxiSort各階段啟動順序
c ) 流水線操作。 如圖2所示,為了進一步降低整體Latency,我們把排序過程的每個階段分解成許多小的步驟,并且盡可能地將這些小的步驟重疊起來執行。這些分解出來的小步驟如下所示:
- 數據采樣;
- Job啟動;
- MapTask讀輸入數據;
- MapTask發送數據至SortTask;
- SortTask接收數據;
- SortTask將內存中的數據進行排序,當內存裝不下時,將排好序的數據dump到臨時文件中;
- SortTask將內存中的有序數據和臨時文件中的有序數據做merge sort;
- SortTask寫最終輸出文件。
FuxiSort將數據采樣過程和Job啟動過程并行起來執行,在Job啟動階段做的主要工作包括任務的分發,以及一些其他的數據管理工作,比如 收集所有SortTask的網絡地址,并且通知所有的MapTask。當數據采樣過程結束時,采樣程序會將每個分區的界限存放在Pangu上,并且會建立 另一個通知文件存放在Pangu上,用來標志采樣結束。一旦任務分發完成,每個MapTask就開始周期性地檢查通知文件是否存在。一旦檢查到通知文件存 在,也就意味著采樣程序產生的各分區界限可用,MapTask就會立刻讀取這些分區界限,并且根據這些界限進行數據分發。
步驟(3)(4)和(5)在map階段并行執行,步驟(7)和(8)在sort階段并行執行。
在步驟(6)中,只有當分配給task的內存已經全部填滿,才會進行排序和dump,由于在排序過程中,內存被全部占用,沒有剩余內存可以接收新 的數據,因此步驟(5)會被阻塞。為了緩解這個問題,我們將步驟(5)和(6)并行起來,一旦內存使用超過一定量值,就開始做排序,這樣,步驟(6)會被 提前執行,而步驟(5)也不會被阻塞。當內存全部占滿時,我們將內存中已經排好序的數據進行歸并,并dump到臨時文件中。顯然,開始做排序的內存閾值越 低,步驟(6)開始得越早。在我們的實驗中,當接收到的數據占用分配給Task內存的1/10時,開始執行步驟(6)。通過這種方法,我們將I/O和計算 并行起來,并且沒有明顯的延遲,雖然這種方法可能會需要merge更多的臨時文件,但在我們的場景中沒有因此導致明顯的overhead。
圖2說明了每一步所花費的時間以及在執行過程中這些步驟之前的重合部分。
d ) 網絡通信優化。 在map task和sort task之前有明顯的網絡通信流量,每個網絡包到達后都會產生CPU中斷。如果對中斷的處理被綁定到一個指定的CPU內核上,當這個CPU內核忙于排序 時,對中斷的處理會被延遲,這就可能導致請求超時,甚至丟包。通過設置”sm_affinity”,可以將網絡中斷產生的負載均衡到所有的CPU內核上, 請求超時和丟包的比率明顯下降。
圖3. 實時計算框架
e ) 對MinuteSort的進一步優化。 由于MinuteSort的執行時間要求限制在60秒內,一般離線作業的調度開銷就變得不容忽視。為了降低這些開銷,我們在Fuxi的準實時Job模型上 執行MinuteSort,Fuxi準實時Job模型是為了降低調度產生的overhead,使內存計算獲得很高的性能而開發的。Figure 3說明了準實時Job模型的框架。在典型的生產環境中,準實時系統是一個長期運行的service,會在集群部署過程中被啟動,并且在每臺機器上啟動一個 不退出的worker進程。系統啟動之后,用戶可以向準實時系統的調度器提交各種job,并且可以獲得job在運行期間的狀態。sort benchmark競賽要求與排序直接相關的啟動和退出過程也需要包含在最終的時間里,為了遵守這一規則,我們在提交MinuteSort job之前,先通過程序去啟動準實時系統worker,在job運行結束后,再將worker進程停掉,在最終提交的結果中,包含了worker啟動和停 止所用的時間。
準實時系統針對的場景是在中等規模大小的數據集(不超過10TB)上,對延遲敏感的數據處理過程,在這種規模的數據集下,包括輸入和輸出在內的所有records都可能被cache在內存中。在我們的實驗中,我們只在準實時系統中運行MinuteSort。
對普通人意味著什么?
從2009年阿里云誕生那天起,我們的愿景就是打造一個自研的、通用的、大規模分布式計算底層系統,讓計算像電一樣成為公共服務,“飛天”平臺是承載這一理念的技術核心。
FuxiSort打破Sort Benchmark排序比賽世界紀錄是阿里云6年技術沉淀的直接體現,是所有技術人的驕傲。
但這僅僅是開始。技術本身不是目的,阿里云在任何技術上的進步,都會通過云產品對外輸出,讓中國乃至全世界的云計算客戶收益。比如本次參賽的 FuxiSort,通過開放數據處理服務(Open Data Processing Service, 簡稱ODPS)對外商用。ODPS是由阿里云自主研發,提供針對TB/PB級數據、實時性要求不高的分布式處理能力,應用于數據分析、挖掘、商業智能等領 域。阿里巴巴的離線數據業務都運行在ODPS上(詳情參考 http://www.aliyun.com/product/odps/ )。
阿里云將借助技術創新,不斷提升計算能力與規模效益,希望更多的合作伙伴、中小企業、開發者能夠受益于云計算帶來的便利和價值,共同將云計算變成真正意義上的公共服務和普惠科技。