Kafka技術內幕-日志壓縮
Kafka技術內幕樣章:Kafka的日志壓縮(LogCompaction)
3.3 日志管理類的后臺線程
分布式存儲系統除了要保證客戶端寫請求流程的正確性,節點可能會非正常宕機或者需要重啟,在啟動的時候必須要能夠正常地加載/恢復已有的數據,日志管理類在創建的時候要加載已有的所有日志文件,這和創建Log時要加載所有的Segment是類似的。 LogManager 的 logDirs 參數對應了 log.dirs 配置項,每個TopicPartition文件夾都對應一個Log實例,所有的Partition文件夾都在日志目錄下,當成功加載完所有的Log實例后logs才可以被日志管理類真正地用在戰場上。
假設logDirs= /tmp/kafka_logs1,/tmp/kafka_logs2 ,logs1下有[t0-0,t0-1,t1-2],logs2下有[t0-2,t1-0,t1-1],圖3-26的logDir指的是Log對象的dir,和log.dirs是不同的概念,可以認為所有Log的dir都是在每個log.dirs下,如果把Log.dir叫做Partition級別的文件夾,則checkpoint文件和Partition文件夾是同一層級。
圖3-26 日志的組織方式和對應的數據結構
class LogManager(val logDirs: Array[File]){
val logs = new Pool[TopicAndPartition, Log]()
val recoveryPointCheckpoints=logDirs.map((_,new OffsetCheckpoint(new File(_,"checkpoint"))))
loadLogs() //啟動LogManager實例時,如果已經存在日志文件,要把它們加載到內存中
private def loadLogs(): Unit = {
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
for (dir <- this.logDirs) { //按照log.dirs創建線程池,如果只配置一個目錄就只有一個線程池
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
//checkpoint文件一個日志目錄只有一個,并不是每個Partition級別!
//既然所有Partition公共一個checkpoint文件,那么文件內容當然要有Partition信息
var recoveryPoints:Map[TopicAndPartition,Long]=recoveryPointCheckpoints(dir).read
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList //日志目錄下的所有文件/文件夾
logDir <- dirContent if logDir.isDirectory //Partition文件夾,忽略日志目錄下的文件
} yield {
CoreUtils.runnable { //每個Partition文件夾創建一個線程,由線程池執行
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) //分區的恢復點
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) //恢復Log
this.logs.put(topicPartition, current) //這里放入logs集合中,所有分區的Log滿血復活
}
}
jobsForDir.map(pool.submit).toSeq //提交任務
}
}
//只有調用loadLogs后,logs才有值,后面的操作都依賴于logs
def allLogs(): Iterable[Log] = logs.values
def logsByDir = logs.groupBy{case (_,log)=>log.dir.getParent}
val cleaner: LogCleaner = new LogCleaner(cleanerConfig,logDirs,logs)
def startup() {
scheduler.schedule("log-retention", cleanupLogs)
scheduler.schedule("log-flusher", flushDirtyLogs)
scheduler.schedule("recovery-point-checkpoint",checkpointRecoveryPointOffsets)
if(cleanerConfig.enableCleaner) cleaner.startup()
}
}
LogManager.startup()啟動后會在后臺運行多個定時任務和線程,表3-7列舉了這些線程的方法和用途,這些線程最后都會操作Log實例(幸好我們已經成功地加載了logs),畢竟LogManager從名字來看就是要對Log進行管理(把checkpoint也看做是日志文件的一部分,因為它伴隨著日志而生,所以也在LogManager的管理范疇內)。
線程/任務 | 方法 | 作用 |
---|---|---|
日志保留任務(log retention) | cleanupLogs | 刪除失效的Segment或者為了控制日志文件大小要刪除一些文件 |
日志刷寫任務(log flusher) | flushDirtyLogs | 根據時間策略,將還在操作系統緩存層的文件刷寫到磁盤上 |
檢查點刷寫任務(checkpoint) | checkpointRecoveryPointOffsets | 定時地將checkpoint恢復點狀態寫到文件中 |
日志清理線程(cleaner) | cleaner.startup() | 日志壓縮,針對帶有key的消息的清理策略 |
表3-7 日志管理類的后臺線程
日志文件和checkpoint的刷寫 flush 都只是將當前最新的數據寫到磁盤上。checkpoint檢查點也叫做恢復點(顧名思義是從指定的點開始恢復數據),log.dirs的每個目錄下只有一個所有Partition共享的全局checkpoint文件。
//日志文件刷寫任務
private def flushDirtyLogs() = {
for ((topicAndPartition, log) <- logs) {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
if(timeSinceLastFlush >= log.config.flushMs) log.flush
}
}
//checkpoint文件刷寫任務
def checkpointRecoveryPointOffsets() {
this.logDirs.foreach(checkpointLogsInDir)
}
private def checkpointLogsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString) //checkpoint是log.dirs目錄級別
//logsByDir對于每個dir都有多個Partition對應的Log,所以mapValues對每個Log獲取recoveryPoint
recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
//只有flush的時候才會更新恢復點,不過flush并不是每次寫都會發生的
def flush(offset: Long) : Unit = {
if (offset <= this.recoveryPoint) return
for(segment<-logSegments(this.recoveryPoint,offset)) //選擇恢復點和當前之間的Segment
segment.flush() //會分別刷寫log數據文件和index索引文件(調用底層的fsync)
if(offset > this.recoveryPoint) {
this.recoveryPoint = offset //recoveryPoint實際上是offset
lastflushedTime.set(time.milliseconds)
}
}
為什么所有Partition共用一個checkpoint文件,而不是每個Partition都有自己的checkpoint文件,因為checkpoint數據量不是很大,那么為什么前面分析的索引文件則是以Partition級別,甚至每個Segment都有對應的數據文件和索引文件,索引本身也是offset,它和checkpoint數據量也都是不大的啊,那么是不是也可以每個Partition只有一個索引文件,而不是每個Segment一個索引文件,實際上索引文件的用途是為了更快地查詢,該省的地方還是要節約資源(所有Partition只有一個checkpoint文件),不該節省的還是要大方點(每個Segment一個索引文件),做人何嘗不是這個道理。
3.3.1 日志清理
清理日志實際上是清理過期的Segment,或者日志文件太大了需要刪除最舊的數據,使得整體的日志文件大小不超過指定的值。舉例用隊列來緩存所有的請求任務,每個任務都有一定的存活時間,超過時間后任務就應該自動被刪除掉,同時隊列也有一個上限,不能無限制地添加任務,如果超過指定大小時,就要把最舊的任務刪除掉,以維持隊列的固定大小,這樣可以保證隊列不至于無限大導致系統資源被耗盡。
//日志清理任務
def cleanupLogs() {
for(log <- allLogs; if !log.config.compact)
cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
private def cleanupExpiredSegments(log: Log): Int = {
log.deleteOldSegments(time.milliseconds-_.lastModified>log.config.retentionMs)
}
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
var diff = log.size - log.config.retentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
true
} else false
}
log.deleteOldSegments(shouldDelete)
}
Log的deleteOldSegments方法接收一個高階函數,參數是Segment,返回布爾類型表示這個Segment是否需要被刪除,在LogManager中調用的地方并沒有傳遞Segment,而是在Log中獲取每個Segment。這是因為LogManager無法跨過Log直接和Segment通信,LogManager無法直接管理Segment,Segment只屬于Log,只能由Log管理。
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
//logSegments是Log的所有Segment,s是每個Segment
val deletable = logSegments.takeWhile(s => predicate(s))
if(segments.size == numToDelete) roll()
deletable.foreach(deleteSegment(_))
}
private def deleteSegment(segment: LogSegment) {
segments.remove(segment.baseOffset) //刪除數據結構
asyncDeleteSegment(segment) //異步刪除Segment
}
private def asyncDeleteSegment(segment: LogSegment) {
segment.changeFileSuffixes("", Log.DeletedFileSuffix)
def deleteSeg() = segment.delete() //和flush一樣最后調用log和index.delete
scheduler.schedule("delete-file", deleteSeg)
}
清理日志有兩種策略,一種是上面的cleanupLogs根據時間或大小策略(粗粒度),還有一種是針對每個key的日志刪除策略(細粒度)即LogCleaner方式,如果消息沒有key,那只能采用第一種清理策略了。刪除策略是以topic為級別的,所以不同的topic可以設置不同的刪除策略,所以一個集群中可能存在有些topic按照粗粒度模式刪除,有些則按照細粒度模式刪除,完全取決于你的業務需求(當然要不要給消息設置key是一個關鍵決策)。
3.3.2 日志壓縮
不管是傳統的RDBMS還是分布式的NoSQL存儲在數據庫中的數據總是會更新的,相同key的新記錄更新數據的方式簡單來說有兩種:直接更新(找到數據庫中的已有位置以最新的值替換舊的值),或者以追加的方式(保留舊的值,查詢時再合并,或者也有一個后臺線程會定期合并)。采用追加記錄的做法在節點崩潰時可以用于恢復數據,還有一個好處是寫性能很高,因為這樣在寫的時候就不需要查詢操作,這也是表3-8中很多和存儲相關的分布式系統都采用這種方式的原因,它的代價就是需要有Compaction操作來保證相同key的多條記錄需要合并。
分布式系統 | 更新數據追加到哪里 | 數據文件 | 是否需要Compaction |
---|---|---|---|
ZooKeeper | log | snapshot | 不需要,因為數據量不大 |
Redis | aof | rdb | 不需要,因為是內存數據庫 |
Cassandra | commit log | data.db | 需要,數據存在本地文件 |
HBase | commit log | HFile | 需要,數據存在HDFS |
Kafka | commit log | commit log | 需要,數據存在Partition的多個Segment里 |
表3-8 分布式系統的更新操作用commit log保存
Kafka中如果消息有key,相同key的消息在不同時刻有不同的值,則只允許存在最新的一條消息,這就好比傳統數據庫的update操作,查詢結果一定是最近update的那一條,而不應該查詢出多條或者查詢出舊的記錄,當然對于HBase/Cassandra這種支持多版本的數據庫而言,update操作可能導致添加新的列,查詢時是合并的結果而不一定就是最新的記錄。圖3-27中示例了多條消息,一旦key已經存在,相同key的舊的消息會被刪除,新的被保留。
圖3-27 更新操作要刪除舊的消息
Kafka的更新操作也采用追加(commit log就是追加)也需要有Compaction操作,當然它并不是像上面那樣一條消息一條消息地比較,通常Compaction是對多個文件做一次整體的壓縮,圖3-28是Log的壓縮操作前后示例,壓縮確保了相同key只存在一個最新的value,舊的value在壓縮過程會被刪除掉。
圖3-28 LogCompaction的過程
每個Partition的(Leader Replica的)Log有多個Segment文件,為了不影響正在寫的最近的那個activeSegment,日志壓縮不應該清理activeSegment,而是清理剩下的所有Segment。清理Segment時也不是一個個Segment慢吞吞地清理,也不是一次性所有Segment想要全部清理,而是幾個Segment分成一組,分批清理。清理線程會占用一定的CPU,因為要讀取已有的Segment并壓縮成新的Segment,為了不影響其他組件(主要是讀,因為讀操作會讀取舊的Segment,而寫不會被影響因為寫操作只往activeSegment寫,而activeSegment不會被清理),可以設置清理線程的線程個數,同時Kakfa還支持Throttler限速(讀取舊的Segment時和寫入新的Segment都可以限速)。當然也并不是每個Partition在同一時間都進行清理,而是選擇其中最需要被清理的Partition。
清理/壓縮指的是刪除舊的更新操作,只保留最近的一個更新操作,清理方式有多種,比如JVM中的垃圾回收算法將存活的對象拷貝/整理到指定的區域,HBase/Cassandra的Compaction會將多個數據文件合并/整理成新的數據文件。Kafka的LogCleaner清理Log時會將所有的Segment在CleanerPoint清理點位置分成Tail和Head兩部分,圖3-29中每條消息所在的Segment并沒有畫出來(這些消息可能在不同的Segment里),因為清理是以Partition為級別,就淡化了Segment的邊界問題,不過具體的清理動作還是要面向Segment,因為復制消息時不得不面對Segment文件。
圖3-29 Log包括Tail和Head兩部分
清理后Log Head部分每條消息的offset都是逐漸遞增的,而Tail部分消息的offset是斷斷續續的。 LogToClean 表示需要被清理的日志,其中firstDirtyOffset會作為Tail和Head的分界點,圖3-20中舉例了在一個Log的分界點發生Compaction的步驟。
圖3-30 日志分成Tail和Head的消息壓縮步驟
每個Partition的Log都對應一個LogToClean對象,在選擇哪個Partition需要優先做Compaction操作時是依據cleanableRatio的比率即Head部分大小(dirtyBytes)除于總大小中最大的,假設日志文件一樣大,firstDirtyOffset越小,dirtyBytes就越大。而firstDirtyOffset每次Compaction后都會增加,所以實際上選擇算法是優先選擇還沒有發生或者發生次數比較少的Partition,因為這樣的Partition的firstDirtyOffset沒有機會增加太多。
case class LogToClean(topicPartition: TopicAndPartition, log: Log,
firstDirtyOffset: Long) extends Ordered[LogToClean] {
val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
val dirtyBytes = log.logSegments(firstDirtyOffset,
math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
val cleanableRatio = dirtyBytes / totalBytes.toDouble
def totalBytes = cleanBytes + dirtyBytes
override def compare(th:LogToClean)=math.signum(this.cleanableRatio-th.cleanableRatio)
}
不僅僅是更新需要清理舊的數據,刪除操作也需要清理,生產者客戶端如果發送的消息key的value是空的,表示要刪除這條消息,發生在刪除標記之前的記錄都需要刪除掉,而發生在刪除標記之后的記錄則不會被刪除。日志壓縮保證了:
-
任何消費者如果能夠趕上Log的Head部分,它就會看到寫入的每條消息,這些消息都是順序遞增(中間不會間斷)的offset
-
總是維持消息的有序性,壓縮并不會對消息進行重新排序,而是移除一些消息
-
每條消息的offset永遠不會被改變,它是日志文件標識位置的永久編號
-
讀取/消費時如果從最開始的offset=0開始,那么至少可以看到所有記錄按照它們寫入的順序得到的最終狀態(狀態指的是value,相同key不同value,最終的狀態以最新的value為準):因為這種場景下寫入順序和讀取順序是一致的,寫入時和讀取時offset都是不斷遞增。舉例寫入key1的value在offset=1和offst=5的值分別是v1和v2,那么讀取到offset=1時,最終的狀態(value值)是v1,讀取到offset=5時,最終狀態是v2(不能指望說讀取到offset=1時就要求狀態是v2)
來自: zqhxuyuan.github.io/2016/05/13/2016-5-13-Kafka-Book-Sample-LogCompaction/