技術(shù)學(xué)習(xí)視頻教程網(wǎng)盤資源整理 https://pan.baidu.com/s/13dbR69NLIEyP1tQyRTl4xwkafka如何保證不丟消息ps:這篇文章自我感覺說的很大白話了 !希望你們看過了之后能有收獲。 生產(chǎn)者丟失消息的情況生產(chǎn)者(Producer) 調(diào)用 所以,我們不能默認(rèn)在調(diào)用 SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get(); 但是一般不推薦這么做!可以采用為其添加回調(diào)函數(shù)的形式,示例代碼如下:
如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送即可! 另外這里推薦為 Producer 的 消費(fèi)者丟失消息的情況我們知道消息在被追加到 Partition(分區(qū))的時(shí)候都會分配一個(gè)特定的偏移量(offset)。偏移量(offset)表示 Consumer 當(dāng)前消費(fèi)到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內(nèi)的順序性。 當(dāng)消費(fèi)者拉取到了分區(qū)的某個(gè)消息之后,消費(fèi)者會自動提交了 offset。自動提交的話會有一個(gè)問題,試想一下,當(dāng)消費(fèi)者剛拿到這個(gè)消息準(zhǔn)備進(jìn)行真正消費(fèi)的時(shí)候,突然掛掉了,消息實(shí)際上并沒有被消費(fèi),但是 offset 卻被自動提交了。 解決辦法也比較粗暴,我們手動關(guān)閉自動提交 offset,每次在真正消費(fèi)完消息之后之后再自己手動提交 offset 。 但是,細(xì)心的朋友一定會發(fā)現(xiàn),這樣會帶來消息被重新消費(fèi)的問題。比如你剛剛消費(fèi)完消息之后,還沒提交 offset,結(jié)果自己掛掉了,那么這個(gè)消息理論上就會被消費(fèi)兩次。 Kafka 弄丟了消息我們知道 Kafka 為分區(qū)(Partition)引入了多副本(Replica)機(jī)制。分區(qū)(Partition)中的多個(gè)副本之間會有一個(gè)叫做 leader 的家伙,其他副本稱為 follower。我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步。生產(chǎn)者和消費(fèi)者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲的安全性。 試想一種情況:假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個(gè) leader ,但是 leader 的數(shù)據(jù)還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。 設(shè)置 acks = all解決辦法就是我們設(shè)置 acks = all。acks 是 Kafka 生產(chǎn)者(Producer) 很重要的一個(gè)參數(shù)。 acks 的默認(rèn)值即為1,代表我們的消息被leader副本接收之后就算被成功發(fā)送。當(dāng)我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。 設(shè)置 replication.factor >= 3為了保證 leader 副本能有 follower 副本能同步消息,我們一般會為 topic 設(shè)置 replication.factor >= 3。這樣就可以保證每個(gè) 分區(qū)(partition) 至少有 3 個(gè)副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。 設(shè)置 min.insync.replicas > 1一般情況下我們還需要設(shè)置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個(gè)副本才算是被成功發(fā)送。min.insync.replicas 的默認(rèn)值為 1 ,在實(shí)際生產(chǎn)中應(yīng)盡量避免默認(rèn)值 1。 但是,為了保證整個(gè) Kafka 服務(wù)的高可用性,你需要確保 replication.factor > min.insync.replicas 。為什么呢?設(shè)想一下加入兩者相等的話,只要是有一個(gè)副本掛掉,整個(gè)分區(qū)就無法正常工作了。這明顯違反高可用性!一般推薦設(shè)置成 replication.factor = min.insync.replicas + 1。 設(shè)置 unclean.leader.election.enable = false
我們最開始也說了我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步。多個(gè) follower 副本之間的消息同步情況不一樣,當(dāng)我們配置了 unclean.leader.election.enable = false 的話,當(dāng) leader 副本發(fā)生故障時(shí)就不會從 follower 副本中和 leader 同步程度達(dá)不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。 |
|