乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      AQS --- 融會(huì)貫通

       貪挽懶月 2022-06-20 發(fā)布于廣東

      一、ReentrantLock 加鎖過程簡(jiǎn)介

      加鎖可以分為三個(gè)階段:

      • 嘗試加鎖;
      • 加鎖失敗,線程入AQS隊(duì)列;
      • 線程入隊(duì)列后進(jìn)入阻塞狀態(tài)。

      二、ReentrantLock 加鎖源碼分析

      現(xiàn)有如下場(chǎng)景:

      三個(gè)人去銀行的一個(gè)窗口辦理業(yè)務(wù),一個(gè)窗口同一時(shí)刻只能接待一位顧客。抽象成代碼就是:

      public static void main(String[] args){
       ReentrantLock lock = new ReentrantLock();
       new Thread(() -> {
        lock.lock();
        try {
         System.out.println("顧客A辦理業(yè)務(wù)");
         TimeUnit.MINUTES.sleep(5);
        } catch (Exception e){
         e.printStackTrace();
        } finally {
         lock.unlock();
        }
       }, "A").start();

       new Thread(() -> {
        lock.lock();
        try {
         System.out.println("顧客B辦理業(yè)務(wù)");
        } catch (Exception e){
         e.printStackTrace();
        } finally {
         lock.unlock();
        }
       }, "B").start();

       new Thread(() -> {
        lock.lock();
        try {
         System.out.println("顧客C辦理業(yè)務(wù)");
        } catch (Exception e){
         e.printStackTrace();
        } finally {
         lock.unlock();
        }
       }, "C").start();
      }

      1. lock 方法:

      調(diào)用lock.lock()的時(shí)候,實(shí)際上調(diào)用的是 NonfairSync 的 lock 方法,如下:

      final void lock() {
       if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
       else
        acquire(1);
      }

      compareAndSetState(0, 1) 這里比較并交換,期望 AQS 中的 state 的值是0,如果是,就將其修改成1,返回 true ,否則返回 false 。

      • 返回 true 的時(shí)候,執(zhí)行 setExclusiveOwnerThread(Thread.currentThread()) 把占用資源的線程設(shè)置成當(dāng)前線程。當(dāng)線程A進(jìn)來的時(shí)候,因?yàn)?AQS 中的 state 還沒別的線程去修改,所以是0,就會(huì)成功修改成 1,就直接加鎖成功了。

      • 返回 false 的時(shí)候,就會(huì)進(jìn)入 esle 塊中,執(zhí)行 acquire(1) 方法。假如線程A加鎖成功還沒釋放鎖的時(shí)候,線程B進(jìn)來了,那么就會(huì)返回 false 。 acquire(1) 方法又主要包括三個(gè)方法,源碼如下:

      public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
      }

      接下來依次來看這三個(gè)方法。

      2. tryAcquire(arg) 方法:

      protected final boolean tryAcquire(int acquires) {
       return nonfairTryAcquire(acquires);
      }

      再點(diǎn)進(jìn)去看 nonfairTryAcquire(acquires) :

      final boolean nonfairTryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       if (c == 0) {
        if (compareAndSetState(0, acquires)) {
         setExclusiveOwnerThread(current);
         return true;
        }
       }
       else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
         throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
       }
       return false;
      }

      線程B進(jìn)到這段代碼就有三種情況:

      • currentThread 就是線程B,c 就是 AQS 中的 state,即1,c 不等于0,所以跑到 else if 中,判斷當(dāng)前線程和持有鎖的線程是否相同。current 是 B,而當(dāng)前持有鎖的是 A,也不相等,所以直接 return false。

      • 如果線程B進(jìn)來的時(shí)候 A 剛好走了,即 c 等于0,那么進(jìn)到 if 里面。if 里面做的事就是再將 state 改成1,同時(shí)設(shè)置當(dāng)前占有鎖的線程為 B,然后返回 true;

      • 如果當(dāng)前線程等于當(dāng)前占有鎖的線程,即進(jìn)來的還是線程A,那么就修改 state 的值(當(dāng)前值加1),然后返回 true。

      3. addWaiter(Node.EXCLUSIVE) 方法:

      上面說了 tryAcquire(arg) 方法,當(dāng)該方法返回 false 的時(shí)候,就會(huì)執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法,那么先看它里面的 addWaiter(Node.EXCLUSIVE) 方法,源碼如下:

      private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode);
       // Try the fast path of enq; backup to full enq on failure
       Node pred = tail;
       if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
         pred.next = node;
         return node;
        }
       }
       enq(node);
       return node;
      }

      無特殊說明的時(shí)候,以下情況基于:當(dāng)前持有鎖的是線程A,并且還沒釋放,進(jìn)來的是線程B。

      這個(gè)方法首先將線程B封裝成 Node,傳進(jìn)來的 mode 是 AQS 中的一個(gè)屬性,還沒哪里賦過值,所以是 null,當(dāng)前的 tail 其實(shí)也是 null,因?yàn)?AQS 隊(duì)列中現(xiàn)在還沒別的等待線程。是 null 的話,就執(zhí)行入隊(duì)方法 enq(node),該方法如下:

      private Node enq(final Node node) {
       for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
         if (compareAndSetHead(new Node()))
          tail = head;
        } else {
         node.prev = t;
         if (compareAndSetTail(t, node)) {
          t.next = node;
          return t;
         }
        }
       }
      }

      for (; ;) 其實(shí)就是相當(dāng)于 while(true),進(jìn)行自旋。當(dāng)前的 tail 是 null ,所以進(jìn)入 if 中,這里有個(gè) compareAndSetHead(new Node()) 方法,這里是 new 了一個(gè)節(jié)點(diǎn),姑且叫它傀儡節(jié)點(diǎn),將它設(shè)置為頭結(jié)點(diǎn),如果 new 成功了,尾結(jié)點(diǎn)也指向它。效果如下圖:

      傀儡節(jié)點(diǎn)

      第二次循環(huán)的時(shí)候,t 就是不再是 null 了,而是指向剛才那個(gè)傀儡節(jié)點(diǎn)了,所以進(jìn)入 else 中。else 中做的事就是,將傳進(jìn)來的節(jié)點(diǎn),即封裝了線程B的節(jié)點(diǎn) node,將其 prev 設(shè)置成剛才new 的那個(gè)傀儡節(jié)點(diǎn),再將 tail 指向 封裝了線程B的 node;再將傀儡節(jié)點(diǎn)的 next 指針指向封裝了線程B的 node,如下圖:

      node入隊(duì)

      當(dāng)線程C進(jìn)到 addWaiter(Node mode) 方法,此時(shí) tail 不是 null 了,已經(jīng)指向了 線程B的節(jié)點(diǎn),所以進(jìn)入到 if 塊里面,執(zhí)行:

      if (pred != null) {
       node.prev = pred;
       if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
       }
      }

      這里就是將線程C的 prev 設(shè)置為當(dāng)前的 tail,即線程B的 node,然后將線程C設(shè)置為 tail,再將線程B的 next 設(shè)置為線程C。最后效果圖如下:

      線程C入隊(duì)

      4. acquireQueued(final Node node, int arg) 方法:

      再來看看這個(gè)方法:

      final boolean acquireQueued(final Node node, int arg) {
       boolean failed = true;
       try {
        boolean interrupted = false;
        for (;;) {
         final Node p = node.predecessor();
         if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return interrupted;
         }
         if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
          interrupted = true;
        }
       } finally {
        if (failed)
         cancelAcquire(node);
       }
      }

      這個(gè)固定的,假如傳進(jìn)來的 node 是線程B,首先會(huì)進(jìn)入自旋,看看 predecessor 這方法:

      final Node predecessor() throws NullPointerException {
       Node p = prev;
       if (p == null)
        throw new NullPointerException();
       else
        return p;
      }

      首先讓 p 等于 prev,此時(shí)線程B 節(jié)點(diǎn)的 prev是誰呢,直接看我上面畫的圖可以知道,線程B的 prev 就是傀儡節(jié)點(diǎn),所以這里 return 的就是傀儡節(jié)點(diǎn)。

      回到外層,傀儡節(jié)點(diǎn)等于 head,所以又會(huì)執(zhí)行 tryAcquire(arg),即又重復(fù)上述步驟再次嘗試獲取鎖。因?yàn)榇藭r(shí)線程A持有鎖,所以線程B嘗試獲取鎖會(huì)失敗,即 tryAcquire(arg) 會(huì)返回 false。

      返回 false, 那就執(zhí)行下一個(gè) if。首先看 shouldParkAfterFailedAcquire(p, node) 方法:

      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       int ws = pred.waitStatus;
       if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
       if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
         node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
       } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       return false;
      }

      prev 是傀儡節(jié)點(diǎn),它的 waitStatus 是0,因?yàn)榭芄?jié)點(diǎn) new 出來以后還沒改過它的 waitStatus 的值,默認(rèn)是0。Node.SIGNAL 的值是 -1,不相等,0 也不大于 0,所以進(jìn)入 else,執(zhí)行 compareAndSetWaitStatus(pred, ws, Node.SIGNAL),這是比較并交換,將傀儡節(jié)點(diǎn)的 waitStatus 的值由 0 改為 -1,最后返回了 false。

      shouldParkAfterFailedAcquire(p, node) 方法返回了 false,因?yàn)樽孕?,所以又回?nbsp;final Node p = node.predecessor(); 這一行。此時(shí) p 節(jié)點(diǎn)還是傀儡節(jié)點(diǎn),再去嘗試獲取鎖,如果線程A還是釋放,又獲取失敗了,就會(huì)再次執(zhí)行 shouldParkAfterFailedAcquire(p, node) 方法。

      再次執(zhí)行 shouldParkAfterFailedAcquire(p, node) 的時(shí)候,傀儡節(jié)點(diǎn)的 waitStatus 就已經(jīng)是 -1 了,所以直接 return true。

      這里 return 了 true,就會(huì)執(zhí)行 parkAndCheckInterrupt() 方法,看看這個(gè)方法源碼:

      private final boolean parkAndCheckInterrupt() {
       LockSupport.park(this);
       return Thread.interrupted();
      }

      這里的 this 就是線程B,這里調(diào)用了 park 方法,就讓線程B 在等著了。線程C進(jìn)來也一樣,執(zhí)行到這一步,就會(huì)調(diào)用 park 方法,一直在等著。當(dāng)線程A釋放鎖了,就會(huì)調(diào)用 unpark 方法,線程B和線程C就有一個(gè)可以搶到鎖了。

      5. unlock 方法:

      當(dāng)線程A調(diào)用了 unlock 方法,實(shí)際上調(diào)用的是:

       public void unlock() {
       sync.release(1);
      }

      點(diǎn)進(jìn)去之后是這樣的:

      public final boolean release(int arg) {
       if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
         unparkSuccessor(h);
        return true;
       }
       return false;
      }

      再點(diǎn)進(jìn)去看 tryRelease(arg 方法:

       protected final boolean tryRelease(int releases) {
       int c = getState() - releases;
       if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
       boolean free = false;
       if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
       }
       setState(c);
       return free;
      }

      線程A getState 是 1,傳進(jìn)來的 releases 也是 1,所以相減結(jié)果就是 0。因?yàn)榻Y(jié)果是 0,所以會(huì)將 free 改成 true,然后調(diào)用 setExclusiveOwnerThread(null); 將當(dāng)前持有鎖的線程設(shè)置為 null。然后設(shè)置 AQS 的 state 等于 c,即等于 0,最后該方法 return true。

      回到上一層,此時(shí)的 head 是傀儡節(jié)點(diǎn),不為空,并且傀儡節(jié)點(diǎn)的 waitStatus 剛才改成了 -1,不等于 0,所以會(huì)調(diào)用 unparkSuccessor(h); 方法:

      private void unparkSuccessor(Node node) {
       /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
       int ws = node.waitStatus;
       if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

       /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
       Node s = node.next;
       if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
         if (t.waitStatus <= 0)
          s = t;
       }
       if (s != null)
        LockSupport.unpark(s.thread);
      }

      這里傳進(jìn)來的節(jié)點(diǎn)是傀儡節(jié)點(diǎn),它的 waitStatus 是 -1,小于 0,所以就會(huì)執(zhí)行 compareAndSetWaitStatus(node, ws, 0) 進(jìn)行比較并交換,又將傀儡節(jié)點(diǎn)的 waitStatus 改成了 0。

      繼續(xù)往下執(zhí)行,得到的 s 就是線程B所在節(jié)點(diǎn),不為空,并且線程B節(jié)點(diǎn)的 waitStatus 還沒改過,是 0,所以直接執(zhí)行 LockSupport.unpark(s.thread)。

      因?yàn)檎{(diào)用了 unpark,所以剛才阻塞的線程B在 acquireQueued(final Node node, int arg) 方法中的自旋就繼續(xù)進(jìn)行,就會(huì)調(diào)用 tryAcquire(arg) 方法,這次因?yàn)锳已經(jīng)釋放鎖了,所以該方法會(huì)返回 true,就會(huì)執(zhí)行 if 里面的代碼:

      if (p == head && tryAcquire(arg)) {
       setHead(node);
       p.next = null; // help GC
       failed = false;
       return interrupted;
      }

      看看 setHead(node) 方法:

      private void setHead(Node node) {
       head = node;
       node.thread = null;
       node.prev = null;
      }

      這里的 node 就是線程B節(jié)點(diǎn),將頭結(jié)點(diǎn)指向線程B節(jié)點(diǎn),將線程B節(jié)點(diǎn)的線程設(shè)置為空,前驅(qū)設(shè)置為空。外層再把傀儡節(jié)點(diǎn)的 next 指針設(shè)置為空,所以最終效果就是:

      傀儡節(jié)點(diǎn)出隊(duì)

      最終是傀儡節(jié)點(diǎn)出隊(duì),以前線程B所在節(jié)點(diǎn)成為新的傀儡節(jié)點(diǎn)。因?yàn)橹暗目芄?jié)點(diǎn)已經(jīng)沒有任何引用指向它了,就會(huì)被 GC 回收。


      掃描二維碼

        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多