乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      如何保證Kafka不丟失消息?

       沙門空海 2020-03-16

      技術(shù)學(xué)習(xí)視頻教程網(wǎng)盤資源整理 https://pan.baidu.com/s/13dbR69NLIEyP1tQyRTl4xw


      kafka如何保證不丟消息

      ps:這篇文章自我感覺說的很大白話了 !希望你們看過了之后能有收獲。

      生產(chǎn)者丟失消息的情況

      生產(chǎn)者(Producer) 調(diào)用send方法發(fā)送消息之后,消息可能因?yàn)榫W(wǎng)絡(luò)問題并沒有發(fā)送過去。

      所以,我們不能默認(rèn)在調(diào)用send方法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,我們要判斷消息發(fā)送的結(jié)果。但是要注意的是  Kafka 生產(chǎn)者(Producer) 使用  send 方法發(fā)送消息實(shí)際上是異步的操作,我們可以通過 get()方法獲取調(diào)用結(jié)果,但是這樣也讓它變?yōu)榱送讲僮?,示例代碼如下:

      SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
      if (sendResult.getRecordMetadata() != null) {
      logger.info('生產(chǎn)者成功發(fā)送消息到' + sendResult.getProducerRecord().topic() + '-> ' + sendRe
      sult.getProducerRecord().value().toString());
      }

      但是一般不推薦這么做!可以采用為其添加回調(diào)函數(shù)的形式,示例代碼如下:

      ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
      future.addCallback(result -> logger.info('生產(chǎn)者成功發(fā)送消息到topic:{} partition:{}的消息',result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex -> logger.error('生產(chǎn)者發(fā)送消失敗,原因:{}', ex.getMessage()));

      如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送即可!

      另外這里推薦為 Producer 的retries(重試次數(shù))設(shè)置一個(gè)比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會設(shè)置比較大一點(diǎn)。設(shè)置完成之后,當(dāng)出現(xiàn)網(wǎng)絡(luò)問題之后能夠自動重試消息發(fā)送,避免消息丟失。另外,建議還要設(shè)置重試間隔,因?yàn)殚g隔太小的話重試的效果就不明顯了,網(wǎng)絡(luò)波動一次你3次一下子就重試完了

      消費(fèi)者丟失消息的情況

      我們知道消息在被追加到 Partition(分區(qū))的時(shí)候都會分配一個(gè)特定的偏移量(offset)。偏移量(offset)表示 Consumer 當(dāng)前消費(fèi)到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內(nèi)的順序性。

      kafka offset

      當(dāng)消費(fèi)者拉取到了分區(qū)的某個(gè)消息之后,消費(fèi)者會自動提交了 offset。自動提交的話會有一個(gè)問題,試想一下,當(dāng)消費(fèi)者剛拿到這個(gè)消息準(zhǔn)備進(jìn)行真正消費(fèi)的時(shí)候,突然掛掉了,消息實(shí)際上并沒有被消費(fèi),但是 offset 卻被自動提交了。

      解決辦法也比較粗暴,我們手動關(guān)閉自動提交 offset,每次在真正消費(fèi)完消息之后之后再自己手動提交 offset 。 但是,細(xì)心的朋友一定會發(fā)現(xiàn),這樣會帶來消息被重新消費(fèi)的問題。比如你剛剛消費(fèi)完消息之后,還沒提交 offset,結(jié)果自己掛掉了,那么這個(gè)消息理論上就會被消費(fèi)兩次。

      Kafka 弄丟了消息

      我們知道 Kafka 為分區(qū)(Partition)引入了多副本(Replica)機(jī)制。分區(qū)(Partition)中的多個(gè)副本之間會有一個(gè)叫做 leader 的家伙,其他副本稱為 follower。我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步。生產(chǎn)者和消費(fèi)者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲的安全性。

      試想一種情況:假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個(gè) leader ,但是 leader 的數(shù)據(jù)還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。

      設(shè)置 acks = all

      解決辦法就是我們設(shè)置  acks = all。acks 是 Kafka 生產(chǎn)者(Producer)  很重要的一個(gè)參數(shù)。

      acks 的默認(rèn)值即為1,代表我們的消息被leader副本接收之后就算被成功發(fā)送。當(dāng)我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。

      設(shè)置 replication.factor >= 3

      為了保證 leader 副本能有 follower 副本能同步消息,我們一般會為 topic 設(shè)置 replication.factor >= 3。這樣就可以保證每個(gè) 分區(qū)(partition) 至少有 3 個(gè)副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。

      設(shè)置 min.insync.replicas > 1

      一般情況下我們還需要設(shè)置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個(gè)副本才算是被成功發(fā)送。min.insync.replicas 的默認(rèn)值為 1 ,在實(shí)際生產(chǎn)中應(yīng)盡量避免默認(rèn)值 1。

      但是,為了保證整個(gè) Kafka 服務(wù)的高可用性,你需要確保 replication.factor > min.insync.replicas 。為什么呢?設(shè)想一下加入兩者相等的話,只要是有一個(gè)副本掛掉,整個(gè)分區(qū)就無法正常工作了。這明顯違反高可用性!一般推薦設(shè)置成 replication.factor = min.insync.replicas + 1

      設(shè)置 unclean.leader.election.enable = false

      Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數(shù)的默認(rèn)值由原來的true 改為false

      我們最開始也說了我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步。多個(gè) follower 副本之間的消息同步情況不一樣,當(dāng)我們配置了 unclean.leader.election.enable = false  的話,當(dāng) leader 副本發(fā)生故障時(shí)就不會從  follower 副本中和 leader 同步程度達(dá)不到要求的副本中選擇出  leader ,這樣降低了消息丟失的可能性。

        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多