乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      dubbo 源碼學習筆記 (三)

       liang1234_ 2019-05-15

      歡迎訪問我的個人博客休息的風

      與dubbo服務(wù)發(fā)布相對的,是引用服務(wù)進行調(diào)用的過程,這個很多步驟都是與服務(wù)發(fā)布相對的,但是也有特有的地方,比如,負載均衡 ,集群容錯等。這篇博客,我們主要關(guān)注dubbo服務(wù)調(diào)用的一個核心過程。

      dubbo服務(wù)調(diào)用的主要過程:將調(diào)用信息注冊到zk上-> 通知RegistryDirectory刷新可用服務(wù)列表->刷新過程中,新服務(wù)會與netty服務(wù)端建立連接,并封裝到DubboInvoker中。-> 選擇失敗策略通過負載均衡算法,選擇服務(wù)端具體哪個服務(wù)去執(zhí)行 -> 通過netty返回執(zhí)行結(jié)果。

      Dubbo服務(wù)調(diào)用過程圖如下:看不清,請點擊新的頁簽進行查看)  


      在ReferenceConfig.init的方法里,會把配置信息封閉成一個map,然后去構(gòu)建一個代理類。在構(gòu)建這個代理類,會用DubboProtocol獲取DubboInvoker。

      private T createProxy(Map<String, String> map) {
          // 省略一些代碼。。。
          // 通過注冊中心配置拼裝URL
                  List<URL> us = loadRegistries(false);
                  if (us != null && us.size() > 0) {
                      for (URL u : us) {
                          URL monitorUrl = loadMonitor(u);
                          if (monitorUrl != null) {
                              map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                          }
                          urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                      }
                  }
                  if (urls == null || urls.size() == 0) {
                      throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                  }
              }
      
              if (urls.size() == 1) {
                  //會調(diào)用RegistryProtocol.refer
                  invoker = refprotocol.refer(interfaceClass, urls.get(0));
              } else {
                  List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                 //省略很多代碼
          // 創(chuàng)建服務(wù)代理,使用JavassistProxyFactory去創(chuàng)建
          return (T) proxyFactory.getProxy(invoker);
      }

      在RegistryProtocol里,會創(chuàng)建一個RegistryDirectory,這個對象保存的調(diào)用服務(wù)的信息,并且也是一個監(jiān)聽。

      private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
          RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
          directory.setRegistry(registry);
          directory.setProtocol(protocol);
          URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
          if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                  && url.getParameter(Constants.REGISTER_KEY, true)) {
              //調(diào)用zk去真實的注冊
              registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                      Constants.CHECK_KEY, String.valueOf(false)));
          }
          //最終也是調(diào)用zk去訂閱監(jiān)聽,監(jiān)聽器是RegistryDirectory
          directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                  Constants.PROVIDERS_CATEGORY
                          + "," + Constants.CONFIGURATORS_CATEGORY
                          + "," + Constants.ROUTERS_CATEGORY));
          return cluster.join(directory);
      }

      在真正注冊到zk上,注冊監(jiān)聽器到zk相應(yīng)路徑上,會調(diào)用RegistryDirectory的notify通知方法,去獲取可用的服務(wù)列表。

      public synchronized void notify(List<URL> urls) {
          List<URL> invokerUrls = new ArrayList<URL>();
          List<URL> routerUrls = new ArrayList<URL>();
          List<URL> configuratorUrls = new ArrayList<URL>();
         //初始化值 
          //省略代碼
          // configurators
          if (configuratorUrls != null && configuratorUrls.size() > 0) {
              this.configurators = toConfigurators(configuratorUrls);
          }
          // routers
          if (routerUrls != null && routerUrls.size() > 0) {
              List<Router> routers = toRouters(routerUrls);
              if (routers != null) { // null - do nothing
                  setRouters(routers);
              }
          }
          List<Configurator> localConfigurators = this.configurators; // local reference
          // 合并override參數(shù)
          this.overrideDirectoryUrl = directoryUrl;
          if (localConfigurators != null && localConfigurators.size() > 0) {
              for (Configurator configurator : localConfigurators) {
                  this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
              }
          }
          // providers  刷新服務(wù)列表
          refreshInvoker(invokerUrls);
      }
      其中調(diào)用refershInvoker方法時,會去調(diào)用toInvokers把URl列表轉(zhuǎn)換為Invoker列表。
      private void refreshInvoker(List<URL> invokerUrls) {
          if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                  && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
              this.forbidden = true; // 禁止訪問
              this.methodInvokerMap = null; // 置空列表
              destroyAllInvokers(); // 關(guān)閉所有Invoker
          } else {
              this.forbidden = false; // 允許訪問
              Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
              if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
                  invokerUrls.addAll(this.cachedInvokerUrls);
              } else {
                  this.cachedInvokerUrls = new HashSet<URL>();
                  this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表,便于交叉對比
              }
              if (invokerUrls.size() == 0) {
                  return;
              }
              Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 將URL列表轉(zhuǎn)成Invoker列表
              Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表
              //省略一些代碼
              。。。。。。。
          }
      }
      在將這些url轉(zhuǎn)換為invoker時,會使用DubboProtocol去創(chuàng)建與netty服務(wù)端的連接。
      private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
          Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
          if (urls == null || urls.size() == 0) {
              return newUrlInvokerMap;
          }
          Set<String> keys = new HashSet<String>();
          String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
          for (URL providerUrl : urls) {
             //封裝url信息
              //省略一些代碼
              // 緩存key為沒有合并消費端參數(shù)的URL,不管消費端如何合并參數(shù),如果服務(wù)端URL發(fā)生變化,則重新refer
              Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
              Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
              if (invoker == null) { // 緩存中沒有,重新refer
                  try {
                      boolean enabled = true;
                      if (url.hasParameter(Constants.DISABLED_KEY)) {
                          enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                      } else {
                          enabled = url.getParameter(Constants.ENABLED_KEY, true);
                      }
                      if (enabled) {
                          //服務(wù)可以就使用DubboProtocol去創(chuàng)建連接,封裝DubboInvoker
                          invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                      }
      在DubboProtocol.refer里,會去創(chuàng)建netty客戶端連接。
      public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
          // create rpc invoker.
          DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
          invokers.add(invoker);
          return invoker;
      }
      最后是返回一個DubboInvoker對象,在這之前會調(diào)用getClients先去獲取或創(chuàng)建客戶端連接。是共享連接就獲取之前的加接,不是的話就創(chuàng)建新的連接。
      private ExchangeClient initClient(URL url) {
      
          //設(shè)置一些參數(shù),比如心跳機制等
          // client type setting.
          String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
      
          String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
          boolean compatible = (version != null && version.startsWith("1.0."));
          url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
          //默認開啟heartbeat
          url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
      
         //省略一些代碼
      
          ExchangeClient client;
          try {
              //設(shè)置連接應(yīng)該是lazy的 
              if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                  client = new LazyConnectExchangeClient(url, requestHandler);
              } else {
                  //跟服務(wù)發(fā)布相對,具體調(diào)用HeaderExchanger.connect
                  client = Exchangers.connect(url, requestHandler);
              }
      在HeaderExchanger里,也是通過具體的NettyTransports連接去創(chuàng)建一個NettyClient,在doOpen方法里,創(chuàng)建netty連接,熟悉netty的應(yīng)該對這段代碼不陌生
      protected void doOpen() throws Throwable {
          NettyHelper.setNettyLoggerFactory();
          bootstrap = new ClientBootstrap(channelFactory);
          // config
          // @see org.jboss.netty.channel.socket.SocketChannelConfig
          bootstrap.setOption("keepAlive", true);
          bootstrap.setOption("tcpNoDelay", true);
          bootstrap.setOption("connectTimeoutMillis", getTimeout());
          final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
          bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
              public ChannelPipeline getPipeline() {
                  //使用適配,用DubboCodec去編碼解碼
                  NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                  ChannelPipeline pipeline = Channels.pipeline();
                  pipeline.addLast("decoder", adapter.getDecoder());
                  pipeline.addLast("encoder", adapter.getEncoder());
                  //工作線程的處理handler
                  pipeline.addLast("handler", nettyHandler);
                  return pipeline;
              }
          });
      }
      在RegistryDirectory中,緩存了可用的服務(wù)列表Invoker,之后具體使用哪個服務(wù)去調(diào)用,就看選擇的負載均衡策略了。在RegistryProtocol.doRefer里,會去執(zhí)行“cluster.join(directory)”,這里的cluster看是使用哪種失敗策略。默認會調(diào)用FailoverCluster.join。在FailoverClusterInvoker.doInvoke里,會去選擇具體要調(diào)用哪參機器上的哪個服務(wù)。
      public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
          List<Invoker<T>> copyinvokers = invokers;
          checkInvokers(copyinvokers, invocation);
          int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
          if (len <= 0) {
              len = 1;
          }
          // retry loop.
          RpcException le = null; // last exception.
          List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
          Set<String> providers = new HashSet<String>(len);
          for (int i = 0; i < len; i++) {
              //重試時,進行重新選擇,避免重試時invoker列表已發(fā)生變化.
              //注意:如果列表發(fā)生了變化,那么invoked判斷會失效,因為invoker示例已經(jīng)改變
              if (i > 0) {
                  checkWhetherDestroyed();
                  copyinvokers = list(invocation);
                  //重新檢查一下
                  checkInvokers(copyinvokers, invocation);
              }
              //使用loadbalance負載均衡選擇哪個服務(wù)invoker去調(diào)用
              Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
              invoked.add(invoker);
              RpcContext.getContext().setInvokers((List) invoked);
              try {
                  //調(diào)用執(zhí)行調(diào)用鏈
                  Result result = invoker.invoke(invocation);
                  
                  //省略一些代碼。。。。
      最后真正調(diào)用服務(wù)端的DubboProtocol.requestHandler.reply去處理,
      private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
      
          public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
              if (message instanceof Invocation) {
                  Invocation inv = (Invocation) message;
                  //從DubboExporter里獲取Invoker
                  Invoker<?> invoker = getInvoker(channel, inv);
                  //如果是callback 需要處理高版本調(diào)用低版本的問題
                  if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                     //callback的處理,省略。。。。
                  }
                  RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                  //反射調(diào)用,真正客戶端方法調(diào)用的地方
                  return invoker.invoke(inv);
              }

      處理結(jié)果再通過netty返回到客戶端resutlt中。

      到此,一個服務(wù)調(diào)用過程就基本完成了。與之前的dubbo 源碼學習筆記 (二) —— dubbo發(fā)布服務(wù)的過程相對應(yīng),形成dubbo服務(wù)發(fā)布調(diào)用的整個過程。這里總結(jié)一下整體過程:


      dubbo工作原理分為服務(wù)發(fā)布和服務(wù)引用兩個核心過程。

      1、服務(wù)發(fā)布的時候,DubboProtocol將調(diào)用鏈封裝為DubboExporter,放入到netty服務(wù)端工作線程池中

      2、URL配置信息注冊到zk注冊中心,并注冊override監(jiān)聽,觸發(fā)訂閱。

      3、服務(wù)引用時,也將服務(wù)引用的信息封裝成URL并注冊到zk注冊中心,同時監(jiān)聽category、providers、configurators、routers這四個目錄節(jié)點(才能感知服務(wù)上下線的變化)。將這些信息包裝成DubboInvoker客戶端的調(diào)用鏈,返回代理。

      4、客戶端使用代理進行調(diào)用時,經(jīng)過負載均衡,選擇其中一個服務(wù)的URL,根據(jù)URl信息與netty建立連接,發(fā)送Invocation到netty服務(wù)端;服務(wù)端在工作線程池中找一個線程,處理Invocation,并把RpcResult結(jié)果返回給客戶端;客戶端接收解析RpcResult,獲取處理結(jié)果。

      這樣整個過程就基本結(jié)束。理解這一整個過程,也就相當于了解了dubbo整體的原理。也就能從大的方向把握這一框架,當然,dubbo框架還有很多值得學習研究的地方。在之后的博客中會繼續(xù)分析。

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多