分布式發布訂閱消息系統 Kafka 架構設計

javap 9年前發布 | 32K 次閱讀 Kafka 消息系統

我們為什么要搭建該系統

Kafka是一個消息系統,原本開發自LinkedIn,用作LinkedIn的活動流(activity stream)和運營數據處理管道(pipeline)的基礎。現在它已為多家不同類型的公司 作為多種類型的數據管道(data pipeline)和消息系統使用。

活動流數據是所有站點在對其網站使用情況做報表時要用到的數據中最常規的部分。活動數據包括頁面訪問量(page view)、被查看內容方面的信息以及搜索情況等內容。這種數據通常的處理方式是先把各種活動以日志的形式寫入某種文件,然后周期性地對這些文件進行統計 分析。運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日志等等數據)。運營數據的統計方法種類繁多。

近年來,活動和運營數據處理已經成為了網站軟件產品特性中一個至關重要的組成部分,這就需要一套稍微更加復雜的基礎設施對其提供支持。

活動流和運營數據的若干用例

  • "動態匯總(News feed)"功能。將你朋友的各種活動信息廣播給你
  • 相關性以及排序。通過使用計數評級(count rating)、投票(votes)或者點擊率( click-through)判定一組給定的條目中那一項是最相關的.
  • 安全:網站需要屏蔽行為不端的網絡爬蟲(crawler),對API的使用進行速率限制,探測出擴散垃圾信息的企圖,并支撐其它的行為探測和預防體系,以切斷網站的某些不正常活動。
  • 運營監控:大多數網站都需要某種形式的實時且隨機應變的方式,對網站運行效率進行監控并在有問題出現的情況下能觸發警告。
  • 報表和批處理: 將數據裝載到數據倉庫或者Hadoop系統中進行離線分析,然后針對業務行為做出相應的報表,這種做法很普遍。

活動流數據的特點

這種由不可變(immutable)的活動數據組成的高吞吐量數據流代表了對計算能力的一種真正的挑戰,因其數據量很容易就可能會比網站中位于第二位的數據源的數據量大10到100倍。

傳統的日志文件統計分析對報表和批處理這種離線處理的情況來說,是一種很不錯且很有伸縮性的方法;但是這種方法對于實時處理來說其時延太大,而且還具有較 高的運營復雜度。另一方面,現有的消息隊列系統(messaging and queuing system)卻很適合于在實時或近實時(near-real-time)的情況下使用,但它們對很長的未被處理的消息隊列的處理很不給力,往往并不將數 據持久化作為首要的事情考慮。這樣就會造成一種情況,就是當把大量數據傳送給Hadoop這樣的離線系統后, 這些離線系統每個小時或每天僅能處理掉部分源數據。Kafka的目的就是要成為一個隊列平臺,僅僅使用它就能夠既支持離線又支持在線使用這兩種情況。

Kafka支持非常通用的消息語義(messaging semantics)。盡管我們這篇文章主要是想把它用于活動處理,但并沒有任何限制性條件使得它僅僅適用于此目的。

部署

下面的示意圖所示是在LinkedIn中部署后各系統形成的拓撲結構。

分布式發布訂閱消息系統 Kafka 架構設計

要注意的是,一個單個的Kafka集群系統用于處理來自各種不同來源的所有活動數據。它同時為在線和離線的數據使用者提供了一個單個的數據管道,在線活動 和異步處理之間形成了一個緩沖區層。我們還使用kafka,把所有數據復制(replicate)到另外一個不同的數據中心去做離線處理。

我們并不想讓一個單個的Kafka集群系統跨越多個數據中心,而是想讓Kafka支持多數據中心的數據流拓撲結構。這是通過在集群之間進行鏡像或“同步” 實現的。這個功能非常簡單,鏡像集群只是作為源集群的數據使用者的角色運行。這意味著,一個單個的集群就能夠將來自多個數據中心的數據集中到一個位置。下 面所示是可用于支持批量裝載(batch loads)的多數據中心拓撲結構的一個例子:

分布式發布訂閱消息系統 Kafka 架構設計

請注意,在圖中上面部分的兩個集群之間不存在通信連接,兩者可能大小不同,具有不同數量的節點。下面部分中的這個單個的集群可以鏡像任意數量的源集群。要了解鏡像功能使用方面的更多細節,請訪問這里.

主要的設計元素

Kafka之所以和其它絕大多數信息系統不同,是因為下面這幾個為數不多的比較重要的設計決策:

  1. Kafka在設計之時為就將持久化消息作為通常的使用情況進行了考慮。
  2. 主要的設計約束是吞吐量而不是功能。
  3. 有關哪些數據已經被使用了的狀態信息保存為數據使用者(consumer)的一部分,而不是保存在服務器之上。
  4. Kafka是一種顯式的分布式系統。它假設,數據生產者(producer)、代理(brokers)和數據使用者(consumer)分散于多臺機器之上。

以上這些設計決策將在下文中進行逐條詳述。

基礎知識

首先來看一些基本的術語和概念。

消息指的是通信的基本單位。由消息生產者(producer)發布關于某話題(topic)的消息,這句話的意思是,消息以一種物理方式被發送給了作為代理(broker)的服務器(可能是另外一臺機器)。若干的消息使用者(consumer)訂閱(subscribe)某個話題,然后生產者所發布的每條消息都會被發送給所有的使用者。

Kafka是一個顯式的分布式系統 —— 生產者、使用者和代理都可以運行在作為一個邏輯單位的、進行相互協作的集群中不同的機器上。對于代理和生產者,這么做非常自然,但使用者卻需要一些特殊的支持。每個使用者進程都屬于一個使用者小組(consumer group) 。準確地講,每條消息都只會發送給每個使用者小組中的一個進程。因此,使用者小組使得許多進程或多臺機器在邏輯上作為一個單個的使用者出現。使用者小組這個概念非常強大,可以用來支持JMS中隊列(queue)或者話題(topic)這兩種語義。為了支持隊列 語義,我們可以將所有的使用者組成一個單個的使用者小組,在這種情況下,每條消息都會發送給一個單個的使用者。為了支持話題語 義,可以將每個使用者分到它自己的使用者小組中,隨后所有的使用者將接收到每一條消息。在我們的使用當中,一種更常見的情況是,我們按照邏輯劃分出多個使 用者小組,每個小組都是有作為一個邏輯整體的多臺使用者計算機組成的集群。在大數據的情況下,Kafka有個額外的優點,對于一個話題而言,無論有多少使 用者訂閱了它,一條條消息都只會存儲一次。

消息持久化(Message Persistence)及其緩存

不要害怕文件系統!

在對消息進行存儲和緩存時,Kafka嚴重地依賴于文件系統。 大家普遍認為“磁盤很慢”,因而人們都對持久化結(persistent structure)構能夠提供說得過去的性能抱有懷疑態度。實際上,同人們的期望值相比,磁盤可以說是既很慢又很快,這取決于磁盤的使用方式。設計的很 好的磁盤結構往往可以和網絡一樣快。

磁盤性能方面最關鍵的一個事實是,在過去的十幾年中,硬盤的吞吐量正在變得和磁盤尋道時間嚴重不一致了。結果,在一個由6個7200rpm的SATA硬盤 組成的RAID-5磁盤陣列上,線性寫入(linear write)的速度大約是300MB/秒,但隨即寫入卻只有50k/秒,其中的差別接近10000倍。線性讀取和寫入是所有使用模式中最具可預計性的一種 方式,因而操作系統采用預讀(read-ahead)和后寫(write-behind)技術對磁盤讀寫進行探測并優化后效果也不錯。預讀就是提前將一個 比較大的磁盤塊中內容讀入內存,后寫是將一些較小的邏輯寫入操作合并起來組成比較大的物理寫入操作。關于這個問題更深入的討論請參考這篇文章ACM Queue article;實際上他們發現,在某些情況下,順序磁盤訪問能夠比隨即內存訪問還要快!

為了抵消這種性能上的波動,現代操作系變得越來越積極地將主內存用作磁盤緩存。所有現代的操作系統都會樂于將所有空閑內存轉做磁盤 緩存,即時在需要回收這些內存的情況下會付出一些性能方面的代價。所有的磁盤讀寫操作都需要經過這個統一的緩存。想要舍棄這個特性都不太容易,除非使用直 接I/O。因此,對于一個進程而言,即使它在進程內的緩存中保存了一份數據,這份數據也可能在OS的頁面緩存(pagecache)中有重復的一份,結構 就成了一份數據保存了兩次。

更進一步講,我們是在JVM的基礎之上開發的系統,只要是了解過一些Java中內存使用方法的人都知道這兩點:

  1. Java對象的內存開銷(overhead)非常大,往往是對象中存儲的數據所占內存的兩倍(或更糟)。
  2. Java中的內存垃圾回收會隨著堆內數據不斷增長而變得越來越不明確,回收所花費的代價也會越來越大。


由于這些因素,使用文件系統并依賴于頁面緩存要優于自己在內存中維護一個緩存或者什么別的結構 —— 通過對所有空閑內存自動擁有訪問權,我們至少將可用的緩存大小翻了一倍,然后通過保存壓縮后的字節結構而非單個對象,緩存可用大小接著可能又翻了一倍。這 么做下來,在GC性能不受損失的情況下,我們可在一臺擁有32G內存的機器上獲得高達28到30G的緩存。而且,這種緩存即使在服務重啟之后會仍然保持有 效,而不象進程內緩存,進程重啟后還需要在內存中進行緩存重建(10G的緩存重建時間可能需要10分鐘),否則就需要以一個全空的緩存開始運行(這么做它 的初始性能會非常糟糕)。這還大大簡化了代碼,因為對緩存和文件系統之間的一致性進行維護的所有邏輯現在都是在OS中實現的,這事OS做起來要比我們在進 程中做那種一次性的緩存更加高效,準確性也更高。如果你使用磁盤的方式更傾向于線性讀取操作,那么隨著每次磁盤讀取操作,預讀就能非常高效使用隨后準能用得著的數據填充緩存。

這就讓人聯想到一個非常簡單的設計方案:不是要在內存中保存盡可能多的數據并在需要時將這些數據刷新(flush)到文件系統,而是我們要做完全相反的事 情。所有數據都要立即寫入文件系統中持久化的日志中但不進行刷新數據的任何調用。實際中這么做意味著,數據被傳輸到OS內核的頁面緩存中了,OS隨后會將 這些數據刷新到磁盤的。此外我們添加了一條基于配置的刷新策略,允許用戶對把數據刷新到物理磁盤的頻率進行控制(每當接收到N條消息或者每過M秒),從而 可以為系統硬件崩潰時“處于危險之中”的數據在量上加個上限。

這種以頁面緩存為中心的設計風格在一篇講解Varnish的設計思想的文章中有詳細的描述(文風略帶有助于身心健康的傲氣)。

常量時長足矣

消息系統元數據的持久化數據結構往往采用BTree。 BTree是目前最通用的數據結構,在消息系統中它可以用來廣泛支持多種不同的事務性或非事務性語義。 它的確也帶來了一個非常高的處理開銷,Btree運算的時間復雜度為O(log N)。一般O(log N)被認為基本上等于常量時長,但對于磁盤操作來講,情況就不同了。磁盤尋道時間一次要花10ms的時間,而且每個磁盤同時只能進行一個尋道操作,因而其 并行程度很有限。因此,即使少量的磁盤尋道操作也會造成非常大的時間開銷。因為存儲系統混合了高速緩存操作和真正的物理磁盤操作,所以樹型結構(tree structure)可觀察到的性能往往是超線性的(superlinear)。更進一步講,BTrees需要一種非常復雜的頁面級或行級鎖定機制才能避 免在每次操作時鎖定一整顆樹。實現這種機制就要為行級鎖定付出非常高昂的代價,否則就必須對所有的讀取操作進行串行化(serialize)。因為對磁盤 尋道操作的高度依賴,就不太可能高效地從驅動器密度(drive density)的提高中獲得改善,因而就不得不使用容量較小(< 100GB)轉速較高的SAS驅動去,以維持一種比較合理的數據與尋道容量之比。

直覺上講,持久化隊列可以按照通常的日志解決方案的樣子構建,只是簡單的文件讀取和簡單地向文件中添加內容。雖然這種結果必然無法支持BTree實現中的 豐富語義,但有個優勢之處在于其所有的操作的復雜度都是O(1),讀取操作并不需要阻止寫入操作,而且反之亦然。這樣做顯然有性能優勢,因為性能完全同數 據大小之間脫離了關系 —— 一個服務器現在就能利用大量的廉價、低轉速、容量超過1TB的SATA驅動器。雖然這些驅動器尋道操作的性能很低,但這些驅動器在大量數據讀寫的情況下性 能還湊和,而只需1/3的價格就能獲得3倍的容量。 能夠存取到幾乎無限大的磁盤空間而無須付出性能代價意味著,我們可以提供一些消息系統中并不常見的功能。例如,在Kafka中,消息在使用完后并沒有立即 刪除,而是會將這些消息保存相當長的一段時間(比方說一周)。

效率最大化

我們的假設是,系統里消息的量非常之大,實際消息量是網站頁面瀏覽總數的數倍之多(因為每個頁面瀏覽就是我們要處理的其中一個活動)。而且我們假設發布的每條消息都會被至少讀取一次(往往是多次),因而我們要為消息使用而不是消息的產生進行系統優化,

導致低效率的原因常見的有兩個:過多的網絡請求和大量的字節拷貝操作。

為了提高效率,API是圍繞這“消息集”(message set)抽象機制進行設計的,消息集將消息進行自然分組。這么做能讓網絡請求把消息合成一個小組,分攤網絡往返(roundtrip)所帶來的開銷,而不是每次僅僅發送一個單個消息。

MessageSet實現(implementation)本身是對字節數組或文件進行一次包裝后形成的一薄層API。因而,里面并不存在消息處理所需的 單獨的序列化(serialization)或逆序列化(deserialization)的步驟。消息中的字段(field)是按需進行逆序列化的(或 者說,在不需要時就不進行逆序列化)。

由代理維護的消息日志本身不過是那些已寫入磁盤的消息集的目錄。按此進行抽象處理后,就可以讓代理和消息使用者共用一個單個字節的格式(從某種程度上說,消息生產者也可以用它,消息生產者的消息要求其校驗和(checksum)并在驗證后才會添加到日志中)

使用共通的格式后就能對最重要的操作進行優化了:持久化后日志塊(chuck)的網絡傳輸。為了將數據從頁面緩存直接傳送給socket,現代的Unix 操作系統提供了一個高度優化的代碼路徑(code path)。在Linux中這是通過sendfile這個系統調用實現的。通過Java中的API,FileChannel.transferTo,由它來簡潔的調用上述的系統調用。

為了理解sendfile所帶來的效果,重要的是要理解將數據從文件傳輸到socket的數據路徑:

  1. 操作系統將數據從磁盤中讀取到內核空間里的頁面緩存
  2. 應用程序將數據從內核空間讀入到用戶空間的緩沖區
  3. 應用程序將讀到的數據寫回內核空間并放入socke的緩沖區
  4. 操作系統將數據從socket的緩沖區拷貝到NIC(網絡借口卡,即網卡)的緩沖區,自此數據才能通過網絡發送出去

這樣效率顯然很低,因為里面涉及4次拷貝,2次系統調用。使用sendfile就可以避免這些重復的拷貝操作,讓OS直接將數據從頁面緩存發送到網絡中,其中只需最后一步中的將數據拷貝到NIC的緩沖區。

我們預期的一種常見的用例是一個話題擁有多個消息使用者。采用前文所述的零拷貝優化方案,數據只需拷貝到頁面緩存中一次,然后每次發送給使用者時都對它進 行重復使用即可,而無須先保存到內存中,然后在閱讀該消息時每次都需要將其拷貝到內核空間中。如此一來,消息使用的速度就能接近網絡連接的極限。

要得到Java中對send'file和零拷貝的支持方面的更多背景知識,請參考IBM developerworks上的這篇文章

端到端的批量壓縮

多數情況下系統的瓶頸是網絡而不是CPU。 這一點對于需要將消息在個數據中心間進行傳輸的數據管道來說,尤其如此。當然,無需來自Kafka的支持,用戶總是可以自行將消息壓縮后進行傳輸,但這么 做的壓縮率會非常低,因為不同的消息里都有很多重復性的內容(比如JSON里的字段名、web日志中的用戶代理或者常用的字符串)。高效壓縮需要將多條消 息一起進行壓縮而不是分別壓縮每條消息。理想情況下,以端到端的方式這么做是行得通的 —— 也即,數據在消息生產者發送之前先壓縮一下,然后在服務器上一直保存壓縮狀態,只有到最終的消息使用者那里才需要將其解壓縮。

通過運行遞歸消息集,Kafka對這種壓縮方式提供了支持。 一批消息可以打包到一起進行壓縮,然后以這種形式發送給服務器。這批消息都會被發送給同一個消息使用者,并會在到達使用者那里之前一直保持為被壓縮的形式。

Kafka支持GZIP和Snappy壓縮協議。關于壓縮的更多更詳細的信息,請參見這里

客戶狀態

追蹤(客戶)消費了什么是一個消息系統必須提供的一個關鍵功能之一。它并不直觀,但是記錄這個狀態是該系統的關鍵性能之一。狀態追蹤要求(不斷)更新一個 有持久性的實體的和一些潛在會發生的隨機訪問。因此它更可能受到存儲系統的查詢時間的制約而不是帶寬(正如上面所描述的)。

大部分消息系統保留著關于代理者使用(消費)的消息的元數據。也就是說,當消息被交到客戶手上時,代理者自己記錄了整個過程。這是一個相當直觀的選擇,而 且確實對于一個單機服務器來說,它(數據)能去(放在)哪里是不清晰的。又由于許多消息系統存儲使用的數據結構規模小,所以這也是個實用的選擇--因為代 理者知道什么被消費了使得它可以立刻刪除它(數據),保持數據大小不過大。

也許不顯然的是,讓代理和使用者這兩者對消息的使用情況做到一致表述絕不是一件輕而易舉的事情。如果代理每次都是在將消息發送到網絡中后就將該消息記錄為已使用的話,一旦使用者沒能真正處理到該消息(比方說,因為它宕機或這請求超時了抑或別的什么原因),就會出現消息丟失的情況。為了解決此問題,許多消息系新加了一個確認功能,當消息發出后僅把它標示為已發送而不是已使用,然后代理需要等到來自使用者的特定的確認信息后才將消息記錄為已使用。 這種策略的確解決了丟失消息的問題,但由此產生了新問題。首先,如果使用者已經處理了該消息但卻未能發送出確認信息,那么就會讓這一條消息被處理兩次。第 二個問題是關于性能的,這種策略中的代理必須為每條單個的消息維護多個狀態(首先為了防止重復發送就要將消息鎖定,然后,然后還要將消息標示為已使用后才 能刪除該消息)。另外還有一些棘手的問題需要處理,比如,對于那些以發出卻未得到確認的消息該如何處理?

消息傳遞語義(Message delivery semantics)

系統可以提供的幾種可能的消息傳遞保障如下所示:

  • 最多一次—這種用于處理前段文字所述的第一種情況。消息在發出后立即標示為已使用,因此消息不會被發出去兩次,但這在許多故障中都會導致消息丟失。
  • 至少一次—這種用于處理前文所述的第二種情況,系統保證每條消息至少會發送一次,但在有故障的情況下可能會導致重復發送。
  • 僅僅一次—這種是人們實際想要的,每條消息只會而且僅會發送一次。

這個問題已得到廣泛的研究,屬于“事務提交”問題的一個變種。提供僅僅一次語義的算法已經有了,兩階段或者三階段提交法以及Paxos算法的一些變種就是 其中的一些例子,但它們都有與生俱來的的缺陷。這些算法往往需要多個網絡往返(round trip),可能也無法很好的保證其活性(liveness)(它們可能會導致無限期停機)。FLP結果給出了這些算法的一些基本的局限。

Kafka對元數據做了兩件很不尋常的事情。一件是,代理將數據流劃分為一組互相獨立的分區。這些分區的語義由生產者定義,由生產者來指定每條消息屬于哪 個分區。一個分區內的消息以到達代理的時間為準進行排序,將來按此順序將消息發送給使用者。這么一來,就用不著為每一天消息保存一條元數據(比如說,將消 息標示為已使用)了,我們只需為使用者、話題和分區的每種組合記錄一個“最高水位標記”(high water mark)即可。因此,標示使用者狀態所需的元數據總量實際上特別小。在Kafka中,我們將該最高水位標記稱為“偏移量”(offset),這么叫的原 因將在實現細節部分講解。

使用者的狀態

在Kafka中,由使用者負責維護反映哪些消息已被使用的狀態信息(偏移量)。典型情況下,Kafka使用者的library會把狀態數據保存到 Zookeeper之中。然而,讓使用者將狀態信息保存到保存它們的消息處理結果的那個數據存儲(datastore)中也許會更佳。例如,使用者也許就 是要把一些統計值存儲到集中式事物OLTP數據庫中,在這種情況下,使用者可以在進行那個數據庫數據更改的同一個事務中將消息使用狀態信息存儲起來。這樣 就消除了分布式的部分,從而解決了分布式中的一致性問題!這在非事務性系統中也有類似的技巧可用。搜索系統可用將使用者狀態信息同它的索引段(index segment)存儲到一起。盡管這么做可能無法保證數據的持久性(durability),但卻可用讓索引同使用者狀態信息保存同步:如果由于宕機造成 有一些沒有刷新到磁盤的索引段信息丟了,我們總是可用從上次建立檢查點(checkpoint)的偏移量處繼續對索引進行處理。與此類似,Hadoop的 加載作業(load job)從Kafka中并行加載,也有相同的技巧可用。每個Mapper在map任務結束前,將它使用的最后一個消息的偏移量存入HDFS。

這個決策還帶來一個額外的好處。使用者可用故意回退(rewind)到以前的偏移量處,再次使用一遍以前使用過的數據。雖然這 么做違背了隊列的一般協約(contract),但對很多使用者來講卻是個很基本的功能。舉個例子,如果使用者的代碼里有個Bug,而且是在它處理完一些 消息之后才被發現的,那么當把Bug改正后,使用者還有機會重新處理一遍那些消息。

Push和Pull

相關問題還有一個,就是到底是應該讓使用者從代理那里吧數據Pull(拉)回來還是應該讓代理把數據Push(推)給使用者。和大部分消息系統一 樣,Kafka在這方面遵循了一種更加傳統的設計思路:由生產者將數據Push給代理,然后由使用者將數據代理那里Pull回來。近來有些系統,比如 scribe和flume,更著重于日志統計功能,遵循了一種非常不同的基于Push的設計思路,其中每個節點都可以作為代理,數據一直都是向下游 Push的。上述兩種方法都各有優缺點。然而,因為基于Push的系統中代理控制著數據的傳輸速率,因此它難以應付大量不同種 類的使用者。我們的設計目標是,讓使用者能以它最大的速率使用數據。不幸的是,在Push系統中當數據的使用速率低于產生的速率時,使用者往往會處于超載 狀態(這實際上就是一種拒絕服務攻擊)。基于Pull的系統在使用者的處理速度稍稍落后的情況下會表現更佳,而且還可以讓使用者在有能力的時候往往前趕 趕。讓使用者采用某種退避協議(backoff protocol)向代理表明自己處于超載狀態,可以解決部分問題,但是,將傳輸速率調整到正好可以完全利用(但從不能過度利用)使用者的處理能力可比初 看上去難多了。以前我們嘗試過多次,想按這種方式構建系統,得到的經驗教訓使得我們選擇了更加常規的Pull模型。

分發

Kafka通常情況下是運行在集群中的服務器上。沒有中央的“主”節點。代理彼此之間是對等的,不需要任何手動配置即可可隨時添加和刪除。同樣,生產者和 消費者可以在任何時候開啟。 每個代理都可以在Zookeeper(分布式協調系統)中注冊的一些元數據(例如,可用的主題)。生產者和消費者可以使用Zookeeper發現主題和相 互協調。關于生產者和消費者的細節將在下面描述。

生產者

生產者自動負載均衡

對于生產者,Kafka支持客戶端負載均衡,也可以使用一個專用的負載均衡器對TCP連接進行負載均衡調整。專用的第四層負載均衡器在Kafka代理之上 對TCP連接進行負載均衡。在這種配置的情況,一個給定的生產者所發送的消息都會發送給一個單個的代理。使用第四層負載均衡器的好處是,每個生產者僅需一 個單個的TCP連接而無須同Zookeeper建立任何連接。不好的地方在于所有均衡工作都是在TCP連接的層次完成的,因而均衡效果可能并不佳(如果有 些生產者產生的消息遠多于其它生產者,按每個代理對TCP連接進行平均分配可能會導致每個代理接收到的消息總數并不平均)。

采用客戶端基于zookeeper的負載均衡可以解決部分問題。如果這么做就能讓生產者動態地發現新的代理,并按請求數量進行負載均衡。類似的,它還能讓 生產者按照某些鍵值(key)對數據進行分區(partition)而不是隨機亂分,因而可以保存同使用者的關聯關系(例如,按照用戶id對數據使用進行 分區)。這種分法叫做“語義分區”(semantic partitioning),下文再討論其細節。

下面講解基于zookeeper的負載均衡的工作原理。在發生下列事件時要對zookeeper的監視器(watcher)進行注冊:

  • 加入了新的代理
  • 有一個代理下線了
  • 注冊了新的話題
  • 代理注冊了已有話題。

生產者在其內部為每一個代理維護了一個彈性的連接(同代理建立的連接)池。通過使用zookeeper監視器的回調函 數(callback),該連接池在建立/保持同所有在線代理的連接時都要進行更新。當生產者要求進入某特定話題時,由分區者(partitioner) 選擇一個代理分區(參加語義分區小結)。從連接池中找出可用的生產者連接,并通過它將數據發送到剛才所選的代理分區。

異步發送

對于可伸縮的消息系統而言,異步非阻塞式操作是不可或缺的。在Kafka中,生產者有個選項(producer.type=async)可用指定使用異步 分發出產請求(produce request)。這樣就允許用一個內存隊列(in-memory queue)把生產請求放入緩沖區,然后再以某個時間間隔或者事先配置好的批量大小將數據批量發送出去。因為一般來說數據會從一組以不同的數據速度生產數 據的異構的機器中發布出,所以對于代理而言,這種異步緩沖的方式有助于產生均勻一致的流量,因而會有更佳的網絡利用率和更高的吞吐量。

語義分區

下面看看一個想要為每個成員統計一個個人空間訪客總數的程序該怎么做。應該把一個成員的所有個人空間訪問事件發送給某特定分區,因此就可以把對一個成員的 所有更新都放在同一個使用者線程中的同一個事件流中。生產者具有從語義上將消息映射到有效的Kafka節點和分區之上的能力。這樣就可以用一個語義分區函 數將消息流按照消息中的某個鍵值進行分區,并將不同分區發送給各自相應的代理。通過實現kafak.producer.Partitioner接口,可以 對分區函數進行定制。在缺省情況下使用的是隨即分區函數。上例中,那個鍵值應該是member_id,分區函數可以是 hash(member_id)%num_partitions。

對Hadoop以及其它批量數據裝載的支持

具有伸縮性的持久化方案使得Kafka可支持批量數據裝載,能夠周期性將快照數據載入進行批量處理的離線系統。我們利用這個功能將數據載入我們的數據倉庫(data warehouse)和Hadoop集群。

批量處理始于數據載入階段,然后進入非循環圖(acyclic graph)處理過程以及輸出階段(支持情況在這里)。支持這種處理模型的一個重要特性是,要有重新裝載從某個時間點開始的數據的能力(以防處理中有任何錯誤發生)。

對于Hadoop,我們通過在單個的map任務之上分割裝載任務對數據的裝載進行了并行化處理,分割時,所有節點/話題/分區的每種組合都要分出一個來。Hadoop提供了任務管理,失敗的任務可以重頭再來,不存在數據被重復的危險。

實施細則

下面給出了一些在上一節所描述的低層相關的實現系統的某些部分的細節的簡要說明。

API 設計

生產者 APIs

生產者 API 是給兩個底層生產者的再封裝 -kafka.producer.SyncProducerandkafka.producer.async.AsyncProducer.

class Producer {

  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData producerData);

  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData);

  /* Closes the producer and cleans up */   
  public void close();

} 

該API的目的是將生產者的所有功能通過一個單個的API公開給其使用者(client)。新建的生產者可以:

  • 對多個生產者請求進行排隊/緩沖并異步發送批量數據 —— kafka.producer.Producer提供了在將多個生產請求序列化并發送給適當的Kafka代理分區之前,對這些生產請求進行批量處理的能力 (producer.type=async)。批量的大小可以通過一些配置參數進行控制。當事件進入隊列時會先放入隊列進行緩沖,直到時間到了 queue.time或者批量大小到達batch.size為止,后臺線程 (kafka.producer.async.ProducerSendThread)會將這批數據從隊列中取出,交給 kafka.producer.EventHandler進行序列化并發送給適當的kafka代理分區。通過event.handler這個配置參數,可 以在系統中插入一個自定義的事件處理器。在該生產者隊列管道中的各個不同階段,為了插入自定義的日志/跟蹤代碼或者自定義的監視邏輯,如能注入回調函數會 非常有用。通過實現kafka.producer.asyn.CallbackHandler接口并將配置參數callback.handler設置為實 現類就能夠實現注入。
  • 使用用戶指定的Encoder處理數據的序列化(serialization)
    1 interfaceEncoder<T> {
    2   publicMessage toMessage(T data);
    3 }
    Encoder的缺省值是一個什么活都不干的kafka.serializer.DefaultEncoder。
  • 提供基于zookeeper的代理自動發現功能 —— 通過使用zk.connect配置參數指定zookeeper的連接url,就能夠使用基于zookeeper的代理發現和負載均衡功能。在有些應用場 合,可能不太適合于依賴zookeeper。在這種情況下,生產者可以從broker.list這個配置參數中獲得一個代理的靜態列表,每個生產請求會被 隨即的分配給各代理分區。如果相應的代理宕機,那么生產請求就會失敗。
  • 通過使用一個可選性的、由用戶指定的Partitioner,提供由軟件實現的負載均衡功能 —— 數據發送路徑選擇決策受kafka.producer.Partitioner的影響。
    1 interfacePartitioner<T> {
    2    intpartition(T key,intnumPartitions);
    3 }
    分區API根據相關的鍵值以及系統中具有的代理分區的數量返回一個分區id。將該id用作索引,在broker_id和partition組成的經過排序 的列表中為相應的生產者請求找出一個代理分區。缺省的分區策略是hash(key)%numPartitions。如果key為null,那就進行隨機選 擇。使用partitioner.class這個配置參數也可以插入自定義的分區策略。

使用者API

我們有兩個層次的使用者API。底層比較簡單的API維護了一個同單個代理建立的連接,完全同發送給服務器的網絡請求相吻合。該API完全是無狀態的,每個請求都帶有一個偏移量作為參數,從而允許用戶以自己選擇的任意方式維護該元數據。

高層API對使用者隱藏了代理的具體細節,讓使用者可運行于集群中的機器之上而無需關心底層的拓撲結構。它還維護著數據使用的狀態。高層API還提供了訂閱同一個過濾表達式(例如,白名單或黑名單的正則表達式)相匹配的多個話題的能力。

底層API

class SimpleConsumer {

  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

底層API不但用于實現高層API,而且還直接用于我們的離線使用者(比如Hadoop這個使用者),這些使用者還對狀態的維護有比較特定的需求。

高層API

/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()

  /* Shut down the connector */
  public shutdown()
}

該API的中心是一個由KafkaStream這個類實現的迭代器(iterator)。每個KafkaStream都代表著一個從一個或多個分區到一個 或多個服務器的消息流。每個流都是使用單個線程進行處理的,所以,該API的使用者在該API的創建調用中可以提供所需的任意個數的流。這樣,一個流可能 會代表多個服務器分區的合并(同處理線程的數目相同),但每個分區只會把數據發送給一個流中。

createMessageStreams方法為使用者注冊到相應的話題之上,這將導致需要對使用者/代理的分配情況進行重新平衡。為了將重新平衡操作減 少到最小。該API鼓勵在一次調用中就創建多個話題流。createMessageStreamsByFilter方法為發現同其過濾條件想匹配的話題 (額外地)注冊了多個監視器(watchers)。應該注意,createMessageStreamsByFilter方法所返回的每個流都可能會對多 個話題進行迭代(比如,在滿足過濾條件的話題有多個的情況下)。

網絡層

網絡層就是一個特別直截了當的NIO服務器,在此就不進行過于細致的討論了。sendfile是通過給MessageSet接口添加了一個writeTo 方法實現的。這樣就可以讓基于文件的消息更加高效地利用transferTo實現,而不是使用線程內緩沖區讀寫方式。線程模型用的是一個單個的接收器 (acceptor)線程和每個可以處理固定數量網絡連接的N個處理器線程。這種設計方案在別處已經經過了非常徹底的檢驗,發現其實現起來簡單、運行起來很快。其中使用的協議一直都非常簡單,將來還可以用其它語言實現其客戶端。

消息

消息由一個固定大小的消息頭和一個變長不透明字節數字的有效載荷構成(opaque byte array payload)。消息頭包含格式的版本信息和一個用于探測出壞數據和不完整數據的CRC32校驗。讓有效載荷保持不透明是個非常正確的決策:在用于序列 化的代碼庫方面現在正在取得非常大的進展,任何特定的選擇都不可能適用于所有的使用情況。都不用說,在Kafka的某特定應用中很有可能在它的使用中需要 采用某種特殊的序列化類型。MessageSet接口就是一個使用特殊的方法對NIOChannel進行大宗數據讀寫(bulk reading and writing to an NIOChannel)的消息迭代器。

消息的格式

    /** 
     * A message. The format of an N byte message is the following: 
     * 
     * If magic byte is 0 
     * 
     * 1. 1 byte "magic" identifier to allow format changes 
     * 
     * 2. 4 byte CRC32 of the payload 
     * 
     * 3. N - 5 byte payload 
     * 
     * If magic byte is 1 
     * 
     * 1. 1 byte "magic" identifier to allow format changes 
     * 
     * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
     * 
     * 3. 4 byte CRC32 of the payload 
     * 
     * 4. N - 6 byte payload 
     * 
     */

日志

具有兩個分區的、名稱為"my_topic"的話題的日志由兩個目錄組成(即:my_topic_0和my_topic_1),目錄中存儲的是內容為該話 題的消息的數據文件。日志的文件格式是一系列的“日志項”;每條日志項包含一個表示消息長度的4字節整數N,其后接著保存的是N字節的消息。每條消息用一 個64位的整數偏移量進行唯一性標示,該偏移量表示了該消息在那個分區中的那個話題下發送的所有消息組成的消息流中所處的字節位置。每條消息在磁盤上的格 式如下文所示。每個日志文件的以它所包含的第一條消息的偏移量來命名。因此,第一個創建出來的文件的名字將為00000000000.kafka,隨后每 個后加的文件的名字將是前一個文件的文件名大約再加S個字節所得的整數,其中,S是配置文件中指定的最大日志文件的大小。

消息的確切的二進制格式都有版本,它保持為一個標準的接口,讓消息集可以根據需要在生產者、代理、和使用者直接進行自由傳輸而無須重新拷貝或轉換。其格式如下所示:

On-disk format of a message

message length : 4 bytes (value: 1+4+n) 
"magic" value  : 1 byte
crc            : 4 bytes
payload        : n bytes

將消息的偏移量作為消息的可不常見。我們原先的想法是使用由生產者產生的GUID作為消息id,然后在每個代理上作一個從GUID到偏移量的映射。但是, 既然使用者必須為每個服務器維護一個ID,那么GUID所具有的全局唯一性就失去了價值。更有甚者,維護將從一個隨機數到偏移量的映射關系帶來的復雜性, 使得我們必須使用一種重量級的索引結構,而且這種結構還必須與磁盤保持同步,這樣我們還就必須使用一種完全持久化的、需隨機訪問的數據結構。如此一來,為 了簡化查詢結構,我們就決定使用一個簡單的依分區的原子計數器(atomic counter),這個計數器可以同分區id以及節點id結合起來唯一的指定一條消息;這種方法使得查詢結構簡化不少,盡管每次在處理使用者請求時仍有可 能會涉及多次磁盤尋道操作。然而,一旦我們決定使用計數器,跳向直接使用偏移量作為id就非常自然了,畢竟兩者都是分區內具有唯一性的、單調增加的整數。 既然偏移量是在使用者API中并不會體現出來,所以這個決策最終還是屬于一個實現細節,進而我們就選擇了這種更加高效的方式。

分布式發布訂閱消息系統 Kafka 架構設計

寫操作

日志可以順序添加,添加的內容總是保存到最后一個文件。當大小超過配置中指定的大小(比如說1G)后,該文件就會換成另外一個新文件。有關日志的配置參數 有兩個,一個是M,用于指出寫入多少條消息之后就要強制OS將文件刷新到磁盤;另一個是S,用來指定過多少秒就要強制進行一次刷新。這樣就可以保證一旦發 生系統崩潰,最多會有M條消息丟失,或者最長會有S秒的數據丟失,

讀操作

可以通過給出消息的64位邏輯偏移量和S字節的數據塊最大的字節數對日志文件進行讀取。讀取操作返回的是這S個字節中包含的消息的迭代器。S應該要比最長 的單條消息的字節數大,但在出現特別長的消息情況下,可以重復進行多次讀取,每次的緩沖區大小都加倍,直到能成功讀取出這樣長的一條消息。也可以指定一個 最大的消息和緩沖區大小并讓服務器拒絕接收比這個大小大一些的消息,這樣也能給客戶端一個能夠讀取一條完整消息所需緩沖區的大小的上限。很有可能會出現讀 取緩沖區以一個不完整的消息結尾的情況,這個情況用大小界定(size delimiting)很容易就能探知。

從某偏移量開始進行日志讀取的實際過程需要先找出存儲所需數據的日志段文件,從全局偏移量計算出文件內偏移量,然后再從該文件偏移量處開始讀取。搜索過程通過對每個文件保存在內存中的范圍值進行一種變化后的二分查找完成。

日志提供了獲取最新寫入的消息的功能,從而允許從“當下”開始消息訂閱。這個功能在使用者在SLA規定的天數內沒能正常使用數據的情況下也很有用。當使用 者企圖從一個并不存在的偏移量開始使用數據時就會出現這種情況,此時使用者會得到一個OutOfRangeException異常,它可以根據具體的使用 情況對自己進行重啟或者僅僅失敗而退出。

以下是發送給數據使用者(consumer)的結果的格式。

MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes
MultiMessageSetSend (multiFetch result)

total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n

刪除

一次只能刪除一個日志段的數據。 日志管理器允許通過可加載的刪除策略設定刪除的文件。 當前策略刪除修改事件超過 N 天以上的文件,也可以選擇保留最后 N GB 的數據。 為了避免刪除時的讀取鎖定沖突,我們可以使用副本寫入模式,以便在進行刪除的同時對日志段的一個不變的靜態快照進行二進制搜索。

數據正確性保證

日志功能里有一個配置參數M,可對在強制進行磁盤刷新之前可寫入的消息的最大條目數進行控制。在系統啟動時會運行一個日志恢復過程,對最新的日志段內所有消息進行迭代,以對每條消息項的有效性進行驗證。一條消息項是合法的,僅當其大小加偏移量小于文件的大小并且該消息中有效載荷的CRC32值同該消息中存儲的CRC值相等。在探測出有數據損壞的情況下,就要將文件按照最后一個有效的偏移量進行截斷。

要注意,這里有兩種必需處理的數據損壞情況:由于系統崩潰造成的未被正常寫入的數據塊(block)因而需要截斷的情況以及由于文件中被加入了毫無意義的 數據塊而造成的數據損壞情況。造成數據損壞的原因是,一般來說OS并不能保證文件索引節點(inode)和實際數據塊這兩者的寫入順序,因此,除了可能會 丟失未刷新的已寫入數據之外,在索引節點已經用新的文件大小更新了但在將數據塊寫入磁盤塊之前發生了系統崩潰的情況下,文件就可能會獲得一些毫無意義的數 據。CRC值就是用于這種極端情況,避免由此造成整個日志文件的損壞(盡管未得到保存的消息當然是真的找不回來了)。

分發

Zookeeper目錄

接下來討論zookeeper用于在使用者和代理直接進行協調的結構和算法。

記法

當一個路徑中的元素是用[xyz]這種形式表示的時,其意思是, xyz的值并不固定而且實際上xyz的每種可能的值都有一個zookpeer z節點(znode)。例如,/topics/[topic]表示了一個名為/topics的目錄,其中包含的子目錄同話題對應,一個話題一個目錄并且目 錄名即為話題的名稱。也可以給出數字范圍,例如[0...5],表示的是子目錄0、1、2、3、4。箭頭->用于給出z節點的內容。例如 /hello -> world表示的是一個名稱為/hello的z節點,包含的值為"world"。

代理節點的注冊

/brokers/ids/[0...N] --> host:port (ephemeral node)

上面是所有出現的代理節點的列表,列表中每一項都提供了一個具有唯一性的邏輯代理id,用于讓使用者能夠識別代理的身份(這個必須在配置中給出)。在啟動 時,代理節點就要用/brokers/ids下列出的邏輯代理id創建一個z節點,并在自己注冊到系統中。使用邏輯代理id的目的是,可以讓我們在不影響 數據使用者的情況下就能把一個代理搬到另一臺不同的物理機器上。試圖用已在使用中的代理id(比如說,兩個服務器配置成了同一個代理id)進行注冊會導致 發生錯誤。

因為代理是以非長久性z節點的方式注冊的,所以這個注冊過程是動態的,當代理關閉或宕機后注冊信息就會消失(至此要數據使用者,該代理不再有效)。

代理話題的注冊

/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

每個代理會都要注冊在某話題之下,注冊后它會維護并保存該話題的分區總數。

使用者和使用者小組

為了對數據的使用進行負載均衡并記錄使用者使用的每個代理上的每個分區上的偏移量,所有話題的使用者都要在Zookeeper中進行注冊。

多個使用者可以組成一個小組共同使用一個單個的話題。同一小組內的每個使用者共享同一個給定的group_id。比如說,如果某個使用者負責用三臺機器進行某某處理過程,你就可以為這組使用者分配一個叫做“某某”的id。這個小組id是在使用者的配置文件中指定的,并且這就是你告訴使用者它到底屬于哪個組的方法。

小組內的使用者要盡量公正地劃分出分區,每個分區僅為小組內的一個使用者所使用。

使用者ID的注冊

除了小組內的所有使用者都要共享一個group_id之外,每個使用者為了要同其它使用者區別開來,還要有一個非永久性的、具有唯一性的consumer_id(采用hostname:uuid的形式)。 consumer_id要在以下的目錄中進行注冊。

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

小組內的每個使用者都要在它所屬的小組中進行注冊并采用consumer_id創建一個z節點。z節點的值包含了一個<topic, #streams>的map。 consumer_id只是用來識別小組內活躍的每個使用者。使用者建立的z節點是個臨時性的節點,因此如果這個使用者進程終止了,注冊信息也將隨之消失。

數據使用者偏移追蹤

數據使用者跟蹤他們在每個分區中耗用的最大偏移量。這個值被存儲在一個Zookeeper(分布式協調系統)目錄中。

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)

分區擁有者注冊表

每個代理分區都被分配給了指定使用者小組中的單個數據使用者。數據使用者必須在耗用給定分區前確立對其的所有權。要確立其所有權,數據使用者需要將其 id 寫入到特定代理分區中的一個臨時節點(ephemeral node)中。

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)

代理節點的注冊

代理節點之間基本上都是相互獨立的,因此它們只需要發布它們擁有的信息。當有新的代理加入進來時,它會將自己注冊到代理節點注冊目錄中,寫下它的主機名和 端口。代理還要將已有話題的列表和它們的邏輯分區注冊到代理話題注冊表中。在代理上生成新話題時,需要動態的對話題進行注冊。

使用者注冊算法

當使用者啟動時,它要做以下這些事情:

  1. 將自己注冊到它屬小組下的使用者id注冊表。
  2. 注冊一個監視使用者id列的表變化情況(有新的使用者加入或者任何現有使用者的離開)的變化監視器。(每個變化都會觸發一次對發生變化的使用者所屬的小組內的所有使用者進行負載均衡。)
  3. 主次一個監視代理id注冊表的變化情況(有新的代理加入或者任何現有的代理的離開)的變化監視器。(每個變化都會觸發一次對所有小組內的所有使用者負載均衡。)
  4. 如果使用者使用某話題過濾器創建了一個消息流,它還要注冊一個監視代理話題變化情況(添加了新話題)的變化監視器。(每個變化都會觸發一次對所有可用話題 的評估,以找出話題過濾器過濾出哪些話題。新過濾出來的話題將觸發一次對該使用者所在的小組內所有的使用者負載均衡。)
  5. 迫使自己在小組內進行重新負載均衡。

使用者重新負載均衡的算法

使用者重新復雜均衡的算法可用讓小組內的所有使用者對哪個使用者使用哪些分區達成一致意見。使用者重新負載均衡的動作每次添加或移除代理以及同一小組內的 使用者時被觸發。對于一個給定的話題和一個給定的使用者小組,代理分區是在小組內的所有使用者中進行平均劃分的。一個分區總是由一個單個的使用者使用。這 種設計方案簡化了實施過程。假設我們運行多個使用者以并發的方式同時使用同一個分區,那么在該分區上就會形成爭用(contention)的情況,這樣一來就需要某種形式的鎖定機制。如果使用者的個數比分區多,就會出現有寫使用者根本得不到數據的情況。在重新進行負載均衡的過程中,我們按照盡量減少每個使用者需要連接的代理的個數的方式,嘗嘗試著將分區分配給使用者。

每個使用者在重新進行負載均衡時需要做下列的事情:

   1. 針對Ci所訂閱的每個話題T
   2.   將PT設為生產話題T的所有分區
   3.   將CG設為小組內同Ci 一樣使用話題T的所有使用者
   4.   對PT進行排序(讓同一個代理上的各分區挨在一起)
   5.   對CG進行排序 
   6.   將i設為Ci在CG中的索引值并讓N = size(PT)/size(CG)
   7.   將從i*N到(i+1)*N - 1的分區分配給使用者Ci 
   8.   將Ci當前所擁有的分區從分區擁有者注冊表中刪除
   9.   將新分配的分區加入到分區擁有者注冊表中
        (我們可能需要多次嘗試才能讓原先的分區擁有者釋放其擁有權)
       

在觸發了一個使用者要重新進行負載均衡時,同一小組內的其它使用者也會幾乎在同時被觸發重新進行負載均衡。

 本文由用戶 javap 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!