課程筆記分布式消息通訊之kafka的實現(xiàn)原理_第1頁
課程筆記分布式消息通訊之kafka的實現(xiàn)原理_第2頁
課程筆記分布式消息通訊之kafka的實現(xiàn)原理_第3頁
課程筆記分布式消息通訊之kafka的實現(xiàn)原理_第4頁
課程筆記分布式消息通訊之kafka的實現(xiàn)原理_第5頁
已閱讀5頁,還剩7頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1、如何處理所有的Replica不工作的情況在ISR中至少有一個follower時,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某個Partition的所 有Replica都宕機(jī)了,就無法保證數(shù)據(jù)不丟失了1. 等待ISR中的任一個Replica“活”過來,并且選它作為Leader2. 選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader這就需要在可用性和一致性當(dāng)中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數(shù)據(jù)都丟失了,這個Partition將永遠(yuǎn)不

2、可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它并不保證已 經(jīng)包含了所有已commit的消息,它也會成為Leader而作為consumer的數(shù)據(jù)源(前文有說明,所有讀 寫都由Leader完成)。在我們課堂講的版本中,使用的是第一種策略。副本數(shù)據(jù)同步原理了解了副本的協(xié)同過程以后,還有一個最重要的機(jī)制,就是數(shù)據(jù)的同步過程。它需要解決1. 怎么傳播消息2. 在向消息發(fā)送端返回ack之前需要保證多少個Replica已經(jīng)接收到這個消息數(shù)據(jù)的處理過程是下圖中,深紅色部分表示test_replica分區(qū)的leader副本,另外兩個節(jié)點上淺

3、色部分表示follower副本Producer在發(fā)布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader get/brokers/topics/partitions/2/state ,然后無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer只將該消息發(fā)送到該Partition的Leader。Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數(shù)據(jù)。這種,F(xiàn)ollower的數(shù)據(jù)順序與Leader保持一致。Follower在收到該消息并寫入其Log

4、后,向Leader發(fā)送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW(HighWatermark)并且向Producer發(fā)送ACK。LEO:即日志末端位移(log end oset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下一條消息!也就是說,如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是0, 9。另外, leader LEO和follower LEO的更新是有區(qū)別的。我們后面會詳細(xì)說HW:即上面提到的水位值(Hight Water)。對于同一個副本對象而言,其HW值不會大于LEO

5、值。小于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區(qū)別的通過下面這幅圖來表達(dá)LEO、HW的含義,隨著follower副本不斷和leader副本進(jìn)行數(shù)據(jù)同步,follower 副本的LEO會主鍵后移并且追趕到leader副本,這個追趕上的判斷標(biāo)準(zhǔn)是當(dāng)前副本的LEO是否大于或者 等于leader副本的HW,這個追趕上也會使得被踢出的follower副本重新加入到ISR集合中。另外, 假如說下圖中的最右側(cè)的follower副本被踢出ISR集合,也會導(dǎo)致這個分區(qū)的HW發(fā)生變化,變成了3初始狀態(tài)初始狀態(tài)下,leader和

6、follower的HW和LEO都是0,leader副本會保存remote LEO,表示所有follower LEO,也會被初始化為0,這個時候,producer沒有發(fā)送消息。follower會不斷地個leader發(fā)送FETCH 請求,但是因為沒有數(shù)據(jù),這個請求會被leader寄存,當(dāng)在指定的時間之后會強(qiáng)制完成請求,這個時間配置是(replica.fetch.wait.max.ms),如果在指定時間內(nèi)producer有消息發(fā)送過來,那么kafka會喚醒fetch請求,讓leader繼續(xù)處理數(shù)據(jù)的同步處理會分兩種情況,這兩種情況下處理方式是不一樣的第一種是leader處理完producer請求之后

7、,follower發(fā)送一個fetch請求過來第二種是follower阻塞在leader指定時間之內(nèi),leader副本收到producer的請求。第一種情況生產(chǎn)者發(fā)送一條消息leader處理完producer請求之后,follower發(fā)送一個fetch請求過來 。狀態(tài)圖如下leader副本收到請求以后,會做幾件事情1. 把消息追加到log文件,同時更新leader副本的LEO2. 嘗試更新leader HW值。這個時候由于follower副本還沒有發(fā)送fetch請求,那么leader的remote LEO仍然是0。leader會比較 的LEO以及remote LEO的值發(fā)現(xiàn)最小值是0,與HW的值

8、相同,所以 更新HWfollower fetch消息follower 發(fā)送fetch請求,leader副本的處理邏輯是:1. log數(shù)據(jù)、更新remote LEO=0(follower還沒有寫入這條消息,這個值是根據(jù)follower的fetch請求中的oset來確定的)2. 嘗試更新HW,因為這個時候LEO和remoteLEO還是不一致,所以仍然是HW=03. 把消息內(nèi)容和當(dāng)前分區(qū)的HW值發(fā)送給follower副本follower副本收到response以后1. 將消息寫入到本地log,同時更新follower的LEO2. 更新follower HW,本地的LEO和leader返回的HW進(jìn)行比

9、較取小的值,所以仍然是0第一次交互結(jié)束以后,HW仍然還是0,這個值會在下一次follower發(fā)起fetch請求時被更新follower發(fā)第二次fetch請求,leader收到請求以后1. log數(shù)據(jù)2. 更新remote LEO=1, 因為這次fetch攜帶的oset是1.3. 更新當(dāng)前分區(qū)的HW,這個時候leader LEO和remote LEO都是1,所以HW的值也更新為14. 把數(shù)據(jù)和當(dāng)前分區(qū)的HW值返回給follower副本,這個時候如果沒有數(shù)據(jù),則返回為空follower副本收到response以后1. 如果有數(shù)據(jù)則寫本地日志,并且更新LEO2. 更新follower的HW值到目前為

10、止,數(shù)據(jù)的同步就完成了,意味著消費端能夠消費oset=1這條消息。第二種情況前面說過,由于leader副本暫時沒有數(shù)據(jù)過來,所以follower的fetch會被阻塞,直到等待超時或者 leader接收到新的數(shù)據(jù)。當(dāng)leader收到請求以后會喚醒處于阻塞的fetch請求。處理過程基本上和前面說 的一致1. leader將消息寫入本地日志,更新Leader的LEO2. 喚醒follower的fetch請求3. 更新HWkafka使用HW和LEO的方式來實現(xiàn)副本數(shù)據(jù)的同步,本身是一個好的設(shè)計,但是在這個地方會存在一個數(shù)據(jù)丟失的問題,當(dāng)然這個丟失只出現(xiàn)在特定的背景下。我們回想一下,HW的值是在新的一輪

11、FETCH 中才會被更新。我們分析下這個過程為什么會出現(xiàn)數(shù)據(jù)丟失數(shù)據(jù)丟失的問題前提min.insync.replicas=1 /設(shè)定ISR中的最小副本數(shù)是多少,默認(rèn)值為1(在perties中配置), 并且acks參數(shù)設(shè)置為-1(表示需要所有副本確認(rèn))時,此參數(shù)才生效.表達(dá)的含義是,至少需要多少個副本同步才能表示消息是提交的, 所以,當(dāng) min.insync.replicas=1 的時候,一旦消息被寫入leader端log即被認(rèn)為是“已提交”,而延遲一輪FETCH RPC更新HW值的設(shè)計使得follower HW值是異步延遲更新的,倘若在這個過程中l(wèi)eader發(fā)生變更,那么成

12、為新leader的follower的HW值就有可能是過期的,使得clients端認(rèn)為是成功提交的消息被刪除。producer的ackacks配置表示producer發(fā)送消息到broker上以后的確認(rèn)值。有三個可選項0:表示producer不需要等待broker的消息確認(rèn)。這個選項時延最小但同時風(fēng)險最大(因為當(dāng)server宕 機(jī)時,數(shù)據(jù)將會丟失)。1:表示producer只需要獲得kafka集群中的leader節(jié)點確認(rèn)即可,這個選擇時延較小同時確保了leader節(jié)點確認(rèn)接收成功。all(-1):需要ISR中所有的Replica給予接收確認(rèn),速度最慢,安全性最高,但是由于ISR可能會縮小到僅包含一

13、個Replica,所以設(shè)置參數(shù)為all并不能一定避免數(shù)據(jù)丟失,數(shù)據(jù)丟失的解決方案在kafka版本之后,引入了一個leader epoch來解決這個問題,所謂的leader epoch實際上是一對值(epoch,oset),epoch代表leader的版本號,從0開始遞增,當(dāng)leader發(fā)生過變更,epoch 就+1,而oset則是對應(yīng)這個epoch版本的leader寫入第一條消息的oset,比如(0,0), (1,50) ,表示第一個leader從oset=0開始寫消息,一共寫了50條。第二個leader版本號是1,從oset=50開始寫,這個信息會持久化在對應(yīng)的分區(qū)的本地磁盤

14、上,文件名是 /tmp/kafka-。leader broker中會保存這樣一個緩存,并且定期寫入到checkpoint文件中當(dāng)leader寫log時它會嘗試更新整個緩存: 如果這個leader首次寫消息,則會在緩存中增加一個條目;否則就不做更新。而每次副本重新成為leader時會查詢這部分緩存,獲取出對應(yīng)leader版本的oset我們基于同樣的情況來分析,follower宕機(jī)并且恢復(fù)之后,有兩種情況,如果這個時候leader副本沒有掛,也就是意味著沒有發(fā)生leader選舉,那么follower恢復(fù)之后并不會去截斷自己的日志,而是先發(fā)送 一個OsetsForLeaderEpochRequest

15、請求給到leader副本,leader副本收到請求之后返回當(dāng)前的LEO。如果follower副本的leaderEpoch和leader副本的epoch相同, leader的leo只可能大于或者等于follower副本的leo值,所以這個時候不會發(fā)生截斷如果follower副本和leader副本的epoch值不同,那么leader副本會查找follower副本傳過來的epoch+1在本地文件中存儲的StartOset返回給follower副本,也就是新leader副本的LEO。這樣也避免了數(shù)據(jù)丟失的問題如果leader副本宕機(jī)了重新選舉新的leader,那么原本的follower副本就會變成le

16、ader,意味著epoch 從0變成1,使得原本follower副本中LEO的值的到了保留。Leader副本的選舉過程1. KafkaController會監(jiān)聽ZooKeeper的/brokers/ids節(jié)點路徑,一旦發(fā)現(xiàn)有broker掛了,執(zhí)行下面的邏輯。這里暫時先不考慮KafkaController所在broker掛了的情況,KafkaController掛了,各個broker會重新leader選舉出新的KafkaController2. leader副本在該broker上的分區(qū)就要重新進(jìn)行l(wèi)eader選舉,目前的選舉策略是a) 優(yōu)先從isr列表中選出第一個作為leader副本,這個叫優(yōu)先

17、副本,理想情況下有限副本就是該分區(qū)的leader副本b) 如果isr列表為空,則查看該topic的unclean.leader.election.enable配置。unclean.leader.election.enable:為true則代表允許選用非isr列表的副本作為leader,那么此 時就意味著數(shù)據(jù)可能丟失,為log/topic/leader-epoch-checkpointfalse的話,則表示不允許,直接拋出NoReplicaOnlineException異常,造成leader副本選舉失敗。c) 如果上述配置為true,則從其他副本中選出一個作為leader副本,并且isr列表只包

18、含該leader 副本。一旦選舉成功,則將選舉后的leader和isr和其他副本信息寫入到該分區(qū)的對應(yīng)的zk路徑上。消息的存儲消息發(fā)送端發(fā)送消息到broker上以后,消息是如何持久化的呢?那么接下來去分析下消息的存儲首先我們需要了解的是,kafka是使用日志文件的方式來保存生產(chǎn)者和發(fā)送者的消息,每條消息都有一 個oset值來表示它在分區(qū)中的偏移量。Kafka中存儲的一般都是海量的消息數(shù)據(jù),為了避免日志文件過大,Log并不是直接對應(yīng)在一個磁盤上的日志文件,而是對應(yīng)磁盤上的一個目錄,這個目錄的命名規(guī)則 是_消息的文件存儲機(jī)制一個topic的多個partition在物理磁盤上的保存路徑,路徑保存在

19、/tmp/kafka-logs/topic_partition,包含日志文件、索引文件和時間索引文件kafka是通過分段的方式將Log分為多個LogSegment,LogSegment是一個邏輯上的概念,一個LogSegment對應(yīng)磁盤上的一個日志文件和一個索引文件,其中日志文件是用來記錄消息的。索引文件是用來保存消息的索引。那么這個LogSegment是什么呢?LogSegment假設(shè)kafka以partition為最小存儲單位,那么我們可以想象當(dāng)kafka producer不斷發(fā)送消息,必然會引起partition文件的無線擴(kuò)張,這樣對于消息文件的維護(hù)以及被消費的消息的清理帶來非常大的挑戰(zhàn)

20、,所 以kafka 以segment為單位又把partition進(jìn)行細(xì)分。每個partition相當(dāng)于一個巨型文件被平均分配到多個大小相等的segment數(shù)據(jù)文件中(每個segment文件中的消息不一定相等),這種特性方便已經(jīng)被消 費的消息的清理,提高磁盤的利用率。log.segment.bytes=107370 (設(shè)置分段大小),默認(rèn)是1gb,我們把這個值調(diào)小以后,可以看到日志分段的效果抽取其中3個分段來進(jìn)行分析segment le由2大部分組成,分別為index le和data le,此2個文件一一對應(yīng),成對出現(xiàn),后綴.index和“.log”分別表示為segment索引文件、數(shù)據(jù)文件.s

21、egment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的oset值進(jìn)行遞增。數(shù)值最大為64位long大小,20位數(shù)字字符長度,沒有數(shù)字用0填充查看segment文件命名規(guī)則通過下面這條命令可以看到kafka消息日志的內(nèi)容sh kafka-run-class.sh kafka.tools.DumpLogSegments -files /tmp/kafka-logs/test- 0/00000000000000000000.log -print-data-log假如第一個log文件的最后一個oset為:5376

22、,所以下一個segment的文件命名為: 00000000000000005376.log。對應(yīng)的index為00000000000000005376.indexsegment中index和log的對應(yīng)從所有分段中,找一個分段進(jìn)行分析為了提高查找消息的性能,為每一個日志文件添加2個索引索引文件:OsetIndex 和 TimeIndex,分別對應(yīng).index以及.timeindex, TimeIndex索引文件格式:它是 時間戳和相對oset查看索引內(nèi)容:sh kafka-run-class.sh kafka.tools.DumpLogSegments -files /tmp/kafka-lo

23、gs/test- 0/00000000000000000000.index -print-data-log,index中 了索引以及物理偏移量。 log 了消息的內(nèi)容。索引文件的元數(shù)據(jù)執(zhí)行對應(yīng)數(shù)據(jù)文件中message的物理偏移地址。舉個簡單的案例來說,以4053,80899為例,在log文件中,對應(yīng)的是第4053條 ,物理偏移量(position)為80899. position是ByteBuer的指針位置在partition中如何通過oset查找message查找的算法是1. 根據(jù)oset的值,查找segment段中的index索引文件。由于索引文件命名是以上一個文件的最后一個oset進(jìn)行命

24、名的,所以,使用二分查找算法能夠根據(jù)oset快速定位到指定的索引文件。2. 找到索引文件后,根據(jù)oset進(jìn)行定位,找到索引文件中的符合范圍的索引。(kafka采用稀疏索引的方式來提高查找性能)3. 得到position以后,再到對應(yīng)的log文件中,從position出開始查找oset對應(yīng)的消息,將每條消息的oset與目標(biāo)oset進(jìn)行比較,直到找到消息比如說,我們要查找oset=2490這條消息,那么先找到00000000000000000000.index, 然后找到2487,49111這個索引,再到log文件中,根據(jù)49111這個position開始查找,比較每條消息的oset是否大于等于2

25、490。最后查找到對應(yīng)的消息以后返回Log文件的消息內(nèi)容分析前面我們通過kafka提供 令,可以查看二進(jìn)制的日志文件 ,一條消息,會包含很多的字段。offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize:-1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTran ional: false headerKeys: payload: message_5371ose

26、t和position這兩個前面已經(jīng)講過了、 createTime表示創(chuàng)建時間、keysize和valuesize表示key和value的大小、 compresscodec表示壓縮編碼、payload:表示消息的具體內(nèi)容日志的清除策略以及壓縮策略日志清除策略前面提到過,日志的分段。日志的 策略有兩個1. 根據(jù)消息的保留時間,當(dāng)消息在kafka中保存的時間超過了指定的時間,就會觸發(fā) 過程2. 根據(jù)topic 的數(shù)據(jù)大小,當(dāng)topic所占的日志文件大小大于一定的閥值,則可以開始刪除最舊的消息。kafka會啟動一個 線程,定期檢查是否 可以刪除的消息通過log.retention.bytes和log.

27、retention.hours這兩個參數(shù)來設(shè)置,當(dāng)其中任意一個達(dá)到要求,都會執(zhí)行刪除。默認(rèn)的保留時間是:7天日志壓縮策略Kafka還提供了“日志壓縮(Log Compaction)”功能,通過這個功能可以有效的減少日志文件的大小, 緩解磁盤緊張的情況,在很多實際場景中,消息的key和value的值之間的對應(yīng) 是不斷變化的,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改一樣,消費者只關(guān)心key對應(yīng)的最新的value。因此,我們可以開啟kafka 的日志壓縮功能,服務(wù)端會在 啟動啟動Cleaner線程池,定期將相同的key進(jìn)行合并,只保留最新的value值。日志的壓縮原理是磁盤 的性能問題磁盤 的性能優(yōu)化我們現(xiàn)在

28、大部分企業(yè)仍然用的是機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對應(yīng)的柱面、磁頭以及對應(yīng)的扇區(qū);這個過程相對內(nèi)存來說會消耗大量時間,為了規(guī)避隨機(jī)讀寫帶來的時間消耗,kafka采用順序?qū)懙姆绞?數(shù)據(jù)。即使是這樣,但是頻繁的I/O操作仍然會造成磁盤的性能瓶頸零拷貝消息從發(fā)送到落地保存,broker維護(hù)的消息日志本身就是文件目錄,每個文件都是二進(jìn)制保存,生產(chǎn)者和消費者使用相同的格式來處理。在消費者獲取消息時,服務(wù)器先從硬盤 數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動的通過socket發(fā)送給消費者。雖然這個操作描述起來很簡單

29、,但實際上經(jīng)歷了很多步驟。操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存,一方面能夠減少單個文件內(nèi)容的大小,另一方面,方便kafka進(jìn)行日志 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū) 到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出通過“零拷貝”技術(shù),可以去掉這些沒必要的數(shù)據(jù) 操作,同時也會減少上下文切換次數(shù)?,F(xiàn)代的unix 操作系統(tǒng)提供一個優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)絪ocket;在Linux中,是通過sendle系統(tǒng)調(diào)用來完成的。Java提供了 這個系統(tǒng)調(diào)用的 :FileChannel.transferTo

30、 API使用sendle,只需要一次拷貝就行, 操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個優(yōu)化的路徑中,只有最 數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的頁緩存頁緩存是操作系統(tǒng)實現(xiàn)的一種主要的磁盤緩存,但凡設(shè)計到緩存的,基本都是為了提升i/o性能,所以頁 緩存是用來減少磁盤I/O操作的。磁盤高速緩存有兩個重要因素:第一,磁盤的速度要遠(yuǎn)低于內(nèi)存的速度,若從處理器L1和L2高速緩存則速度更快。第二,數(shù)據(jù)一旦被,就很有可能短時間內(nèi)再次。正是由于基于內(nèi)存比磁盤快的多,所以磁盤的內(nèi)存緩存將給系統(tǒng)性能帶來質(zhì)的飛越。當(dāng) 一 個進(jìn)程準(zhǔn)備磁盤上的文件內(nèi)容時, 操作系統(tǒng)會先查的數(shù)據(jù)所在的頁(page)是否在頁緩存(pagecache)中,如果(命中)則直接返回數(shù)據(jù), 從而避免了對物理磁盤的I/0操作;如果沒有命中, 則操作系統(tǒng)會向磁盤發(fā)起請求并將的數(shù)據(jù)頁存入頁緩存, 之后再將數(shù)據(jù)返回給進(jìn)程。同樣,如果 一 個進(jìn)程需要將數(shù)據(jù)寫入磁盤, 那么操作系統(tǒng)也會檢測數(shù)據(jù)對應(yīng)的頁是否在頁緩存中,如果不, 則會先在頁緩存中添加相應(yīng)

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論