歡迎訪問我的個人博客休息的風 與dubbo服務(wù)發(fā)布相對的,是引用服務(wù)進行調(diào)用的過程,這個很多步驟都是與服務(wù)發(fā)布相對的,但是也有特有的地方,比如,負載均衡 ,集群容錯等。這篇博客,我們主要關(guān)注dubbo服務(wù)調(diào)用的一個核心過程。 dubbo服務(wù)調(diào)用的主要過程:將調(diào)用信息注冊到zk上-> 通知RegistryDirectory刷新可用服務(wù)列表->刷新過程中,新服務(wù)會與netty服務(wù)端建立連接,并封裝到DubboInvoker中。-> 選擇失敗策略通過負載均衡算法,選擇服務(wù)端具體哪個服務(wù)去執(zhí)行 -> 通過netty返回執(zhí)行結(jié)果。 Dubbo服務(wù)調(diào)用過程圖如下:(看不清,請點擊新的頁簽進行查看) 在ReferenceConfig.init的方法里,會把配置信息封閉成一個map,然后去構(gòu)建一個代理類。在構(gòu)建這個代理類,會用DubboProtocol獲取DubboInvoker。 private T createProxy(Map<String, String> map) { // 省略一些代碼。。。 // 通過注冊中心配置拼裝URL List<URL> us = loadRegistries(false); if (us != null && us.size() > 0) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls == null || urls.size() == 0) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (urls.size() == 1) { //會調(diào)用RegistryProtocol.refer invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); //省略很多代碼 // 創(chuàng)建服務(wù)代理,使用JavassistProxyFactory去創(chuàng)建 return (T) proxyFactory.getProxy(invoker); } 在RegistryProtocol里,會創(chuàng)建一個RegistryDirectory,這個對象保存的調(diào)用服務(wù)的信息,并且也是一個監(jiān)聽。 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { //調(diào)用zk去真實的注冊 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } //最終也是調(diào)用zk去訂閱監(jiān)聽,監(jiān)聽器是RegistryDirectory directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); return cluster.join(directory); } 在真正注冊到zk上,注冊監(jiān)聽器到zk相應(yīng)路徑上,會調(diào)用RegistryDirectory的notify通知方法,去獲取可用的服務(wù)列表。 public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); //初始化值 //省略代碼 // configurators if (configuratorUrls != null && configuratorUrls.size() > 0) { this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && routerUrls.size() > 0) { List<Router> routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } List<Configurator> localConfigurators = this.configurators; // local reference // 合并override參數(shù) this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && localConfigurators.size() > 0) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers 刷新服務(wù)列表 refreshInvoker(invokerUrls); }其中調(diào)用refershInvoker方法時,會去調(diào)用toInvokers把URl列表轉(zhuǎn)換為Invoker列表。 private void refreshInvoker(List<URL> invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // 禁止訪問 this.methodInvokerMap = null; // 置空列表 destroyAllInvokers(); // 關(guān)閉所有Invoker } else { this.forbidden = false; // 允許訪問 Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表,便于交叉對比 } if (invokerUrls.size() == 0) { return; } Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 將URL列表轉(zhuǎn)成Invoker列表 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表 //省略一些代碼 。。。。。。。 } }在將這些url轉(zhuǎn)換為invoker時,會使用DubboProtocol去創(chuàng)建與netty服務(wù)端的連接。 private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>(); if (urls == null || urls.size() == 0) { return newUrlInvokerMap; } Set<String> keys = new HashSet<String>(); String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { //封裝url信息 //省略一些代碼 // 緩存key為沒有合并消費端參數(shù)的URL,不管消費端如何合并參數(shù),如果服務(wù)端URL發(fā)生變化,則重新refer Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // 緩存中沒有,重新refer try { boolean enabled = true; if (url.hasParameter(Constants.DISABLED_KEY)) { enabled = !url.getParameter(Constants.DISABLED_KEY, false); } else { enabled = url.getParameter(Constants.ENABLED_KEY, true); } if (enabled) { //服務(wù)可以就使用DubboProtocol去創(chuàng)建連接,封裝DubboInvoker invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); }在DubboProtocol.refer里,會去創(chuàng)建netty客戶端連接。 public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }最后是返回一個DubboInvoker對象,在這之前會調(diào)用getClients先去獲取或創(chuàng)建客戶端連接。是共享連接就獲取之前的加接,不是的話就創(chuàng)建新的連接。 private ExchangeClient initClient(URL url) { //設(shè)置一些參數(shù),比如心跳機制等 // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible = (version != null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默認開啟heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); //省略一些代碼 ExchangeClient client; try { //設(shè)置連接應(yīng)該是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { //跟服務(wù)發(fā)布相對,具體調(diào)用HeaderExchanger.connect client = Exchangers.connect(url, requestHandler); }在HeaderExchanger里,也是通過具體的NettyTransports連接去創(chuàng)建一個NettyClient,在doOpen方法里,創(chuàng)建netty連接,熟悉netty的應(yīng)該對這段代碼不陌生 protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ClientBootstrap(channelFactory); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getTimeout()); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { //使用適配,用DubboCodec去編碼解碼 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); //工作線程的處理handler pipeline.addLast("handler", nettyHandler); return pipeline; } }); }在RegistryDirectory中,緩存了可用的服務(wù)列表Invoker,之后具體使用哪個服務(wù)去調(diào)用,就看選擇的負載均衡策略了。在RegistryProtocol.doRefer里,會去執(zhí)行“cluster.join(directory)”,這里的cluster看是使用哪種失敗策略。默認會調(diào)用FailoverCluster.join。在FailoverClusterInvoker.doInvoke里,會去選擇具體要調(diào)用哪參機器上的哪個服務(wù)。 public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重試時,進行重新選擇,避免重試時invoker列表已發(fā)生變化. //注意:如果列表發(fā)生了變化,那么invoked判斷會失效,因為invoker示例已經(jīng)改變 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); //重新檢查一下 checkInvokers(copyinvokers, invocation); } //使用loadbalance負載均衡選擇哪個服務(wù)invoker去調(diào)用 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { //調(diào)用執(zhí)行調(diào)用鏈 Result result = invoker.invoke(invocation); //省略一些代碼。。。。最后真正調(diào)用服務(wù)端的DubboProtocol.requestHandler.reply去處理, private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; //從DubboExporter里獲取Invoker Invoker<?> invoker = getInvoker(channel, inv); //如果是callback 需要處理高版本調(diào)用低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { //callback的處理,省略。。。。 } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //反射調(diào)用,真正客戶端方法調(diào)用的地方 return invoker.invoke(inv); } 處理結(jié)果再通過netty返回到客戶端resutlt中。 到此,一個服務(wù)調(diào)用過程就基本完成了。與之前的dubbo 源碼學習筆記 (二) —— dubbo發(fā)布服務(wù)的過程相對應(yīng),形成dubbo服務(wù)發(fā)布調(diào)用的整個過程。這里總結(jié)一下整體過程: dubbo工作原理分為服務(wù)發(fā)布和服務(wù)引用兩個核心過程。 1、服務(wù)發(fā)布的時候,DubboProtocol將調(diào)用鏈封裝為DubboExporter,放入到netty服務(wù)端工作線程池中 2、URL配置信息注冊到zk注冊中心,并注冊override監(jiān)聽,觸發(fā)訂閱。 3、服務(wù)引用時,也將服務(wù)引用的信息封裝成URL并注冊到zk注冊中心,同時監(jiān)聽category、providers、configurators、routers這四個目錄節(jié)點(才能感知服務(wù)上下線的變化)。將這些信息包裝成DubboInvoker客戶端的調(diào)用鏈,返回代理。 4、客戶端使用代理進行調(diào)用時,經(jīng)過負載均衡,選擇其中一個服務(wù)的URL,根據(jù)URl信息與netty建立連接,發(fā)送Invocation到netty服務(wù)端;服務(wù)端在工作線程池中找一個線程,處理Invocation,并把RpcResult結(jié)果返回給客戶端;客戶端接收解析RpcResult,獲取處理結(jié)果。 這樣整個過程就基本結(jié)束。理解這一整個過程,也就相當于了解了dubbo整體的原理。也就能從大的方向把握這一框架,當然,dubbo框架還有很多值得學習研究的地方。在之后的博客中會繼續(xù)分析。 |
|
來自: liang1234_ > 《dubbo》