乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      asp .net core發(fā)布訂閱kafka

       怡紅公子0526 2021-04-03

      Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統,有如下特性:

      • 通過O的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩(wěn)定性能。

      • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬 [2] 的消息。

      • 支持通過Kafka服務器和消費機集群來分區(qū)消息。

      • 支持Hadoop并行數據加載。
        Kafka通過官網發(fā)布了最新版本2.3.0

      相關術語介紹

      • Broker
        Kafka集群包含一個或多個服務器,這種服務器被稱為broker

      • Topic
        每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處)

      • Partition
        Partition是物理上的概念,每個Topic包含一個或多個Partition.

      • Producer
        負責發(fā)布消息到Kafka broker

      • Consumer
        消息消費者,向Kafka broker讀取消息的客戶端。

      • Consumer Group
        每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group)。

      在這里我們用了一個第三方庫叫Confluent.kafka,在nuget上搜索一下就出來了,感謝原作者。

      新建一個 .net core類庫項目

      安裝第三方依賴庫,如下圖所示:
      41.png

      新建一個SUPKafkaTopicConsumer類

      這是用來創(chuàng)建并初始化消費者,接下來看看這個類里面包含了什么。

      • 首先聲明一個委托,用來接收訂閱消息

      public delegate void OnReceivedHandle(object data);

      初始化消費者,構造函數中傳入kafka地址,以及要訂閱的組groupId,另外注入了log4net記錄日志信息。
      init()方法用來初始化,新建一個消費者,具體代碼如下。

       public class SUPKafkaTopicConsumer<TKey, TValue>
          {
              private IConsumer<TKey, TValue> consumer;
              private SUPLogger logger_;
              private string BootStrapServer;
              private string GroupId;
            
              public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null)
              {
                  BootStrapServer = bootStrapServer;
                  GroupId = groupId;
                  logger_ = logger;
              }
      
              public bool Init()
              {
                  try
                  {
                      var conf = new ConsumerConfig
                      {
                          GroupId = GroupId,
                          BootstrapServers = BootStrapServer,
                          AutoOffsetReset = AutoOffsetReset.Earliest,
                          EnableAutoCommit = false // 設置非自動偏移,業(yè)務邏輯完成后手動處理偏移,防止數據丟失
                      };
                      consumer = new ConsumerBuilder<TKey, TValue>(conf)
                          .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                          .Build();
      
                      return true;
                  }
                  catch (Exception ex)
                  {
                      throw;
                  }
              }
      • 定義回調事件,用以處理用戶自定義方法。

      public event OnReceivedHandle onReceivedHandle;
      • 定義一個訂閱的方法,傳入topic,以及是否需要提交偏移量。
        其實看init()方法中我把EnableAutoCommit=false,取消了自動提交,讓應用程序決定何時提交 偏移量,為什么這么做呢?
        自動提交雖然方便,但是也有一些弊端,自動提交的弊端是通過間隔時間。 一般是默認5s提交時間間隔,在最近一次提交之后的 3s發(fā)生了再均衡,再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后 了 3s,所以在這 3s 內到達的消息會被重復處理。可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重復消息的時間窗,不過這種情況是無也完全避免的 。
        大部分開發(fā)者通過控制偏移量提交時間來消除丟失消息的可能性,井在發(fā)生再均衡時減少 重復消息的數量。消費者 API提供了另一種提交偏移量的方式 , 開發(fā)者可以在必要的時候 提交當前偏移盤,而不是基于時間間隔。

      public void Subscribe(string topic, bool isCommit)
              {
                  try
                  {
                      if (consumer != null)
                      {
                          consumer.Subscribe(topic);
                          while (true)
                          {
                              var consume = consumer.Consume();
                              if (onReceivedHandle != null)
                              {
                                  onReceivedHandle(consume);
      
                                  if (isCommit)
                                  {
                                      consumer.Commit(consume);
                                  }
                              }
                          }
                      }
                  }
                  catch (Exception ex)
                  {
                      //consumer.Close();
                      throw ex;
                  }
              }
      • 取消訂閱

       public void UnSubscribe()
              {
                  if (consumer != null)
                  {
                      consumer.Unsubscribe();
                  }
              }

      新建生產者類

      • 首先定義了ISUPKafkaProducer<Tkey, TValue>接口,包含四個方法

       public interface ISUPKafkaProducer<Tkey,TValue>
          {
              ISendResult Send(Tkey key, TValue value, string topic,Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
              ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
      
              ISendResult AsyncSend(Tkey key, TValue value,string topic);
              ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition);
          }
      • 接口的實現,初始化過程類似消費者

      internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue>
          {
              private IProducer<Tkey, TValue> producer;
              private SUPLogger logger_;
              private string m_bootStrapServer;
      
              public SUPKafkaTopicProducer(string bootStrapServer,SUPLogger logger = null)
              {
                  m_bootStrapServer = bootStrapServer;
                  logger_ = logger;
              }
              public bool Init()
              {
                  try
                  {
                      var config = new ProducerConfig
                      {
                          BootstrapServers = m_bootStrapServer
                      };
                      producer = new ProducerBuilder<Tkey, TValue>(config)
                          .SetErrorHandler((producer, error) =>
                          {
                              logger_.Fatal(string.Format("Kafka Error Handler {0},ErrorCode:{2},Reason:{3}",
                                  m_bootStrapServer, error.Code, error.Reason));
                          })
                          .SetLogHandler((producer, msg) =>
                          {
                              logger_.Info(string.Format("Kafka Log Handler {0}-{1},Name:{2},Message:{3}",
                                  m_bootStrapServer, msg.Name, msg.Message));
                          })
                          .Build();
      
                      return true;
                  }
                  catch (Exception ex)
                  {
                      throw ex;
                  }
              }

      實現繼承至ISUPKafkaProducer<Tkey, TValue>的方法

       public ISendResult Send(Tkey key, TValue value,string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
              {
                  try
                  {
                      if (producer != null)
                      {
                          var message = new Message<Tkey, TValue>
                          {
                              Value = value,
                              Key = key
                          };
                          producer.Produce(topic, message, sendCallBack);
                          return new SendResult(true);
                      }
                      else
                      {
                          return new SendResult(true, "沒有初始化生產者");
                      }
                  }
                  catch (Exception ex)
                  {
                      throw ex;
                  }
              }
      
              public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
              {
                  try
                  {
                      if (producer != null)
                      {
                          var message = new Message<Tkey, TValue>
                          {
                              Value = value,
                              Key = key
                          };
                          producer.Produce(topicPartition, message, sendCallBack);
                          return new SendResult(true);
                      }
                      else
                      {
                          return new SendResult(true, "沒有初始化生產者");
                      }
                  }
                  catch (Exception ex)
                  {
                      throw ex;
                  }
              }
      
              public ISendResult AsyncSend(Tkey key, TValue value,string topic)
              {
                  try
                  {
                      if (producer != null)
                      {
                          var message = new Message<Tkey, TValue>
                          {
                              Value = value,
                              Key = key
                          };
                          var deliveryReport = producer.ProduceAsync(topic, message);
                          deliveryReport.ContinueWith(task =>
                         {
                             Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                         });
                          producer.Flush(TimeSpan.FromSeconds(10));
                          return new SendResult(true);
                      }
                      else
                      {
                          return new SendResult(true, "沒有初始化生產者");
                      }
                  }
                  catch (Exception ex)
                  {
                      throw ex;
                  }
              }
      
              public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition)
              {
                  try
                  {
                      if (producer != null)
                      {
                          var message = new Message<Tkey, TValue>
                          {
                              Value = value,
                              Key = key
                          };
      
                          var deliveryReport = producer.ProduceAsync(topicPartition, message);
                          deliveryReport.ContinueWith(task =>
                          {
                              Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topicPartition.Topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                          });
      
                          producer.Flush(TimeSpan.FromSeconds(10));
                          return new SendResult(true);
                      }
                      else
                      {
                          return new SendResult(true, "沒有初始化生產者");
                      }
                  }
                  catch (Exception ex)
                  {
                      throw ex;
                  }
              }

      新建一個SUPKafkaMessageCenter類

      這個類是對外開放的,我們利用這個類來管理生產者和消費者,看下代碼非常簡單。

      public static class SUPKafkaMessageCenter<Tkey, TValue>
          {
              private static SUPLogger logger = null;
              static SUPKafkaMessageCenter()
              {
                  SUPLoggerManager.Configure();
                  logger = new SUPLogger("KafkaCenter");
              }
              /// <summary>
              /// 創(chuàng)建生產者
              /// </summary>
              /// <param name="bootstrapServer"></param>
              /// <param name="topicName"></param>
              /// <returns></returns>
              public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer)
              {
                  if (string.IsNullOrEmpty(bootstrapServer))
                  {
                      return null;
                  }
                  var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger);
                  if (!producer.Init())
                  {
                      return null;
                  }
                  return producer;
              }
      
              /// <summary>
              /// 創(chuàng)建消費者
              /// </summary>
              /// <param name="bootstrapServer"></param>
              /// <param name="groupId"></param>
              /// <returns></returns>
              public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId= "default-consumer-group")
              {
                  if (string.IsNullOrEmpty(bootstrapServer))
                  {
                      return null;
                  }
                  var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId,logger);
                  if (!consumer.Init())
                  {
                      return null;
                  }
                  return consumer;
              }

      測試

      新建一個測試的控制臺程序,調用代碼如下

      • 消費者

      var consumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092");
                  //綁定接收信息,回調函數
                  consumer.onReceivedHandle += CallBack;
      
                  var topics = new List<string>();
                  topics.Add("kafka-default-topic");
                  topics.Add("test");
                  //訂閱主題
                  consumer.Subscribe(topics, false);
      • 生產者

      ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092");
      kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...});

      除了上面寫的這些方法,其實對于kafka還有很多功能,比如topic的增刪改查,我把它認為是管理類的,這里就不貼代碼了。

        本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發(fā)布,不代表本站觀點。請注意甄別內容中的聯系方式、誘導購買等信息,謹防詐騙。如發(fā)現有害或侵權內容,請點擊一鍵舉報。
        轉藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多