乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Netty入門

       貪挽懶月 2022-06-20 發(fā)布于廣東

      一、netty概述

      「1、NIO存在的問題:」

      • NIO的API比較復(fù)雜,需要熟練掌握3個(gè)核心組件,channel、buffer和selector;
      • 需要熟悉多線程、網(wǎng)絡(luò)編程等技術(shù);
      • 開發(fā)工作量大,難度也比較大,需要解決斷連、重連、網(wǎng)絡(luò)閃斷、半包讀寫、失敗緩存、網(wǎng)絡(luò)擁堵等各種情況;
      • NIO存在bug,一個(gè)叫Epoll的bug,會(huì)導(dǎo)致選擇器空輪詢,形成死循環(huán),最后CPU飆到100%

      正是因?yàn)镹IO存在這些問題,netty就應(yīng)運(yùn)而生了。

      「2、Netty簡(jiǎn)介:」

      netty是一個(gè)異步的,基于事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架??梢钥焖俚亻_發(fā)高性能的服務(wù)器端和客戶端,像dubbo和elasticsearch底層都用了netty。它具有以下優(yōu)點(diǎn):

      • 設(shè)計(jì)優(yōu)雅,靈活可擴(kuò)展;
      • 使用方便,用戶指南清晰明確;
      • 安全,完整的SSL/TLS和StartTLS支持;
      • 社區(qū)活躍,不斷地更新完善

      官方下載地址:https:///netty/downloads 我本次下載的版本是:4.1.51.Final

      二、netty的架構(gòu)設(shè)計(jì)

      「1、線程模型:」

      目前存在的線程模式:

      • 傳統(tǒng)阻塞IO的服務(wù)模型
      • Reactor模式

      根據(jù)Reactor的數(shù)量和1處理資源的線程數(shù)不同,又分3種:

      • 單Reactor單線程;
      • 單Reactor多線程;
      • 主從Reactor多線程

      Netty的線程模型是基于主從Reactor多線程做了改進(jìn)。

      「2、傳統(tǒng)阻塞IO的線程模型:」

      采用阻塞IO獲取輸入的數(shù)據(jù),每個(gè)連接都需要獨(dú)立的線程來(lái)處理邏輯。存在的問題就是,當(dāng)并發(fā)數(shù)很大時(shí),就需要?jiǎng)?chuàng)建很多的線程,占用大量的資源。連接創(chuàng)建后,如果當(dāng)前線程沒有數(shù)據(jù)可讀,該線程將會(huì)阻塞在讀數(shù)據(jù)的方法上,造成線程資源浪費(fèi)。

      「3、Reactor模式(分發(fā)者模式/反應(yīng)器模式/通知者模式):」

      針對(duì)傳統(tǒng)阻塞IO的模型,做了以下兩點(diǎn)改進(jìn):

      • 基于IO復(fù)用模型:多個(gè)客戶端共用一個(gè)阻塞對(duì)象,而不是每個(gè)客戶端都對(duì)應(yīng)一個(gè)阻塞對(duì)象
      • 基于線程池復(fù)用線程資源:使用了線程池,而不是每來(lái)一個(gè)客戶端就創(chuàng)建一個(gè)線程

      Reactor模式的核心組成:

      • Reactor:Reactor就是多個(gè)客戶端共用的那一個(gè)阻塞對(duì)象,它單獨(dú)起一個(gè)線程運(yùn)行,負(fù)責(zé)監(jiān)聽和分發(fā)事件,將請(qǐng)求分發(fā)給適當(dāng)?shù)奶幚沓绦騺?lái)進(jìn)行處理
      • Handler:處理程序要完成的實(shí)際事件,也就是真正執(zhí)行業(yè)務(wù)邏輯的程序,它是非阻塞的

      「4、單Reactor單線程:」

      模型圖

      這個(gè)圖其實(shí)就跟之前的NIO群聊系統(tǒng)對(duì)應(yīng)。多個(gè)客戶端請(qǐng)求連接,然后Reactor通過(guò)selector輪詢判斷哪些通道是有事件發(fā)生的,如果是連接事件,就到了Acceptor中建立連接;如果是其他讀寫事件,就有dispatch分發(fā)到對(duì)應(yīng)的handler中進(jìn)行處理。這種模式的缺點(diǎn)就是Reactor和Handler是在一個(gè)線程中的,如果Handler阻塞了,那么程序就阻塞了。

      「5、單Reactor多線程:」

      單reactor多線程

      處理流程如下:

      • Reactor對(duì)象通過(guò)Selector監(jiān)聽客戶端請(qǐng)求事件,通過(guò)dispatch進(jìn)行分發(fā);
      • 如果是連接事件,則由Acceptor通過(guò)accept方法處理連接請(qǐng)求,然后創(chuàng)建一個(gè)Handler對(duì)象響應(yīng)事件;
      • 如果不是連接請(qǐng)求,則由Reactor對(duì)象調(diào)用對(duì)應(yīng)handler對(duì)象進(jìn)行處理;handler只響應(yīng)事件,不做具體的業(yè)務(wù)處理,它通過(guò)read方法讀取數(shù)據(jù)后,會(huì)分發(fā)給線程池的某個(gè)線程進(jìn)行業(yè)務(wù)處理,并將處理結(jié)果返回給handler;
      • handler收到響應(yīng)后,通過(guò)send方法將結(jié)果返回給client。

      相比單Reactor單線程,這里將業(yè)務(wù)處理的事情交給了不同的線程去做,發(fā)揮了多核CPU的性能。但是Reactor只有一個(gè),所有事件的監(jiān)聽和響應(yīng),都由一個(gè)Reactor去完成,并發(fā)性還是不好。

      「6、主從Reactor多線程:」

      主從reactor多線程

      這個(gè)模型相比單reactor多線程的區(qū)別就是:專門搞了一個(gè)MainReactor來(lái)處理連接事件,如果不是連接事件,就分發(fā)給SubReactor進(jìn)行處理。圖中這個(gè)SubReactor只有一個(gè),其實(shí)是可以有多個(gè)的,所以性能就上去了。

      • 優(yōu)點(diǎn):父線程與子線程的交互簡(jiǎn)單、職責(zé)明確,父線程負(fù)責(zé)接收連接,子線程負(fù)責(zé)完成后續(xù)的業(yè)務(wù)處理;
      • 缺點(diǎn):編程復(fù)雜度高

      「7、netty的模型:」

      netty模型是基于主從Reactor多線程模型設(shè)計(jì)的,其工作流程如下:

      • Netty有兩組線程池,一個(gè)Boss Group,它專門負(fù)責(zé)客戶端連接,另一個(gè)Work Group,專門負(fù)責(zé)網(wǎng)絡(luò)讀寫;
      • Boss Group和Work Group的類型都是NIOEventLoopGroup;
      • NIOEventLoopGroup相當(dāng)于一個(gè)事件循環(huán)組,這個(gè)組包含了多個(gè)事件循環(huán),每一個(gè)循環(huán)都是NIOEventLoop;
      • NIOEventLoop表示一個(gè)不斷循環(huán)執(zhí)行處理任務(wù)的線程,每個(gè)NIOEventLoop都有一個(gè)Selector,用于監(jiān)聽綁定在其上的socket;
      • Boss Group下的每個(gè)NIOEventLoop的執(zhí)行步驟有3步:(1). 輪詢accept連接事件;(2). 處理accept事件,與client建立連接,生成一個(gè)NioSocketChannel,并將其注冊(cè)到某個(gè)work group下的NioEventLoop的selector上;(3). 處理任務(wù)隊(duì)列的任務(wù),即runAllTasks;
      • 每個(gè)Work Group下的NioEventLoop循環(huán)執(zhí)行以下步驟:(1). 輪詢r(jià)ead、write事件;(2). 處理read、write事件,在對(duì)應(yīng)的NioSocketChannel處理;(3). 處理任務(wù)隊(duì)列的任務(wù),即runAllTasks;
      • 每個(gè)Work Group下的NioEventLoop在處理業(yè)務(wù)時(shí),會(huì)使用pipeline(管道),pipeline中包含了channel,即通過(guò)pipeline可以獲取到對(duì)應(yīng)的channel,pipeline中維護(hù)了很多的處理器。


      netty模型圖如下,對(duì)應(yīng)了上面那段流程:

      三、netty入門實(shí)例

      使用netty創(chuàng)建一個(gè)服務(wù)端與客戶端,監(jiān)聽6666端口。

      「1、服務(wù)端:」

      • NettyServer:
      public class NettyServer {

       public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建boss group (boss group和work group含有的子線程數(shù)默認(rèn)是cpu數(shù) * 2)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 2. 創(chuàng)建work group
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
         // 3. 創(chuàng)建服務(wù)端啟動(dòng)對(duì)象
         ServerBootstrap bootstrap = new ServerBootstrap();
         // 4. 配置啟動(dòng)參數(shù)
         bootstrap.group(bossGroup, workGroup) // 設(shè)置兩個(gè)線程組
                  .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務(wù)器的通道
                  .option(ChannelOption.SO_BACKLOG, 128) // 設(shè)置線程隊(duì)列等待連接個(gè)數(shù)
                  .childOption(ChannelOption.SO_KEEPALIVE, true) // 設(shè)置保持活動(dòng)連接狀態(tài)
                  .childHandler(new ChannelInitializer<SocketChannel>() { // 創(chuàng)建通道初始化對(duì)象
                  // 給pipeline設(shè)置處理器
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
             // 傳入自定義的handler
             sc.pipeline().addLast(new NettyServerHandler());
            } 
           });
         // 5. 啟動(dòng)服務(wù)器并綁定端口
         ChannelFuture cf = bootstrap.bind(6666).sync();
         // 6. 對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
         cf.channel().closeFuture().sync();
        } finally {
         bossGroup.shutdownGracefully();
         workGroup.shutdownGracefully();
        }
        
       }
      }
      • NettyServerHandler:
      public class NettyServerHandler extends ChannelInboundHandlerAdapter{
       
       // 讀取數(shù)據(jù)
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
       }
       
       // 數(shù)據(jù)讀取完畢后
       @Override
       public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是服務(wù)端", CharsetUtil.UTF_8));
       }
       
       // 處理異常
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
       }
      }

      「2、客戶端:」

      • NettyClient:
      public class NettyClient {

       public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建事件循環(huán)組
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
         // 2. 創(chuàng)建啟動(dòng)對(duì)象
         Bootstrap bootstrap = new Bootstrap();
         // 3. 設(shè)置相關(guān)參數(shù)
         bootstrap.group(eventLoopGroup) // 設(shè)置線程組
                     .channel(NioSocketChannel.class) // 設(shè)置通道
                     .handler(new ChannelInitializer<SocketChannel>() {
                           // 給pipeline設(shè)置處理器
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
             sc.pipeline().addLast(new NettyClientHandler());
            }
           });
         // 4. 連接服務(wù)端
         ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
         // 5. 監(jiān)聽通道關(guān)閉
         channelFuture.channel().closeFuture().sync();
        } finally {
         eventLoopGroup.shutdownGracefully();
        }
        
       }
      }
      • NettyClientHandler:
      public class NettyClientHandler extends ChannelInboundHandlerAdapter{

          // 通道就緒就被觸發(fā)
       @Override
       public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是客戶端", CharsetUtil.UTF_8));
       }
       
       // 讀取數(shù)據(jù)
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("接收到服務(wù)端消息:" + buf.toString(CharsetUtil.UTF_8));
       }
       
       // 處理異常
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
       }
      }

      先啟動(dòng)服務(wù)端,然后啟動(dòng)客戶端,就能看到服務(wù)端和客戶端在控制臺(tái)打印出來(lái)的消息了。

      「3、TaskQueue自定義任務(wù):」

      上面服務(wù)端的NettyServerHandler的channelRead方法中,假如有一個(gè)非常耗時(shí)的業(yè)務(wù),那么就會(huì)阻塞在那里,直到業(yè)務(wù)執(zhí)行完。比如將NettyServerHandler的channelRead方法改成下面這樣:

      // 讀取數(shù)據(jù)
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       // 線程休眠10秒,模擬耗時(shí)業(yè)務(wù)
       TimeUnit.SECONDS.sleep(10);
       ByteBuf buf = (ByteBuf) msg;
       System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
      }

      啟動(dòng)后會(huì)發(fā)現(xiàn),10秒鐘后,服務(wù)端才會(huì)收到客戶端發(fā)送的消息,客戶端也要10秒后,才會(huì)接收到服務(wù)端的消息,即服務(wù)端的channelReadComplete方法是要在channelRead方法執(zhí)行完后才會(huì)執(zhí)行的。

      一直阻塞在那里也不是辦法,希望可以異步執(zhí)行,那我們就可以把該任務(wù)提交到該channel對(duì)應(yīng)的NioEventLoop的TaskQueue中。有以下解決方案:

      • 用戶程序自定義的普通任務(wù):將channelRead方法改成下面這樣:
      // 讀取數(shù)據(jù)
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       ctx.channel().eventLoop().execute(new Runnable() {
        @Override
        public void run() {
         try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
         ByteBuf buf = (ByteBuf) msg;
         System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
        }
       });
      }

      這里仍舊休眠10秒。啟動(dòng)服務(wù)端,再啟動(dòng)客戶端,發(fā)現(xiàn)服務(wù)端要10s后才會(huì)打印出客戶端發(fā)送的消息,但是客戶端立刻就可以收到服務(wù)端在channelReadComplete方法里發(fā)送的消息,說(shuō)明這次是異步的。

      • 用戶自定義定時(shí)任務(wù):與上面的區(qū)別不大,代碼如下:
      // 讀取數(shù)據(jù)
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       ctx.channel().eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
         try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
         ByteBuf buf = (ByteBuf) msg;
         System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
        }
       }, 5, TimeUnit.SECONDS);
      }

      啟動(dòng)服務(wù)端,然后啟動(dòng)客戶端,發(fā)現(xiàn)客戶端還是會(huì)立即收到服務(wù)端發(fā)出的消息,而服務(wù)端,首先要等待5秒才去執(zhí)行run方法,run方法里面線程又休眠了10秒,所以總共要15秒后才會(huì)打印出客戶端發(fā)送的消息。

      • 非當(dāng)前Reactor線程調(diào)用channel的各種方法:這個(gè)的意思就是,如果我別的業(yè)務(wù)代碼,比如消息推送系統(tǒng),也想給客戶端發(fā)送消息,該咋整?其實(shí)很簡(jiǎn)單,在NettyServerHandler的channelRead方法里,我們是通過(guò)ChannelHandlerContext 拿到Channel然后進(jìn)行各種操作的,所以拿到了Channel就可以進(jìn)行操作。那么可以在NettyServer中將Channel保存到集合中去,然后遍歷集合,拿到Channel就可以進(jìn)行操作了。
      // 給pipeline設(shè)置處理器
      @Override
      protected void initChannel(SocketChannel sc) throws Exception {
       // 傳入自定義的handler
       sc.pipeline().addLast(new NettyServerHandler());
       // 在這里,可以將SocketChannel sc保存到集合中,別的線程拿到集合就可以調(diào)用channel的方法了

      四、Netty的異步模型

      「1、基本介紹:」

      • Netty中的I/O操作都是異步的,包括bind、write和connect。這些操作會(huì)返回一個(gè)ChannelFuture對(duì)象,而不會(huì)立即返回操作結(jié)果。
      • 調(diào)用者不能立即得到返回結(jié)果,而是通過(guò)Futrue-Listener機(jī)制,用戶可以主動(dòng)獲取或者通過(guò)通知機(jī)制獲得IO操作的結(jié)果。
      • Netty的異步是建立在future和callback之上的。callback是回調(diào),future表示異步執(zhí)行的結(jié)果,它的核心思想是:假設(shè)有個(gè)方法fun(),計(jì)算過(guò)程可能非常耗時(shí),等待fun()返回要很久,那么可以在調(diào)用fun()的時(shí)候,立馬返回一個(gè)future,后續(xù)通過(guò)future去監(jiān)控fun()方法的處理過(guò)程,這就是future-listener機(jī)制。
      • 用戶可以通過(guò)注冊(cè)監(jiān)聽函數(shù),來(lái)獲取操作真正的結(jié)果,ChannelFuture常用的函數(shù)如下:
      // 判斷當(dāng)前操作是否完成
      isDone
      // 判斷當(dāng)前操作是否成功
      isSuccess
      // 獲取操作失敗的原因
      getCause
      // 判斷當(dāng)前操作是否被取消
      isCancelled
      // 注冊(cè)監(jiān)聽器
      addListener

      「2、使用監(jiān)聽器:」

      在NettyServer中的“啟動(dòng)并綁定端口”下面加上如下代碼:

      // 5. 啟動(dòng)服務(wù)器并綁定端口
      ChannelFuture cf = bootstrap.bind(6666).sync();
      // 注冊(cè)監(jiān)聽器
      cf.addListener(new ChannelFutureListener() {
       @Override
       public void operationComplete(ChannelFuture cf) throws Exception {
        if (cf.isSuccess()) {
         System.out.println("綁定端口成功");
        } else {
         System.out.println("綁定端口失敗");
        }
       }
      });

      這樣就注冊(cè)了監(jiān)聽器,會(huì)監(jiān)聽綁定端口的結(jié)果,如果成功了,就會(huì)打印出綁定成功這段話。

      五、使用Netty開發(fā)Http服務(wù)

      開發(fā)一個(gè)Netty服務(wù)端,監(jiān)聽80端口,瀏覽器訪問localhost,可以返回信息給瀏覽器。代碼如下:

      • NettyHttpServer:
      public class NettyHttpServer {

       public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建boss group (boss group和work group含有的子線程數(shù)默認(rèn)是cpu數(shù) * 2)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 2. 創(chuàng)建work group
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
         // 3. 創(chuàng)建服務(wù)端啟動(dòng)對(duì)象
         ServerBootstrap bootstrap = new ServerBootstrap();
         // 4. 配置啟動(dòng)參數(shù)
         bootstrap.group(bossGroup, workGroup) // 設(shè)置兩個(gè)線程組
           .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務(wù)器的通道
           .childHandler(new NettyHttpServerInitializer());
         // 5. 啟動(dòng)服務(wù)器并綁定端口
         ChannelFuture cf = bootstrap.bind(80).sync();
         // 6. 對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
         cf.channel().closeFuture().sync();
        } finally {
         bossGroup.shutdownGracefully();
         workGroup.shutdownGracefully();
        }
       }

      }
      • NettyHttpServerInitializer:
      public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {

       // 向管道加入處理器
       @Override
       protected void initChannel(SocketChannel sc) throws Exception {
        // 1. 得到管道
        ChannelPipeline pipeline = sc.pipeline();
        // 2. 加入一個(gè)編碼器解碼器
        pipeline.addLast("codec", new HttpServerCodec());
        // 3. 增加一個(gè)自定義的handler處理器
        pipeline.addLast("handler", new NettyHttpServerHandler());
       }
      }
      • NettyHttpServerHandler:
      public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject>{

       // 讀取數(shù)據(jù)
       @Override
       protected void channelRead0(ChannelHandlerContext chc, HttpObject msg) throws Exception {
        // 1. 判斷msg是不是httpRequest請(qǐng)求
        if (msg instanceof HttpRequest) {
         System.out.println("msg類型:" + msg.getClass());
         System.out.println("客戶端地址:" + chc.channel().remoteAddress());
         // 對(duì)特定資源不做響應(yīng)
         HttpRequest httpRequest = (HttpRequest) msg;
         URI uri = new URI(httpRequest.uri());
         if ("/favicon.ico".equals(uri.getPath())) {
          System.out.println("請(qǐng)求了圖標(biāo),不做響應(yīng)");
          return;
         }
         // 2. 創(chuàng)建回復(fù)給瀏覽器的信息
         ByteBuf content = Unpooled.copiedBuffer("hello, 我是服務(wù)器,are you ok?", CharsetUtil.UTF_8);
         // 3. 構(gòu)造http響應(yīng)
         FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
         response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
         response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
         // 4. 將response返回
         chc.writeAndFlush(response);
        }
       }
      }

      啟動(dòng)server后,在瀏覽器就訪問localhost就可以返回相關(guān)內(nèi)容了。

      -java開發(fā)那些事-

        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多