乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      alpakka-kafka(2)-consumer

       怡紅公子0526 2021-04-29

         alpakka-kafka-consumer的功能描述很簡單:向kafka訂閱某些topic然后把讀到的消息傳給akka-streams做業(yè)務(wù)處理。在kafka-consumer的實現(xiàn)細節(jié)上,為了達到高可用、高吞吐的目的,topic又可用劃分出多個分區(qū)partition。分區(qū)是分布在kafka集群節(jié)點broker上的。由于一個topic可能有多個partition,對應(yīng)topic就會有多個consumer,形成一個consumer組,共用統(tǒng)一的groupid。一個partition只能對應(yīng)一個consumer、而一個consumer負責從多個partition甚至多個topic讀取消息。kafka會根據(jù)實際情況將某個partition分配給某個consumer,即partition-assignment。所以一般來說我們會把topic訂閱與consumer-group掛鉤。這個可以在典型的ConsumerSettings證實:

        val system = ActorSystem("kafka-sys")
        val config = system.settings.config.getConfig("akka.kafka.consumer")
        val bootstrapServers = "localhost:9092"
        val consumerSettings =
          ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
            .withBootstrapServers(bootstrapServers)
            .withGroupId("group1")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      

      我們先用一個簡單的consumer plainSource試試把前一篇示范中producer寫入kafka的消息讀出來: 

      import akka.actor.ActorSystem
      import akka.kafka._
      import akka.kafka.scaladsl._
      import akka.stream.{RestartSettings, SystemMaterializer}
      import akka.stream.scaladsl.{Keep, RestartSource, Sink}
      import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
      import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
      
      import scala.concurrent._
      import scala.concurrent.duration._
      object plain_source extends App {
        val system = ActorSystem("kafka-sys")
        val config = system.settings.config.getConfig("akka.kafka.consumer")
        implicit val mat = SystemMaterializer(system).materializer
        implicit val ec: ExecutionContext = mat.executionContext
        val bootstrapServers = "localhost:9092"
        val consumerSettings =
          ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers(bootstrapServers)
            .withGroupId("group1")
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      
        val subscription = Subscriptions.topics("greatings")
        Consumer.plainSource(consumerSettings, subscription)
          .runWith(Sink.foreach(msg => println(msg.value())))
      
        scala.io.StdIn.readLine()
        system.terminate()
      
      }
      

      以上我們沒有對讀出的消息做任何的業(yè)務(wù)處理,直接顯示出來。注意每次都會從頭完整讀出,因為設(shè)置了 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),也就是kafka-consumer的auto.offset.reset = "earliest" 。那么如果需要用讀出的數(shù)據(jù)進行業(yè)務(wù)處理的話,每次開始運行應(yīng)用時都會重復(fù)從頭執(zhí)行這些業(yè)務(wù)。所以需要某種機制來標注已經(jīng)讀取的消息,也就是需要記住當前讀取位置offset。

      Consumer.plainSource輸入ConsumerRecord類型:

          public ConsumerRecord(String topic,
                                int partition,
                                long offset,
                                K key,
                                V value) {
              this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                      NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
          }
      

      這個ConsumerRecord類型里包括了offset,用戶可以自行commit這個位置參數(shù),也就是說用戶可以選擇把這個offset存儲在kafka或者其它的數(shù)據(jù)庫里。說到commit-offset,offset管理機制在kafka-consumer業(yè)務(wù)應(yīng)用中應(yīng)該屬于關(guān)鍵技術(shù)。kafka-consumer方面的業(yè)務(wù)流程可以簡述為:從kafka讀出業(yè)務(wù)指令,執(zhí)行指令并更新業(yè)務(wù)狀態(tài),然后再從kafka里讀出下一批指令。為了實現(xiàn)業(yè)務(wù)狀態(tài)的準確性,無論錯過一些指令或者重復(fù)執(zhí)行一些指令都是不能容忍的。所以,必須準確的標記每次從kafka讀取數(shù)據(jù)后的指針位置,commit-offset。但是,如果讀出數(shù)據(jù)后即刻commit-offset,那么在執(zhí)行業(yè)務(wù)指令時如果系統(tǒng)發(fā)生異常,那么下次再從標注的位置開始讀取數(shù)據(jù)時就會越過一批業(yè)務(wù)指令。這種情況稱為at-most-once,即可能會執(zhí)行一次,但絕不會重復(fù)。另一方面:如果在成功改變業(yè)務(wù)狀態(tài)后再commit-offset,那么,一旦執(zhí)行業(yè)務(wù)指令時發(fā)生異常而無法進行commit-offset,下次讀取的位置將使用前一次的標注位置,就會出現(xiàn)重復(fù)改變業(yè)務(wù)狀態(tài)的情況,這種情況稱為at-least-once,即一定會執(zhí)行業(yè)務(wù)指令,但可能出現(xiàn)重復(fù)更新情況。如果涉及資金、庫存等業(yè)務(wù),兩者皆不可接受,只能采用exactly-once保證一次這種模式了。不過也有很多業(yè)務(wù)要求沒那么嚴格,比如某個網(wǎng)站統(tǒng)計點擊量,只需個約莫數(shù),無論at-least-once,at-most-once都可以接受。

      kafka-consumer-offset是一個Long類型的值,可以存放在kafka內(nèi)部或者外部的數(shù)據(jù)庫里。如果選擇在kafka內(nèi)部存儲offset, kafka配置里可以設(shè)定按時間間隔自動進行位置標注,自動把當前offset存入kafka里。當我們在上面例子的ConsumerSettings里設(shè)置自動commit后,多次重新運行就不會出現(xiàn)重復(fù)數(shù)據(jù)的情況了:

      val consumerSettings =
          ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers(bootstrapServers)
            .withGroupId("group1")
            .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")        //自動commit
            .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")   //自動commit間隔
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      

      alpakka-kafka提供了Committer類型,是akka-streams的Sink或Flow組件,負責把offset寫入kafka。如果用Committer的Sink或Flow就可以按用戶的需要控制commit-offset的發(fā)生時間。如下面這段示范代碼: 

       

        val committerSettings = CommitterSettings(system)
      
        val control: DrainingControl[Done] =
          Consumer
            .committableSource(consumerSettings, Subscriptions.topics("greatings"))
            .mapAsync(10) { msg =>
              BusinessLogic.runBusiness(msg.record.key, msg.record.value)
                .map(_ => msg.committableOffset)
            }
            .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
            .run()
      
      control.drainAndShutdown();
      
        scala.io.StdIn.readLine()
        system.terminate()
      
      }
      object BusinessLogic {
        def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done)
      }
      

      上面這個例子里BusinessLogic.runBusiess()模擬一段業(yè)務(wù)處理代碼,也就是說完成了業(yè)務(wù)處理之后就用Committer.sink進行了commit-offset。這是一種at-least-once模式,因為runBusiness可能會發(fā)生異常失敗,所以有可能出現(xiàn)重復(fù)運算的情況。Consumer.committableSource輸出CommittableMessage: 

      def committableSource[K, V](settings: ConsumerSettings[K, V],
                                    subscription: Subscription): Source[CommittableMessage[K, V], Control] =
          Source.fromGraph(new CommittableSource[K, V](settings, subscription))
      
      
      
        final case class CommittableMessage[K, V](
            record: ConsumerRecord[K, V],
            committableOffset: CommittableOffset
        )
      
        @DoNotInherit sealed trait CommittableOffset extends Committable {
          def partitionOffset: PartitionOffset
        }
      

      Committer.sink接受輸入Committable類型并將之寫入kafka,上游的CommittableOffset 繼承了 Committable。另外,這個DrainingControl類型結(jié)合了Control類型和akka-streams終結(jié)信號可以有效控制整個consumer-streams安全終結(jié)。

      alpakka-kafka還有一個atMostOnceSource。這個Source組件每讀一條數(shù)據(jù)就會立即自動commit-offset:

        def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                                   subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
          committableSource[K, V](settings, subscription).mapAsync(1) { m =>
            m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)
          }
      

      可以看出來,atMostOnceSource在輸出ConsumerRecord之前就進行了commit-offset。atMostOnceSource的一個具體使用示范如下:

        import scala.collection.immutable
        val control: DrainingControl[immutable.Seq[Done]] =
          Consumer
            .atMostOnceSource(consumerSettings, Subscriptions.topics("greatings"))
            .mapAsync(1)(record => BussinessLogic.runBusiness(record.key, record.value()))
            .toMat(Sink.seq)(DrainingControl.apply)
            .run()
      
        control.drainAndShutdown();
        scala.io.StdIn.readLine()
        system.terminate()
      

      所以,使用atMostOnceSource后是不需要任何Committer來進行commit-offset的了。值得注意的是atMostOnceSource是對每一條數(shù)據(jù)進行位置標注的,所以運行效率必然會受到影響,如果要求不是那么嚴格的話還是啟動自動commit比較合適。

      對于任何類型的交易業(yè)務(wù)系統(tǒng)來說,無論at-least-once或at-most-once都是不可接受的,只有exactly-once才妥當。實現(xiàn)exactly-once的其中一個方法是把offset與業(yè)務(wù)數(shù)據(jù)存放在同一個外部數(shù)據(jù)庫中。如果在外部數(shù)據(jù)庫通過事務(wù)處理機制(transaction-processing)把業(yè)務(wù)狀態(tài)更新與commit-offset放在一個事務(wù)單元中同進同退就能實現(xiàn)exactly-once模式了。下面這段是官方文檔給出的一個示范:

        val db = new mongoldb
        val control = db.loadOffset().map { fromOffset =>
          Consumer
            .plainSource(
              consumerSettings,
              Subscriptions.assignmentWithOffset(
                new TopicPartition(topic, /* partition = */ 0) -> fromOffset
              )
            )
            .mapAsync(1)(db.businessLogicAndStoreOffset)
            .toMat(Sink.seq)(DrainingControl.apply)
            .run()
        }
      
      class mongoldb {
        def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...
        def loadOffset(): Future[Long] = // ...
      }
      

      在上面這段代碼里:db.loadOffset()從mongodb里取出上一次讀取位置,返回Future[Long],然后用Subscriptions.assignmentWithOffset把這個offset放在一個tuple (TopicPartition,Long)里。TopicPartition定義如下: 

          public TopicPartition(String topic, int partition) {
              this.partition = partition;
              this.topic = topic;
          }
      

      這樣Consumer.plainSource就可以從offset開始讀取數(shù)據(jù)了。plainSource輸出ConsumerRecord類型:

          public ConsumerRecord(String topic,
                                int partition,
                                long offset,
                                K key,
                                V value) {
              this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                      NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
          }
      

      這里面除業(yè)務(wù)指令value外還提供了當前offset。這些已經(jīng)足夠在businessLogicAndStoreOffset()里運算一個單獨的business+offset事務(wù)了(transaction)。 

       

       

       

       

       

       

       

       

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多