非死book官方詳解:使用Apache Spark進行大型語言模型訓練
Apache Spark 是用于大規模數據處理的快速和通用引擎,它運行在 Hadoop,Mesos,可以離線或云端運行,具有高速、可擴展等特點。近年來,在 IBM 等大公司和眾多社區貢獻者的推動下,Spark 得到了越來越多的應用。今天,非死book 團隊也展示了他們使用 Apache Spark 進行大型語言模型訓練的方法。
如何處理大規模數據是 非死book 基礎設施團隊面臨的核心問題。隨著軟件技術的發展,我們面臨著越來越高的硬件需求,為了滿足需要,我們必須在開源架構上設計并構建新的系統。
考慮到我們的需求,我們決定使用 Apache Spark,一個快速發展的開源數據處理平臺,它可以自由擴展,支持用戶自定義應用。
幾個月前,我們分享了一個支持 Spark 聲明(SQL)的的例子。在本文中,我們將簡要介紹如何使用 Spark 重新設計一個大型、復雜(100 余級)的管道,而這個管道最初是使用 HQL 在 Hive 上編寫的。在此之中,我們會介紹如何控制數據分布,避免數據偏移,并實現對特定應用程序的優化,以構建高性能及可靠的數據管道。與原來的 HQL 查詢集相比,這種新的基于 Spark 的管道是模塊化的,高度可讀且易于維護的。除了質量提升之外,我們還觀察到它的資源使用和數據登錄時間也有減少。
使用案例:N-gram 語言模型訓練
自然語言處理是涉及計算機和人類語言之間相互作用的人工智能領域。計算機可以對語言進行建模,此類模型可用于檢測和糾正拼寫錯誤。N-gram 語言模型是其中使用最廣泛的語言建模方法。N-gram 通常以 N-x 方式呈現,其中前 N-1 個字作為歷史,基于 N-1 的歷史來預測下一個字。例如,「你能來這里嗎(Can you please come here)」包含 5 個單詞,是一個 5-gram。它的歷史是「你能來嗎(Can you please come)」基于這個歷史,N-gram 語言模型可以計算出單詞「這里。(here.)」的條件概率。
大規模、高階的 N-gram 語言模型(例如 N = 5)已經被證明在許多應用中非常有效,例如自動語音識別和機器翻譯。在 非死book 中,它被用于為上傳到時間線的視頻自動生成字幕,探測可能低質量的地址標簽(如「家,溫暖的家」,「Apt#00,Fake lane,Foo City」)。
用大數據集訓練的語言模型與用較小數據集訓練的語言模型相比,前者通常具有更高的準確性。覆蓋罕見單詞(或 N-gram)充分實例的可能性會隨著數據集體量的增大而增加。對于具有較大數據集的訓練任務,分布式計算框架(如 MapReduce)通常具有更好的可擴展性,可進行并行化模型訓練。
早期解決方案
我們最初開發了一個基于 Hive 的解決方案來生成 N-gram 語言模型。N-gram 計數由最后兩個字的歷史記錄分割,使用基于 C ++的 TRANSFORM 來判斷局部語言模型,并將它們保存在 Hive 中。單獨的子模型建立在不同的數據源上,每個都由 Hive 查詢觸發。隨后,每個子模型被插值算法計算權重,最后所有子模型被組合輸出。以下是管道的概述:
基于 Hive 的解決方案在構建語言模型中獲得了一定程度的成功:當使用幾百萬 N-gram 訓練時,我們能用它輕松地構建 5-gram 語言模型。然而一旦我們試圖增加訓練數據集的大小,運行管道的端到端時間就會達到不可接受的程度。
Hive 提供了一個基于 SQL 的引擎,可以輕松地編寫查詢,這些查詢會自動轉換為 MapReduce 作業。對于訓練語言模型而言,將計算表示為 SQL 查詢是不自然的,原因如下:
-
管道代碼,包括每個子模型訓練的幾個 SQL 查詢。這些查詢大部分是相似的,只有細微的差別。為模型訓練而編寫新的管道會導致這些 SQL 查詢重復。
-
當越來越多的子句被添加到查詢中時,系統會越來越難以理解查詢的意圖。
-
更改查詢的一部分需要重新運行整個管道,以確保不會導致回歸。無法測試隔離變化使得開發周期變長。
作為替代方法,編寫 Hadoop 作業在表達計算方面為開發人員提供了更多的自由,但這也需要更多的時間,需要我們具有 Hadoop 的專業知識。
基于 Spark 的解決方案
Spark 自帶特定領域語言(DSL),使得編寫自定義應用程序比 SQL 查詢作業更加容易。通過 DSL,你可以控制較低級別的操作(例如,當數據被洗牌時),并且可以訪問中間數據。這有助于實現復雜的算法,達到更高的效率和穩定性。它還允許用戶能以模塊化的方式編寫管道,而不是使用一個單一的 SQL 字符串,這提高了管道的可讀性,可維護性和可測試性。所有這些優勢吸引我們引入了 Spark。
在 Scala 或 Java 中重現 C ++的邏輯——語言模型訓練算法的實現——會是巨量的工作,因此我們決定不更改該部分。和 Hive 一樣,Spark 支持運行自定義用戶代碼,這使得調用相同的 C ++二進制文件變得容易。它允許開發者平滑過渡,因此我們不必同時維護兩個版本的 C ++邏輯,而且遷移對用戶是透明的。我們使用 Spark 提供的 RDD 接口,沒有使用 Spark SQL,因為前者可以控制中間數據的分區并直接管理分片生成。Spark 的 pipe()運算符用于調用二進制文件。
在更高層上,管道的設計保持不變。我們繼續使用 Hive 表作為應用程序的初始輸入和最終輸出。中間輸出被寫入集群節點上的本地硬盤中。整個應用程序大約有 1,000 行的 Scala 代碼,并且可以在 Spark 上執行時生成 100 多個階段(這取決于訓練數據源的數量)。
可擴展性挑戰
當我們使用更大的訓練數據集來測試 Spark 方案時,我們遇到了可擴展性的挑戰。在本節中,我們首先介紹數據分布要求(平滑和分割),然后是它帶來的挑戰和我們的解決方案。
平滑
N-gram 模型是根據訓練數據中的 N-gram 出現計數來估算的。由于在訓練數據中有可能缺少 N-gram,這種方式可能很難推廣到未見的數據中。為了解決這個問題,我們使用了許多平滑方法以減少觀察到的 N-gram 計數以提升未見的 N-gram 概率,并使用較低階模型來讓較高階模型平滑。由于平滑,對于具有歷史 h 的 N-gram,需要具有相同歷史的所有 N-gram 計數和具有作為 h 的后綴的歷史的所有較低級 N-gram 來估算其概率。例如,對于三元組「how are you,」,其中「how are」是歷史,「you」是要預測的詞,為了估計 P(you|how are),我們需要「how are*」,「are*」和所有 unigram(單字 N-gram)的計數,其中*是表示詞匯表中任何單詞的通配符。經常會出現 N-gram(例如,「how are*」)導致處理時的數據發生偏移。
分片
通過分布式計算框架,我們可以將 N-gram 計數分割成多片,以便由多個并行機器進行處理。基于 N-gram 歷史的最后 k 個單詞的分片方式可以保證比 k 更長的 N-gram 在所有片段之間被平衡。這需要在所有分片上共享所有長度為 k 的 N-gram 計數。我們把所有這些短 N-gram 放在一個叫做「0-shard」的特殊分片中。例如,如果 k 是 2,那么從訓練數據中提取的所有單字母和雙字母會被組合在同一個分片(0- shard)中,并且所有進行模型訓練的服務器都可以訪問。
問題:數據扭曲(Data skew)
在基于 Hive 的管道中,我們使用兩個單詞的歷史分片(two word history sharding) 方式進行模型訓練。兩詞歷史分片意味著,共享相同集合的最高有效兩詞歷史(最靠近正被預測的詞)的所有 N-gram 計數會被分布到同一節點用于處理。與單字歷史相比,兩字分片通常具有更平衡的數據分布,除了所有節點必須共享存儲在 0-shard 中的平滑算法所需的單字和雙字統計。下圖說明了具有單字和兩字歷史的分片分布之間的比較。
對于大型數據集而言,兩字歷史分割會生成巨大的 0-shard。必須向所有節點散布 0-shard 以縮短總計算時間。同時,這種情況還存在潛在的不穩定性,因為很難預測它的內存需求,一旦啟動作業,它可能在運行中耗盡內存。雖然我們可以提前分配更多內存,但仍然不能保證 100%的穩定性,而且這會導致集群內存利用率降低,因為并不是所有實例都需要比歷史均值更多的內存。
當我們嘗試使用 Spark 后,作業可以在低負載狀況下運行。但是對于更大的數據集,我們觀察到了以下幾個問題:
-
由于執行器長時間沒有接收到 heartbeat,驅動程序將執行器標記為「lost」
-
執行器 OOM
-
頻繁的執行器 GC
-
隨機服務 OOM
-
Spark 的 block 存在 2GB 的限制
所有這些問題的根本原因可以歸結于數據扭曲。我們想要實現分片的均衡分布,但是兩詞的歷史分片和單詞歷史分片都不能帶來均衡。因此,我們提出了一種混合方法:漸進式分片和動態調整分片大小。
解決方案:漸進式分片(Progressive sharding)
漸進式分片用迭代的方法來解決數據扭曲(skew)問題。在第一次迭代時,我們首先進行單個字的分片,在這一步的分片中只需要對所有分片(shard)的一元語言模型計數進行分割。一元語言模型計數遠少于二元語言模型計數。通常情況下這種處理是可以完成預期作用的,但不包含分片極其大的情況。例如,對應于「how to ...」的分片將會被扭曲。為了解決這個問題,我們核查每個分片的尺寸然后僅處理小于某一閾值的分片。
在第二次迭代時,我們使用二個字的分割,即根據二個字的歷史來完成對 N-gram 的分布。在這個階段,我們只需要向二元語言模型(不包括已在第一次迭代中處理的二元語言模型)共享 N-gram 的計數。這些二元語言模型計數的數目遠少于整個的二元語言模型計數數目,因此處理起來也更快。正如上面所說,我們依然核查每個分片的尺寸然后僅處理小于某一閾值的分片。所剩下的分片將會在下一次的迭代中通過三個字歷史來處理。在大多數情況下,三次迭代已足以滿足非常大數據集的需要。
動態調整分片尺寸
在第一次迭代里,我們用了一個足夠大的預設數,從而使得大部分的生成分片尺寸很小。每一個分片是由單個 Spark 所完成。在這次迭代中 0-shard 的分片是非常小的尺寸,有很多小分片并不會影響處理效率。在后面的迭代中,分片的數目將由 N—gram 的未處理部分所自動產生。
這些方案能夠成功實現歸功于 Spark DSL 的靈活性。通過 Hive,開發者們不需要支配這些低級別的運算。
針對訓練模型的通用庫
根據不同應用環境,怎樣使用語言模型呢?每一種應用可能需要不同的數據和配置,因此不同的管道也應運而生。在 Hive 解決方案中,管道的 SQL 部分應用之間是相似的,但在幾個地方有不同的單元。相較于重復每一個管道的代碼,我們開發了一種可以調用不同管道和不同數據源及配置的普適的 Spark 應用。
基于 Spark 解決問題時,我們也可以自動地在輸入配置的基礎上,優化應用程序運行的工作流程中的步驟。比如說,如果用戶沒有明確指出使用熵修剪算法,那么應用程序將會跳過模型重新評估。如果用戶在配置中明確指定了計數截止,那么應用程序將會瓦解許多低計數的 N-grams 并以通配符占位符來減少存儲。這些優化組合節省了計算資源,同時可以在更短的時間內產生訓練好的模型。
Spark 管道與 Hive 管道性能的比較
我們利用以下性能指標比較 Spark 管道與 Hive 管道:
-
CPU 時間:這是從操作系統的角度衡量 CPU 的使用。比如,如果你在 32 核機上使用所有 CPU 的 50%,以每 10 秒處理一個進程,那么你的 CPU 時間將是 32*0.5*160CPU 秒。
-
CPU 保留時間:這是從資源管理框架的角度衡量 CPU 的保留。舉個例子,如果我們在 32 核機上保留 10 秒來運行一個任務,那么 CPU 的保存時間是 32*10=320CPU 秒。CPU 時間與 CPU 保留時間的比例反映出我們是如何實現在集群上保留 CPU 資源的。當精確度達到要求時,相較于 CPU 時間,在運行相同的工作負載的條件下,保存時間可以作為一個更好的測量尺度去比較引擎的執行。比如,如果一個運程需要 1CPU 秒去運行但是必須保存 100CPU 秒,那么在完成同樣的工作量時,按照這種度量標準,它就比一個需要 10CPU 秒運行且保存 10CPU 秒的運程效率低。
-
延遲/屏蔽時間:從結束到結束的工作時間
以下圖表總結了 Spark 和 Hive 工作的性能比較結果。注意 Spark 的管道不是 Hive 管道 1:1 的轉化。它有許多有助于實現更好的可測量性和執行的定制和優化。
基于 Spark 的管道可以不費力地多次處理輸入數據,甚至輸入數據量高于 Hive 的巔峰處理量。例如,我們訓練一個較大的語言模型,它可以在幾小時內生成一個包含 192 億 N-grams 的語言模型。能夠用更多的數據并更快地運行試驗訓練的能力可以促使產生更高質量的模型。正如我們在我們自己的試驗中觀察到的,大規模語言模型通常會在相關的應用中得到更好的結果。
總結
Spark 的靈活性可以從以下方面為我們提供幫助:
-
用模塊化的方式表達應用邏輯,相較于整體的 SQL 字符串,擁有更強的可讀性和可持續性。
-
在計算的任何階段都可以對數據實現自定義處理(例如,分區,重洗)
-
高性能的計算機引擎可以節省計算資源和試驗時間
-
擁有輸入更大規模數據的擴展能力可以訓練出高質量的語言模型
-
建立一個通用的應用,可以用于在不同的產品上生成語言模型。
-
由于支持運行用戶二進制文件(如 Hive's TRANSFORM)和與Hive數據交互的兼容性,我們可以從早期的解決方案實行改進。
非死book 對加入 Spark 開源社區表示興奮,并將共同協作致力于開發出 Spark 的全部潛能。
來自:http://www.jiqizhixin.com/article/2252