乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      聊聊java高并發(fā)系統(tǒng)之異步非阻塞

       流曲頻陽 2017-01-23

      聊聊java高并發(fā)系統(tǒng)之異步非阻塞

      在做電商系統(tǒng)時,流量入口如首頁、活動頁、商品詳情頁等系統(tǒng)承載了網(wǎng)站的大部分流量,而這些系統(tǒng)的主要職責(zé)包括聚合數(shù)據(jù)拼裝模板、熱點統(tǒng)計、緩存、下游功能降級開關(guān)、托底數(shù)據(jù)等等。其中聚合數(shù)據(jù)需要調(diào)用其它多個系統(tǒng)服務(wù)獲取數(shù)據(jù)、拼裝數(shù)據(jù)/模板然后返回給前端,聚合數(shù)據(jù)來源主要有依賴系統(tǒng)/服務(wù)、緩存、數(shù)據(jù)庫等;而系統(tǒng)之間的調(diào)用可以通過如http接口調(diào)用(如HttpClient)、SOA服務(wù)調(diào)用(如dubbo、thrift)等等。

      在Java中,如使用Tomcat,一個請求會分配一個線程進行請求處理,該線程負責(zé)獲取數(shù)據(jù)、拼裝數(shù)據(jù)或模板然后返回給前端;在同步調(diào)用獲取數(shù)據(jù)接口的情況下(等待依賴系統(tǒng)返回數(shù)據(jù)),整個線程是一直被占用并阻塞的。如果有大量的這種請求,每個請求占用一個線程,但線程一直處于阻塞,降低了系統(tǒng)的吞吐量,這將導(dǎo)致應(yīng)用的吞吐量下降;我們希望在調(diào)用依賴的服務(wù)響應(yīng)比較慢,此時應(yīng)該讓出線程和CPU來處理下一個請求,當(dāng)依賴的服務(wù)返回了再分配相應(yīng)的線程來繼續(xù)處理。而這應(yīng)該有更好的解決方案:異步/協(xié)程。而Java是不支持協(xié)程的(雖然有些Java框架說支持,但還是高層API的封裝),因此在Java中我們還可以使用異步來提升吞吐量。目前java一些開源框架(HttpClient\HttpAsyncClient、dubbo、thrift等等)大部分都支持。

      幾種調(diào)用方式

      同步阻塞調(diào)用

      即串行調(diào)用,響應(yīng)時間為所有服務(wù)的響應(yīng)時間總和;

      半異步(異步Future)

      線程池,異步Future,使用場景:并發(fā)請求多服務(wù),總耗時為最長響應(yīng)時間;提升總響應(yīng)時間,但是阻塞主請求線程,高并發(fā)時依然會造成線程數(shù)過多,CPU上下文切換;

      全異步(Callback)

      Callback方式調(diào)用,使用場景:不考慮回調(diào)時間且只能對結(jié)果做簡單處理,如果依賴服務(wù)是兩個或兩個以上服務(wù),則不能合并兩個服務(wù)的處理結(jié)果;不阻塞主請求線程,但使用場景有限。

      異步回調(diào)鏈?zhǔn)骄幣?/strong>

      異步回調(diào)鏈?zhǔn)骄幣?JDK8 CompletableFuture),使用場景:其實不是異步調(diào)用方式,只是對依賴多服務(wù)的Callback調(diào)用結(jié)果處理做結(jié)果編排,來彌補Callback的不足,從而實現(xiàn)全異步鏈?zhǔn)秸{(diào)用。

      接下來看看如何設(shè)計利用全異步Callback調(diào)用和異步回調(diào)鏈?zhǔn)骄幣盘幚斫Y(jié)果來實現(xiàn)全異步系統(tǒng)設(shè)計。

      同步阻塞調(diào)用

      1. public class Test { 
      2.    public static void main(String[] args) throws Exception { 
      3.        RpcService rpcService = new RpcService(); 
      4.        HttpService httpService = new HttpService(); 
      5.        //耗時10ms 
      6.        Map<String, String> result1 = rpcService.getRpcResult(); 
      7.        //耗時20ms 
      8.        Integer result2 = httpService.getHttpResult(); 
      9.        //總耗時30ms 
      10.     } 
      11.    static class RpcService { 
      12.        Map<String, String> getRpcResult() throws Exception { 
      13.            //調(diào)用遠程方法(遠程方法耗時約10ms,可以使用Thread.sleep模擬) 
      14.        } 
      15.     } 
      16.    static class HttpService { 
      17.        Integer getHttpResult() throws Exception { 
      18.            //調(diào)用遠程方法(遠程方法耗時約20ms,可以使用Thread.sleep模擬) 
      19.            Thread.sleep(20); 
      20.            return 0; 
      21.        } 
      22.     } 

      半異步(異步Future)

      1. public class Test { 
      2.    final static ExecutorService executor = Executors.newFixedThreadPool(2); 
      3.    public static void main(String[] args) { 
      4.        RpcService rpcService = new RpcService(); 
      5.        HttpService httpService = new HttpService(); 
      6.        Future<Map<String, String>> future1 = null
      7.        Future<Integer> future2 = null
      8.        try { 
      9.            future1 = executor.submit(() -> rpcService.getRpcResult()); 
      10.            future2 = executor.submit(() -> httpService.getHttpResult()); 
      11.            //耗時10ms 
      12.            Map<String, String> result1 = future1.get(300, TimeUnit.MILLISECONDS); 
      13.            //耗時20ms 
      14.            Integer result2 = future2.get(300, TimeUnit.MILLISECONDS); 
      15.            //總耗時20ms 
      16.        } catch (Exception e) { 
      17.            if (future1 != null) { 
      18.                 future1.cancel(true); 
      19.            } 
      20.            if (future2 != null) { 
      21.                 future2.cancel(true); 
      22.            } 
      23.            throw new RuntimeException(e); 
      24.        } 
      25.     } 
      26.    static class RpcService { 
      27.        Map<String, String> getRpcResult() throws Exception { 
      28.            //調(diào)用遠程方法(遠程方法耗時約10ms,可以使用Thread.sleep模擬) 
      29.        } 
      30.     } 
      31.    static class HttpService { 
      32.        Integer getHttpResult() throws Exception { 
      33.            //調(diào)用遠程方法(遠程方法耗時約20ms,可以使用Thread.sleep模擬) 
      34.        } 
      35.     } 
      36.   

      全異步(Callback)

      1. public class AsyncTest { 
      2. public staticHttpAsyncClient httpAsyncClient; 
      3.    public static CompletableFuture<String> getHttpData(String url) { 
      4.        CompletableFuture asyncFuture = new CompletableFuture(); 
      5.        HttpPost post = new HttpPost(url); 
      6.        HttpAsyncRequestProducer producer = HttpAsyncMethods.create(post); 
      7.        AsyncCharConsumer<HttpResponse> consumer = newAsyncCharConsumer<HttpResponse>() { 
      8.             HttpResponse response; 
      9.            protected HttpResponse buildResult(final HttpContext context) { 
      10.                 return response; 
      11.            } 
      12. …... 
      13.        }; 
      14.        FutureCallback callback = new FutureCallback<HttpResponse>() { 
      15.            public void completed(HttpResponse response) { 
      16.                asyncFuture.complete(EntityUtils.toString(response.getEntity())); 
      17.            } 
      18. …... 
      19.        }; 
      20.        httpAsyncClient.execute(producer, consumer, callback); 
      21.        return asyncFuture; 
      22.     } 
      23.   
      24.    public static void main(String[] args) throws Exception { 
      25.        AsyncTest.getHttpData("http://www.jd.com"); 
      26.        Thread.sleep(1000000); 
      27.     } 

      本示例使用HttpAsyncClient演示。

      異步回調(diào)鏈?zhǔn)骄幣?/strong>

      CompletableFuture提供了50多個API,可以滿足所需的各種場景的異步處理的編排,在此列舉三個場景:

      場景1:三個服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程;

      三個服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture

      方法test1:

      1. public static void test1() throws Exception { 
      2.       HelloClientDemoTest service = new HelloClientDemoTest(); 
      3.       /** 
      4.        * 場景1 兩個以上服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程 
      5.        * 并且兩個服務(wù)也是異步非阻塞調(diào)用 
      6.        */ 
      7.       CompletableFuture future1 = service.getHttpData("http://www.jd.com"); 
      8.       CompletableFuture future2 = service.getHttpData("http://www.jd.com"); 
      9.       CompletableFuture future3 =service.getHttpData("http://www.jd.com"); 
      10.       List<CompletableFuture> futureList = Lists.newArrayList(future1,future2, future3); 
      11.       CompletableFuture<Void> allDoneFuture =CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()])); 
      12.       CompletableFuture<String> future4 =allDoneFuture.thenApply(v -> { 
      13.            List<Object> result =futureList.stream().map(CompletableFuture::join) 
      14.                   .collect(Collectors.toList()); 
      15.            //注意順序 
      16.            String result1 = (String)result.get(0); 
      17.            String result2 = (String)result.get(1); 
      18.            String result3 = (String)result.get(2); 
      19.            //處理業(yè)務(wù).... 
      20.            return result1 + result2 + result3; 
      21.        }).exceptionally(e -> { 
      22.            //e.printStackTrace(); 
      23.            return ""; 
      24.        }); 
      25.       //返回 
      26.    } 

      場景2、兩個服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程;

      兩個服務(wù)并發(fā)異步調(diào)用,返回CompletableFuture

      方法test2:

      1. public void test2() throws Exception { 
      2.       HelloClientDemoTest service = new HelloClientDemoTest(); 
      3.       /** 
      4.        * 場景2 兩個接口并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程 
      5.        * 并且兩個服務(wù)也是異步非阻塞調(diào)用 
      6.        */ 
      7.       CompletableFuture future1 = service.getHttpData("http://www.jd.com"); 
      8.       CompletableFuture future2 =service.getHttpData("http://www.jd.com"); 
      9.       CompletableFuture future3 =future1.thenCombine(future2, (f1, f2) -> { 
      10.            //處理業(yè)務(wù).... 
      11.            return f1 + "," + f2; 
      12.        }).exceptionally(e -> { 
      13.            return ""; 
      14.        }); 
      15.       //返回 
      16.    } 

      場景3、兩個服務(wù),并發(fā)異步調(diào)用兩個服務(wù),并且一個服務(wù)的結(jié)果返回后再次調(diào)用另一服務(wù),然后將三個結(jié)果后并處理,返回CompletableFuture,整個處理過程中不阻塞任何線程;

      方法test3:

      1. publicvoid test3() throws Exception { 
      2.        HelloClientDemoTest service = new HelloClientDemoTest(); 
      3.        /** 
      4.         * 場景3 兩請求依賴調(diào)用,然后與另一服務(wù)結(jié)果組合處理,返回CompletableFuture,不阻塞主線程 
      5.         * 并且兩個服務(wù)也是異步非阻塞調(diào)用 
      6.         */ 
      7.         CompletableFuture future1 = service.getHttpData("http://www.jd.com"); 
      8.         CompletableFuture future2 = service.getHttpData("http://www.jd.com"); 
      9.         CompletableFuture<String> future3future1.thenApply((param) -> { 
      10.             CompletableFuture future4 =service.getHttpData("http://www.jd.com"); 
      11.             return future4; 
      12.         }); 
      13.         CompletableFuture future5 =future2.thenCombine(future3, (f2, f3) -> { 
      14.             //....處理業(yè)務(wù) 
      15.             return f2 + "," + f3; 
      16.         }).exceptionally(e -> { 
      17.             return ""; 
      18.         }); 
      19.         //返回future5 
      20.     } 

      全異步Web系統(tǒng)設(shè)計

      主要技術(shù):servlet3,JDK8 CompletableFuture,支持異步Callback調(diào)用的RPC框架。

      先看一下處理流程圖:

      全異步Web系統(tǒng)設(shè)計主要技術(shù)處理流程圖

      servlet3:Servlet 接收到請求之后,可能首先需要對請求攜帶的數(shù)據(jù)進行一些預(yù)處理;接著,Servlet 線程將請求轉(zhuǎn)交給一個異步線程來執(zhí)行業(yè)務(wù)處理,線程本身返回至容器。針對業(yè)務(wù)處理較耗時的情況,這將大大減少服務(wù)器資源的占用,并且提高并發(fā)處理速度。servlet3可參考商品詳情頁系統(tǒng)的Servlet3異步化實踐,結(jié)合其中講解的servlet3整合:

      1. public void submitFuture(finalHttpServletRequest req, final Callable<CompletableFuture> task) throwsException{ 
      2.        final String uri = req.getRequestURI(); 
      3.        final Map<String, String[]> params = req.getParameterMap(); 
      4.        final AsyncContext asyncContext = req.startAsync(); 
      5.        asyncContext.getRequest().setAttribute("uri", uri); 
      6.        asyncContext.getRequest().setAttribute("params", params); 
      7.        asyncContext.setTimeout(asyncTimeoutInSeconds * 1000); 
      8.        if(asyncListener != null) { 
      9.            asyncContext.addListener(asyncListener); 
      10.        } 
      11.        CompletableFuture future = task.call(); 
      12.        future.thenAccept(result -> { 
      13.            HttpServletResponse resp = (HttpServletResponse)asyncContext.getResponse(); 
      14.            try { 
      15.                 if(result instanceof String) { 
      16.                     byte[] bytes = new byte[0]; 
      17.                     if (StringUtils.isBlank(result)){ 
      18.                        resp.setContentType("text/html;charset=gbk"); 
      19.                        resp.setContentLength(0); 
      20.                     } else { 
      21.                         bytes =result.getBytes("GBK"); 
      22.                     } 
      23.                    //resp.setBufferSize(bytes.length); 
      24.                    resp.setContentType("text/html;charset=gbk"); 
      25.                    if(StringUtils.isNotBlank(localIp)) { 
      26.                        resp.setHeader("t.ser", localIp); 
      27.                     } 
      28.                    resp.setContentLength(bytes.length); 
      29.                    resp.getOutputStream().write(bytes); 
      30.                 } else { 
      31.                     write(resp,JSONUtils.toJSON(result)); 
      32.                 } 
      33.            } catch (Throwable e) { 
      34.                resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); //程序內(nèi)部錯誤 
      35.                 try { 
      36.                     LOG.error("get infoerror, uri : {},  params : {}", uri,JSONUtils.toJSON(params), e); 
      37.                 } catch (Exception ex) { 
      38.                 } 
      39.            } finally { 
      40.                 asyncContext.complete(); 
      41.            } 
      42.        }).exceptionally(e -> { 
      43.            asyncContext.complete(); 
      44.            return null; 
      45.        }); 

      另外還有Java中協(xié)程庫Quasar,可參考《Java的纖程庫 - Quasar》,目前沒有在應(yīng)用中使用并在測試FiberHttpServlet的時候遇到很多坑,日后把Quasar自如運用后形成日記,希望能結(jié)實更多的朋友一起研究,踩坑。

      作者:孫偉,目前負責(zé)京東商品詳情頁統(tǒng)一服務(wù)系統(tǒng),寫過java,寫過ngx_lua,還寫過storm等,喜歡學(xué)習(xí)研究新事物。

      【本文來自51CTO專欄作者張開濤的微信公眾號(開濤的博客),公眾號id: kaitao-1234567】

       戳這里,看該作者更多好文

      【編輯推薦】

      【責(zé)任編輯:IT瘋 TEL:(010)68476606】

      點贊 0

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多