乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Kafka詳解五、Kafka Consumer的底層API

       codingparty 2016-03-18
      Kafka提供了兩套API給Consumer
      1. The high-level Consumer API
      2. The SimpleConsumer API     

      第一種高度抽象的Consumer API,它使用起來(lái)簡(jiǎn)單、方便,但是對(duì)于某些特殊的需求我們可能要用到第二種更底層的API,那么先介紹下第二種API能夠幫助我們做哪些事情

      • 一個(gè)消息讀取多次
      • 在一個(gè)處理過(guò)程中只消費(fèi)Partition其中的一部分消息
      • 添加事務(wù)管理機(jī)制以保證消息被處理且僅被處理一次

      使用SimpleConsumer有哪些弊端呢?

      • 必須在程序中跟蹤offset值
      • 必須找出指定Topic Partition中的lead broker
      • 必須處理broker的變動(dòng)

      使用SimpleConsumer的步驟

      1. 從所有活躍的broker中找出哪個(gè)是指定Topic Partition中的leader broker
      2. 找出指定Topic Partition中的所有備份broker
      3. 構(gòu)造請(qǐng)求
      4. 發(fā)送請(qǐng)求查詢(xún)數(shù)據(jù)
      5. 處理leader broker變更
      代碼實(shí)例:
      package bonree.consumer;
      
      import java.nio.ByteBuffer;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      
      import kafka.api.FetchRequest;
      import kafka.api.FetchRequestBuilder;
      import kafka.api.PartitionOffsetRequestInfo;
      import kafka.common.ErrorMapping;
      import kafka.common.TopicAndPartition;
      import kafka.javaapi.FetchResponse;
      import kafka.javaapi.OffsetResponse;
      import kafka.javaapi.PartitionMetadata;
      import kafka.javaapi.TopicMetadata;
      import kafka.javaapi.TopicMetadataRequest;
      import kafka.javaapi.consumer.SimpleConsumer;
      import kafka.message.MessageAndOffset;
      
      public class SimpleExample {
        private List<String> m_replicaBrokers = new ArrayList<String>();
      
        public SimpleExample() {
          m_replicaBrokers = new ArrayList<String>();
        }
      
        public static void main(String args[]) {
          SimpleExample example = new SimpleExample();
          // 最大讀取消息數(shù)量
          long maxReads = Long.parseLong("3");
          // 要訂閱的topic
          String topic = "mytopic";
          // 要查找的分區(qū)
          int partition = Integer.parseInt("0");
          // broker節(jié)點(diǎn)的ip
          List<String> seeds = new ArrayList<String>();
          seeds.add("192.168.4.30");
          seeds.add("192.168.4.31");
          seeds.add("192.168.4.32");
          // 端口
          int port = Integer.parseInt("9092");
          try {
            example.run(maxReads, topic, partition, seeds, port);
          } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
          }
        }
      
        public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
          // 獲取指定Topic partition的元數(shù)據(jù)
          PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
          if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
          }
          if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
          }
          String leadBroker = metadata.leader().host();
          String clientName = "Client_" + a_topic + "_" + a_partition;
      
          SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
          long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
          int numErrors = 0;
          while (a_maxReads > 0) {
            if (consumer == null) {
              consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);
      
            if (fetchResponse.hasError()) {
              numErrors++;
              // Something went wrong!
              short code = fetchResponse.errorCode(a_topic, a_partition);
              System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
              if (numErrors > 5)
                break;
              if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                // We asked for an invalid offset. For simple case ask for
                // the last element to reset
                readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                continue;
              }
              consumer.close();
              consumer = null;
              leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
              continue;
            }
            numErrors = 0;
      
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
              long currentOffset = messageAndOffset.offset();
              if (currentOffset < readOffset) {
                System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                continue;
              }
      
              readOffset = messageAndOffset.nextOffset();
              ByteBuffer payload = messageAndOffset.message().payload();
      
              byte[] bytes = new byte[payload.limit()];
              payload.get(bytes);
              System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
              numRead++;
              a_maxReads--;
            }
      
            if (numRead == 0) {
              try {
                Thread.sleep(1000);
              } catch (InterruptedException ie) {
              }
            }
          }
          if (consumer != null)
            consumer.close();
        }
      
        public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
          TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
          Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
          requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
          kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
          OffsetResponse response = consumer.getOffsetsBefore(request);
      
          if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
          }
          long[] offsets = response.offsets(topic, partition);
          return offsets[0];
        }
      
        /**
         * @param a_oldLeader
         * @param a_topic
         * @param a_partition
         * @param a_port
         * @return String
         * @throws Exception
         *             找一個(gè)leader broker
         */
        private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
          for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
              goToSleep = true;
            } else if (metadata.leader() == null) {
              goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
              // first time through if the leader hasn't changed give
              // ZooKeeper a second to recover
              // second time, assume the broker did recover before failover,
              // or it was a non-Broker issue
              //
              goToSleep = true;
            } else {
              return metadata.leader().host();
            }
            if (goToSleep) {
              try {
                Thread.sleep(1000);
              } catch (InterruptedException ie) {
              }
            }
          }
          System.out.println("Unable to find new leader after Broker failure. Exiting");
          throw new Exception("Unable to find new leader after Broker failure. Exiting");
        }
      
        private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
          PartitionMetadata returnMetaData = null;
          loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
              consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
              List<String> topics = Collections.singletonList(a_topic);
              TopicMetadataRequest req = new TopicMetadataRequest(topics);
              kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
      
              List<TopicMetadata> metaData = resp.topicsMetadata();
              for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                  if (part.partitionId() == a_partition) {
                    returnMetaData = part;
                    break loop;
                  }
                }
              }
            } catch (Exception e) {
              System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
              if (consumer != null)
                consumer.close();
            }
          }
          if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
              m_replicaBrokers.add(replica.host());
            }
          }
          return returnMetaData;
        }
      }

        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶(hù) 評(píng)論公約

        類(lèi)似文章 更多