ODPS跨集群遷移與數據同步經驗分享

jopen 10年前發布 | 20K 次閱讀 數據同步 集群/負載均衡

隨著業務的迅猛發展,阿里各業務部門如淘寶、天貓、一淘、B2B等每天都會產生大量的數據,日均增量數百TB。2013年初,阿里內部的生 產集群PA所在機房的存儲量最多可擴容到數十PB,而當時已使用75%的存儲量。存儲容量告急,迫切需要將生產集群PA上的大量數據遷移到其他集群。

此時,如何安全地跨集群遷移幾十PB的數據和其上相關業務,是我們面臨的第一個挑戰。數據遷移之后,兩個集群間存在大量的數據依賴,需要互相訪問最 新的數據,如何安全快速地實現跨集群數據同步,這是緊跟其后的第二個挑戰。本文將結合這兩個挑戰詳細地介紹實現的細節和解決方案。

數據多版本和讀寫

首先,跨集群復制意味著同一份數據將存儲在多個集群上。那么,每個集群上的數據可能會對應不同的版本。本文首先講述數據多版本的表示,數據多版本與跨集群復制的一些基本假設以及如何做好多版本數據的讀寫。

數據多版本的表示

ODPS中的元數據管理依賴于OTS(開放結構化數據服務)。系統中的元數據,按照project、table和partition的層級來組織和 表示。每一層級的meta都存儲在同一張OTS的大表中。如圖1所示,在每個project的meta中,有一列專用于該project多集群復制的相關 配置,還有一列專用于表示這個partition在多集群中的數據版本。

ODPS跨集群遷移與數據同步經驗分享

每個最低層級的數據(為了描述方便,將分區表的每個分區和非分區表的表統稱為分區),在meta中會有一列專用于表示這個數據的版本信息,具體內容如下:

ODPS跨集群遷移與數據同步經驗分享

表示這個集群的有效版本號為V1,在ClusterA上的版本號為V1,在ClusterB上的版本號為V0。其 中,“LatestVersion”是一個抽象描述,表示這個分區當前有效的版本號。V1是一個字符串,通常不用于做語義上的判斷,V1和V0之間并無時 序上的關系。“ClusterStatus”描述這個分區對應于每個計算集群上的物理文件的版本。

跨集群復制前提條件1:計算集群之間的關系是對等的,即計算集群之間沒有主從關系,只根據版本信息分為有效版本數據集群和無效版本數據集群。數據同步只與分區的版本信息相關,永遠將有效版本的集群分區數據同步至無效版本集群。

跨集群復制前提條件2:數據同步是準實時的,且不涉及跨集群的事務。假設一個分區當前版本信息為:

ODPS跨集群遷移與數據同步經驗分享

那么該分區在ClusterA和ClusterB均為有效數據。假設此時在ClusterA上執行一個作業,修改了分區數據,該作業僅影響ClusterA和分區有效版本號,成功執行后該分區版本信息為:

ODPS跨集群遷移與數據同步經驗分享

后臺異步將V4版本數據從ClusterA同步至ClusterB,成功后,分區版本信息變為:

ODPS跨集群遷移與數據同步經驗分享

如果在V4版本數據從ClusterA復制至ClusterB的過程中,ClusterA執行一個SQL作業,試圖修改分區版本為V5,復制任務是 否成功取決于修改meta和SQL作業修改meta的先后,如果復制任務在SQL作業之后修改meta,此時有效meta已被修改為V5,而復制作業的有 效meta是V4,此時復制任務會失敗。后臺將再次復制分區數據V5版本至ClusterB。meta修改的原子性可由OTS的事務保證。

數據訪問集群選擇

既然數據可能在多個集群上存在多個版本,那么,對這份數據進行訪問的時候如何在集群和版本之間做選擇呢?本小節會闡述這個問題。

在ODPS中,每一個project,都有一個指定的計算集群default cluster。

跨集群復制前提條件3:project的用戶空間內所有作業默認在該project的default cluster上執行。

如圖2所示,假設有兩個project分別為p1和p2,p1的default cluster為C1,p2的default cluster為C2,p1和p2的數據可能同時存在于C1和C2。此時執行一個SQL作業,“create table p2.t3 as select * from p2.t1”,該作業在p2的用戶空間內訪問p2的數據,那么按照假設3,所有數據操作都在C2上進行。

ODPS跨集群遷移與數據同步經驗分享

再執行一個SQL作業“create table p1.t1 as select * fromp2.t1”,此作業在p1的用戶空間內,按照假設3,作業將執行在C1上,而作業中對p2.t2的數據訪問遵循如下規則:

■ 如果C1上配置了p2.t2的跨集群數據復制,那么在C1上完成對p2.t2的數據操作;

■ 如果C1上未配置p2.t2的跨集群數據復制,那么在p2的default cluster即C2上完成對p2的數據操作。

也就是說,如果數據設置了跨集群數據復制,那么總是選擇當前集群的數據。如果沒有設置跨集群數據復制,那么就直接訪問數據的default cluster。

如果數據都配置跨集群數據復制,那么不僅會對數據中心間的帶寬產生很大的沖擊,還會造成不必要的計算集群存儲資源浪費。如果數據都不配置跨集群數據復制,那么跨數據中心數據訪問的網絡延遲無法保證。有跨機房數據依賴的作業的執行則得不到有效的保障。

數據讀寫

跨集群復制前提條件4:永遠讀寫有效版本的數據,允許出錯或者等待。

當選擇了一個計算集群進行數據訪問后,在實際數據訪問之前,會驗證這個分區在這個集群的數據版本。

如圖2所示,Job1在C2上訪問P2.t2,假設t2的版本信息如下:

ODPS跨集群遷移與數據同步經驗分享

那么作業順利進行。如果版本信息為:

ODPS跨集群遷移與數據同步經驗分享

那么作業會報版本信息錯誤的異常。

如果是會對數據寫回的操作,如SQL中的insert into或者insertoverwrite等,那么數據操作完成后有效版本信息也會發生改變。比如,還是訪問C2的數據,進行寫操作,作業之前版本信息為:

ODPS跨集群遷移與數據同步經驗分享

作業順利進行,完成后分區數據更新版本為V5,修改分區meta為:

ODPS跨集群遷移與數據同步經驗分享

假如當前同時有兩個作業Job3和Job4,分別對同一個分區進行寫操作,Job3在C1執行,Job4在C2執行,Job3先完成,將分區版本更新為:

ODPS跨集群遷移與數據同步經驗分享

當Job4執行完,試圖將分區數據更新為版本V6時,Job4會報錯,因為當前C2的版本V4不是最新版本,無法在該版本上進行數據讀寫。

至此,闡述了跨集群復制在元數據層的抽象表示,以及基于這樣的抽象表示,作業會如何選擇數據進行讀寫等問題。搞清楚這些,是實現跨集群復制的基礎,也是關鍵。下節會詳細描述跨集群復制系統的實現。

系統實現

ODPS是構建在飛天系統上的管理控制系統,主要負責用戶空間和對象的管理、Query和命令的解析與啟動、數據對象的訪問控制與授權等功能。系統中有worker、scheduler和executor三個角色:

■ worker處理所有RESTful請求,包括用戶空間(project)管理操作、資源(resource)管理操作、作業管理等,而SQL DML、MR、DT等啟動伏羲任務的作業及其他異步作業,會提交給scheduler進一步處理;

■ scheduler負責instance調度,包括將instance分解為task、對等待提交的task進行排序,以及向計算集群的伏羲Master詢問資源占用情況來進行流控;

■ executor負責啟動SQL/DT/MR task,向計算集群提交任務,并監控這些任務的運行。

跨集群復制在ODPS控制集群增加了兩個角色:replicationworker,一個單獨的進程; replication task,運行在executor中的抽象task。下面分別介紹這兩個角色。

replication task

一個replication task會且僅會負責管理一個project所有數據的跨集群復制任務。不同于其他task,replication task不能由用戶通過RESTful API訪問。replication task的啟動/停止由集群的管理者通過admin console操作。

復制任務產生

一個分區的數據發生版本變化,就會產生一個復制任務,該任務要完成兩件事:

■ 將有效版本的物理數據從一個有效版本集群拷貝到一個非有效版本集群;

■ 在成功拷貝后修改分區元數據,更新分區版本信息。

一個復制任務所包含的信息為:源/目的集群、table、partition、version、物理文件地址、物理文件meta等。

ODPS跨集群遷移與數據同步經驗分享

圖3是復制任務工作流程,分區的版本信息發生變化可通過以下三種途徑被感知。

■ 周期性掃描OTS中的元數據表,將有無效版本信息的集群和分區信息加入到復制任務的等待隊列。

■ 當元數據發生變化時,message service會發出相應的event。通過訂閱相關project的變化,可以在數據發生變化的第一時間感知到,加入到復制任務的等待隊列。

■ 當 用戶使用ODPS CLT提交作業時,如果要訪問的數據需要等待復制,CLT會推送復制信息,最終會加入到復制任務的等待隊列。

復制任務在等待隊列中按照復制任務的優先級高低排列,等待處理。一旦復制任務從等待隊列中取出并成功執行,則記錄到運行隊列中。

復制作業

replication task會一直試圖從等待隊列中取出復制任務,按照計劃將若干個復制任務組織成一個Job。分區的物理文件通過Job完成從源集群到目的集群的拷貝。

當一個Job計劃出來后,這個Job需要的instance個數也按照每個instance處理的上限計算好。

復制資源申請

復制資源,主要指帶寬資源,后文將詳細介紹。當復制的Job計劃好后,replication task會向replication worker申請需要的instance資源。replication worker會根據當前全局情況,授予一定instance資源給task。

復制任務執行

每個replication Job由三個task組成,即plan task、replicate task和statics task。

■ plan task:將需要復制的C物理文件按照文件大小均分在replicate task的各instance之間。

■ replicate task:完成物理文件從源集群到目的集群的拷貝。

■ statics task:根據replicate task的每個instance拷貝文件的結果,統計記錄每個分區是否復制成功。

replication task感知到一個Job結束后,會對這個Job里的每個復制成功的分區修改元數據中的版本信息。

數據版本變化

在線上生產環境中,源數據發生變化后的處理如下。

■ 新增表,新增分區:task收到event后會立即對新增的表和分區啟動OTS掃描復制任務,加入到等待隊列。

■ rename表和分區:從等待隊列中刪除相關任務,從運行隊列中停止相關任務。

■ 表和分區的版本發生變化:更新等待隊列中的相關任務,或者從運行隊列中停止相關任務。

Failover

replication task是在executor中執行的一個task。如同其他task,executor會定期向scheduler心跳更新其進度。如果長期未更 新,scheduler會再次啟動這個task,新啟動的task會根據之前task的Failover信息,恢復至之前的進度。

replication worker

worker是ODPS service中的一個角色,一個service中只有一個instance。replication worker是跨集群復制與其他模塊交互的接口,提供以下功能。

1. 復制詳情的查詢。

■ 當前所有project正在復制的Job。

2. 帶寬管理(后文詳細講述)。

■ 根據帶寬使用情況和replication task申請的帶寬和優先級等信息,統籌帶寬分配。

■ 當前集群使用帶寬查詢。

3. 某一個project/table/partition是否正在/可能會被復制。

4. 訂閱復制的project數據變化message,接受數據變化的event。

5. 維護與replication task的心跳;task通過心跳向worker更新該task目前使用的instance總數,目前該task運行的Job詳情,向worker申請instance,而worker通過心跳分配帶寬,發送相關event和命令。

流量控制

集群間的帶寬是一個有限的固定值。復制Job讀寫的數據都會競爭這個資源。假設每個instance讀寫吞吐率為a MB/s,集群間可使用帶寬為 bGb,那么需要控制instance個數不超過上限C:C = b Gb*1024/(8*a)流量控制在task端和worker端都有體現。task端每次replication task計劃好一個Job,會相應地計算出這個Job需要的instance數。此時,task會嘗試做project內的并發控制,如果該 project設置了并發instance的個數上限,那么project已經使用的instance個數 + 這個Job申請的instance個數的總和不能超過這個上限。如果project沒有設置并發上限,則會向worker申請計算出來的instance 數。

worker端

worker收到replication task的instance請求(設為a)后,會根據目前已經分配的instance總數(設為b),和集群間可使用的instance總數(設為c),來決定分配多少:

■ a < c 且b

■ a>c 且b < c則分配c

■ b >=c分配0

這種分配策略,會避免帶寬分配零碎化,零碎化會導致每個Job執行時間都過長,也可以防止帶寬浪費。同時,也帶來一個問題,在一定的時間內,使用的 帶寬總量會超過限制值。由于每個instance處理的文件大小會不超過一個上限,因此每個instance實際執行的時間不會很長。超過帶寬總量的情況 很快就可以緩解。

復制優先級

優先級的產生

不同的project,數據的重要程度不同,對數據同步的需求程度也不同;同一個project之間的不同table,在任務中的優先級也不同,關 鍵節點上的相關table對數據同步的實時性要求也會更高一些。同時,如果一個復制請求是通過CLT推送的,說明用戶依賴的數據沒有同步好,用戶正在等 待,一旦等待一定時間還未同步好,用戶請求就會超時。那么,這種復制任務的優先級也會非常高。

在這兩種場景下,為復制任務設置了優先級,優先級越高的復制任務會優先以盡可能多的資源完成。

優先級的設置和比較

用戶可以在跨集群的配置里設置每個需要同步的表的優先級,優先級為0~9,依次遞增。復制任務之間的優先級規則如下:

■ 一個Job的優先級由Job內優先級最高的分區來決定;

■ 由非CLT推送產生的復制任務,優先級高低以配置值為準;

■ CLT推送復制任務的優先級高于所有非CLT推送復制任務;

■ 同為CLT推送的復制任務,則以配置為準。

task中的等待隊列是一個優先級隊列。

在worker端,worker會優先將instance資源分配給優先級更高的請求。如果曾經有高級優先級的請求來過,那worker會暫將低優先級的請求hold住一段時間,如果沒有高優先級的請求繼續過來,再分配instance給低優先級的請求。

總結與未來展望

總結一下,ODPS的跨集群復制,完成了下面幾個技術難點:

■ 支持數據的準實時跨集群復制;

■ 動態配置作業對跨集群數據的依賴;

■ 根據任務的優先級等合理管理和分配資源。

同時,跨集群數據復制也為未來數據業務長期發展打下了堅實的基礎:

■ 突破了單集群的數據存儲上限。由于目前主流分布式系統Master/Slave的結構,單集群受限于Master的內存和處理能力上限,而現在數據可以存儲在多個集群上,不再受單集群的限制;

■ 可以實現多機房數據容災,將來可以動態的跨機房備份重要數據;

■ 實現跨數據中心動態負載均衡,將熱點集群上的數據和作業動態遷移到空閑集群,緩解熱點集群的壓力,提高空閑集群的使用率;

■ 對于響應速度要求比較高的請求,可以在多個集群同時調度這個作業,將響應最快的請求返回給用戶。

跨集群復制上線后,2013年8月下旬,在十天之內,生成集群PA完成了數十PB數據的分批次遷移,安全無事故地遷移到了生成集群PB,全程沒有人 工干預,完美解決了這個項目的第一個挑戰。隨后,在每天的生成作業中,由于數據在兩個集群之間互有依賴,每天有上十TB的線上數據通過跨集群復制在兩個集 群之間同步更新,解決了這個項目的第二個挑戰。線上監控數據表明,每天數十TB的修改數據均在20分鐘之內通過跨集群復制同步到了其他需要的集群。沒有作 業由于訪問其他集群的依賴數據而失敗。隨著數據規模的增大,ODPS的生產集群早已從2個增加到了更多個。不同的生產集群分布在不同的地域,跨集群復制承 擔的責任也越來越重大。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!