乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      netty案例,netty4.1源碼分析篇六《Netty異步架構(gòu)監(jiān)聽類Promise源碼分析》

       小傅哥 2021-12-13

      前言介紹

      分析Promise之前我們先來看兩個(gè)單詞;Promise、Future

      Promise v. 許諾;承諾;答應(yīng);保證;使很可能;預(yù)示
      Future n. 將來;未來;未來的事;將來發(fā)生的事;前景;前途;前程

      他們的含義都是對(duì)未來即將要發(fā)生的事情做相應(yīng)的處理,這也是在異步編程中非常常見的類名。

      Netty是一個(gè)異步網(wǎng)絡(luò)處理框架,在實(shí)現(xiàn)中大量使用了Future機(jī)制,并在Java自帶Future的基礎(chǔ)上,增加了Promise機(jī)制。這兩個(gè)實(shí)現(xiàn)類的目的都是為了使異步編程更加方便使用。

      源碼分析

      1、了解Java并發(fā)包中的Future

      java的并發(fā)包中提供java.util.concurrent.Future類,用于處理異步操作。在Java中Future是一個(gè)未來完成的異步操作,可以獲得未來返回的值。如下案例,調(diào)用一個(gè)獲取用戶信息的方法,該方法會(huì)立刻返回Future對(duì)象,調(diào)用Future.get()可以同步等待耗時(shí)方法的返回,也可以通過調(diào)用future的cancel()取消Future任務(wù)。

      1class TestFuture {
      2
      3    public static void main(String[] args) throws ExecutionException, InterruptedException {
      4        TestFuture testFuture = new TestFuture();
      5        Future<String> future = testFuture.queryUserInfo("10001"); //返回future
      6        String userInfo = future.get();
      7        System.out.println("查詢用戶信息:" + userInfo);
      8    }
      9
      10    private Future<String> queryUserInfo(String userId) {
      11        FutureTask<String> future = new FutureTask<>(() -> {
      12            try {
      13                Thread.sleep(1000);
      14                return "微信公眾號(hào):bugstack蟲洞棧 | 用戶ID:" + userId;
      15            } catch (InterruptedException ignored) {}
      16            return "error";
      17        });
      18        new Thread(future).start();
      19        return future;
      20    }
      21
      22}

      2、Netty實(shí)現(xiàn)了自己的Future

      Netty通過繼承java并發(fā)包的Future來定義自己的Future接口,為Future加入的功能主要有添加、刪除監(jiān)聽事件接口,最后由Promise實(shí)現(xiàn)。

      io.netty.util.concurrent.Future.java中定義了一些列的異步編程方法 | 經(jīng)常會(huì)使用的>b.bind(port).sync();

      1// 只有IO操作完成時(shí)才返回true
      2boolean isSuccess();
      3// 只有當(dāng)cancel(boolean)成功取消時(shí)才返回true
      4boolean isCancellable();
      5// IO操作發(fā)生異常時(shí),返回導(dǎo)致IO操作以此的原因,如果沒有異常,返回null
      6Throwable cause();
      7// 向Future添加事件,future完成時(shí),會(huì)執(zhí)行這些事件,如果add時(shí)future已經(jīng)完成,會(huì)立即執(zhí)行監(jiān)聽事件
      8Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
      9Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
      10// 移除監(jiān)聽事件,future完成時(shí),不會(huì)觸發(fā)
      11Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
      12Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
      13// 等待future done
      14Future<V> sync() throws InterruptedException;
      15// 等待future done,不可打斷
      16Future<V> syncUninterruptibly();
      17// 等待future完成
      18Future<V> await() throws InterruptedException;
      19// 等待future 完成,不可打斷
      20Future<V> awaitUninterruptibly();
      21boolean await(long timeout, TimeUnit unit) throws InterruptedException;
      22boolean await(long timeoutMillis) throws InterruptedException;
      23boolean awaitUninterruptibly(long timeout, TimeUnit unit);
      24boolean awaitUninterruptibly(long timeoutMillis);
      25// 立刻獲得結(jié)果,如果沒有完成,返回null
      26getNow();
      27// 如果成功取消,future會(huì)失敗,導(dǎo)致CancellationException
      28@Override
      29boolean cancel(boolean mayInterruptIfRunning);

      3、Promise機(jī)制

      Netty的Future與Java的Future雖然類名相同,但功能上略有不同,Netty中引入了Promise機(jī)制。在Java的Future中,業(yè)務(wù)邏輯為一個(gè)Callable或Runnable實(shí)現(xiàn)類,該類的call()或run()執(zhí)行完畢意味著業(yè)務(wù)邏輯的完結(jié);而在Promise機(jī)制中,可以在業(yè)務(wù)邏輯中人工設(shè)置業(yè)務(wù)邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務(wù)邏輯。

      io.netty.util.concurrent.Promise.java |

      1public interface Promise<Vextends Future<V{
      2
      3    // 設(shè)置future執(zhí)行結(jié)果為成功
      4    Promise<V> setSuccess(V result);
      5
      6    // 嘗試設(shè)置future執(zhí)行結(jié)果為成功,返回是否設(shè)置成功
      7    boolean trySuccess(V result);
      8
      9    // 設(shè)置失敗
      10    Promise<V> setFailure(Throwable cause);
      11
      12    // 嘗試設(shè)置future執(zhí)行結(jié)果為失敗,返回是否設(shè)置成功 
      13    boolean tryFailure(Throwable cause);
      14
      15    // 設(shè)置為不能取消
      16    boolean setUncancellable();
      17
      18    // 源碼中,以下為覆蓋了Future的方法,例如;
      19
      20    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
      21
      22    @Override
      23    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
      24
      25}

      TestPromise.java | 一個(gè)查詢用戶信息的Promise列子,加入監(jiān)聽再operationComplete完成后,獲取查詢信息

      1class TestPromise {
      2
      3    public static void main(String[] args) throws ExecutionException, InterruptedException {
      4        TestPromise testPromise = new TestPromise();
      5        Promise<String> promise = testPromise.queryUserInfo("10001");
      6        promise.addListener(new GenericFutureListener<Future<? super String>>() {
      7            @Override
      8            public void operationComplete(Future<? super String> future) throws Exception {
      9                System.out.println("addListener.operationComplete > 查詢用戶信息完成: " + future.get());
      10            }
      11        });
      12    }
      13
      14    private Promise<String> queryUserInfo(String userId) {
      15        NioEventLoopGroup loop = new NioEventLoopGroup();
      16        // 創(chuàng)建一個(gè)DefaultPromise并返回,將業(yè)務(wù)邏輯放入線程池中執(zhí)行
      17        DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
      18        loop.schedule(() -> {
      19            try {
      20                Thread.sleep(1000);
      21                promise.setSuccess("微信公眾號(hào):bugstack蟲洞棧 | 用戶ID:" + userId);
      22                return promise;
      23            } catch (InterruptedException ignored) {
      24            }
      25            return promise;
      26        }, 0, TimeUnit.SECONDS);
      27        return promise;
      28    }
      29
      30}

      通過這個(gè)例子可以看到,Promise能夠在業(yè)務(wù)邏輯線程中通知Future成功或失敗,由于Promise繼承了Netty的Future,因此可以加入監(jiān)聽事件。而Future和Promise的好處在于,獲取到Promise對(duì)象后可以為其設(shè)置異步調(diào)用完成后的操作,然后立即繼續(xù)去做其他任務(wù)。

      4、Promise類組織結(jié)構(gòu)&常用方法

      DefaultChannelPromise類組織結(jié)構(gòu)圖 | 承接Java并發(fā)包Future并增強(qiáng)實(shí)現(xiàn)

      微信公眾號(hào):bugstack蟲洞棧 | DefaultChannelPromise類組織結(jié)構(gòu)圖

      Netty中DefalutPromise是一個(gè)非常常用的類,這是Promise實(shí)現(xiàn)的基礎(chǔ)。DefaultChannelPromise是DefalutPromise的子類,加入了channel這個(gè)屬性。

      DefaultPromise | 使用
      在Netty中使用到Promise的地方會(huì)非常多,例如在前面一節(jié)《一行簡單的writeAndFlush都做了哪些事》分析HeadContext.write中unsafe.write(msg, promise);結(jié)合這一章節(jié)可以繼續(xù)深入了解Netty的異步框架原理。另外,服務(wù)器/客戶端啟動(dòng)時(shí)的注冊(cè)任務(wù),最終會(huì)調(diào)用unsafe的register,調(diào)用過程中會(huì)傳入一個(gè)promise,unsafe進(jìn)行事件的注冊(cè)時(shí)調(diào)用promise可以設(shè)置成功/失敗。

      SingleThreadEventLoop.java | 注冊(cè)服務(wù)事件循環(huán)組

      1@Override
      2public ChannelFuture register(Channel channel) {
      3    return register(new DefaultChannelPromise(channel, this));
      4}
      5
      6@Override
      7public ChannelFuture register(final ChannelPromise promise) {
      8    ObjectUtil.checkNotNull(promise, "promise");
      9    promise.channel().unsafe().register(this, promise);
      10    return promise;
      11}

      DefaultPromise | 實(shí)現(xiàn)
      DefaultChannelPromise提供的功能可以分為兩個(gè)部分;

      • 為調(diào)用者提供get()和addListener()用于獲取Future任務(wù)執(zhí)行結(jié)果和添加監(jiān)聽事件。

      • 為業(yè)務(wù)處理任務(wù)提供setSuccess()等方法設(shè)置任務(wù)的成功或失敗。

      AbstractFuture.java | get()方法

      1public abstract class AbstractFuture<Vimplements Future<V{
      2
      3    @Override
      4    public V get() throws InterruptedException, ExecutionException {
      5        await();
      6
      7        Throwable cause = cause();
      8        if (cause == null) {
      9            return getNow();
      10        }
      11        if (cause instanceof CancellationException) {
      12            throw (CancellationException) cause;
      13        }
      14        throw new ExecutionException(cause);
      15    }
      16
      17    @Override
      18    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
      19        if (await(timeout, unit)) {
      20            Throwable cause = cause();
      21            if (cause == null) {
      22                return getNow();
      23            }
      24            if (cause instanceof CancellationException) {
      25                throw (CancellationException) cause;
      26            }
      27            throw new ExecutionException(cause);
      28        }
      29        throw new TimeoutException();
      30    }
      31}

      DefaultPromise父類AbstractFuture提供了兩個(gè)get方法;1、無參數(shù)的get會(huì)阻塞等待;2、有參數(shù)的get會(huì)等待指定事件,若未結(jié)束拋出超時(shí)異常。


      DefaultPromise.java | DefaultPromise.await()方法

      1@Override
      2public Promise<V> await() throws Interrupt
      3    // 判斷Future任務(wù)是否結(jié)束,內(nèi)部根據(jù)result是否為null判斷,setSuccess或setFailure時(shí)會(huì)通過CAS修改result
      4    if (isDone()
      {
      5        return this;
      6    }
      7    // 線程是否被中斷
      8    if (Thread.interrupted()) {
      9        throw new InterruptedException(toS
      10    }
      11    // 檢查當(dāng)前線程是否與線程池運(yùn)行的線程是一個(gè)
      12    checkDeadLock();
      13    synchronized (this) {
      14        while (!isDone()) {
      15           /* waiters計(jì)數(shù)加1
      16            * private void incWaiters() {
      17            *   if (waiters == Short.MAX_VALUE) {
      18            *       throw new IllegalStateException("too many waiters: " + this);
      19            *   }
      20            *   ++waiters;
      21            * }
      22            */

      23            incWaiters();
      24            try {
      25                // Object的方法,讓出CPU,加入等待隊(duì)列
      26                wait();
      27            } finally {
      28                // waiters計(jì)數(shù)減1
      29                decWaiters();
      30            }
      31        }
      32    }
      33    return this;
      34}

      await(long timeout, TimeUnit unit)與awite類似,只是調(diào)用了Object對(duì)象的wait(long timeout, int nanos)方法awaitUninterruptibly()方法在內(nèi)部catch住了等待線程的中斷異常,因此不會(huì)拋出中斷異常。


      DefaultPromise.java | DefaultPromise.addListener0() / DefaultPromise.removeListener0()

      1private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
      2    if (listeners == null) {
      3        listeners = listener;
      4    } else if (listeners instanceof DefaultFutureListeners) {
      5        ((DefaultFutureListeners) listeners).add(listener);
      6    } else {
      7        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
      8    }
      9}
      10private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
      11    if (listeners instanceof DefaultFutureListeners) {
      12        ((DefaultFutureListeners) listeners).remove(listener);
      13    } else if (listeners == listener) {
      14        listeners = null;
      15    }
      16}
      • addListener0方法被調(diào)用時(shí),將傳入的回調(diào)類傳入到listeners對(duì)象中,如果監(jiān)聽多于1個(gè),會(huì)創(chuàng)建DefaultFutureListeners對(duì)象將回調(diào)方法保存在一個(gè)數(shù)組中。

      • removeListener0會(huì)將listeners設(shè)置為null(只有一個(gè)時(shí))或從數(shù)組中移除(多個(gè)回調(diào)時(shí))。


      DefaultPromise.java | DefaultPromise.notifyListener0() 通知偵聽器

      1@SuppressWarnings({ "unchecked""rawtypes" })
      2private static void notifyListener0(Future future, GenericFutureListener l) {
      3    try {
      4        l.operationComplete(future);
      5    } catch (Throwable t) {
      6        if (logger.isWarnEnabled()) {
      7            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
      8        }
      9    }
      10}
      • 在添加監(jiān)聽器時(shí),如果任務(wù)剛好執(zhí)行完畢,則會(huì)立即觸發(fā)監(jiān)聽事件,觸發(fā)監(jiān)聽通過notifyListeners()實(shí)現(xiàn)。

      • addListener和setSuccess都會(huì)調(diào)用notifyListeners()和Promise內(nèi)的線程池當(dāng)前執(zhí)行的線程是同一個(gè)線程,則放在線程池中執(zhí)行,否則提交到線程池去執(zhí)行;例如,main線程中調(diào)用addListener時(shí)任務(wù)完成,notifyListeners()執(zhí)行回調(diào),會(huì)提交到線程池中執(zhí)行;而如果是執(zhí)行Future任務(wù)的線程池中setSuccess()時(shí)調(diào)用notifyListeners(),會(huì)放在當(dāng)前線程中執(zhí)行。

      • 內(nèi)部維護(hù)了notifyingListeners用來記錄是否已經(jīng)觸發(fā)過監(jiān)聽事件,只有未觸發(fā)過且監(jiān)聽列表不為空,才會(huì)依次便利并調(diào)用operationComplete


      DefaultPromise.java | DefaultPromise.setSuccess0()、setFailure0() 喚起等待線程通知成功/失敗

      1// 設(shè)置成功后喚醒等待線程
      2private boolean setSuccess0(V result) {
      3    return setValue0(result == null ? SUCCESS : result);
      4}
      5
      6// 設(shè)置成功后喚醒等待線程
      7private boolean setFailure0(Throwable cause) {
      8    return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
      9}
      10
      11// 通知成功時(shí)將結(jié)果保存在變量result,通知失敗時(shí),使用CauseHolder包裝Throwable賦值給result
      12// RESULT_UPDATER 是一個(gè)使用CAS更新內(nèi)部屬性result的類,
      13// 如果result為null或UNCANCELLABLE,更新為成功/失敗結(jié)果;UNCANCELLABLE是不可取消狀態(tài)
      14private boolean setValue0(Object objResult) {
      15    if (RESULT_UPDATER.compareAndSet(thisnull, objResult) ||
      16        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
      17        // 檢查是否有服務(wù),如果有,通知他們。
      18        if (checkNotifyWaiters()) {
      19            notifyListeners();  // 通知
      20        }
      21        return true;
      22    }
      23    return false;
      24}

      Future任務(wù)在執(zhí)行完成后調(diào)用setSuccess()或setFailure()通知Future執(zhí)行結(jié)果;主要邏輯是:修改result的值,若有等待線程則喚醒,通知監(jiān)聽事件。


      DefaultChannelPromise實(shí)現(xiàn)

      1/**
      2 * The default {@link ChannelPromise} implementation.  It is recommended to use {@link Channel#newPromise()} to create
      3 * a new {@link ChannelPromise} rather than calling the constructor explicitly.
      4 */

      5public class DefaultChannelPromise extends DefaultPromise<Voidimplements ChannelPromiseFlushCheckpoint {
      6
      7    private final Channel channel;
      8    private long checkpoint;
      9
      10    ...
      11}
      • 從繼承關(guān)系可以看到DefaultChannelPromise是DefaultPromise的實(shí)現(xiàn)類,內(nèi)部維護(hù)了一個(gè)通道變量Channel。

      • 另外還實(shí)現(xiàn)了FlushCheckpoint接口,給ChannelFlushPromiseNotifier使用,我們可以將ChannelFuture注冊(cè)到ChannelFlushPromiseNotifier類,當(dāng)有數(shù)據(jù)寫入或到達(dá)checkpoint時(shí)使用。

      1interface FlushCheckpoint {
      2    long flushCheckpoint();
      3    void flushCheckpoint(long checkpoint)
      4    ChannelPromise promise()
      ;
      5}

        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多