乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      深入理解AbstractQueuedSynchronizer(一)

       liang1234_ 2019-01-30

      博客鏈接:http://www./2017/03/15/深入理解AbstractQueuedSynchronizer(一)/


      AbstractQueuedSynchronizer簡(jiǎn)介

      AbstractQueuedSynchronizer提供了一個(gè)FIFO隊(duì)列,可以看做是一個(gè)可以用來實(shí)現(xiàn)鎖以及其他需要同步功能的框架。這里簡(jiǎn)稱該類為AQS。AQS的使用依靠繼承來完成,子類通過繼承自AQS并實(shí)現(xiàn)所需的方法來管理同步狀態(tài)。例如ReentrantLock,CountDownLatch等。

      本篇文章基于JDK1.8來介紹,該類有許多實(shí)現(xiàn)類:

      其中,我們最常用的大概就是ReentrantLock和CountDownLatch了。ReentrantLock提供了對(duì)代碼塊的并發(fā)訪問控制,也就是鎖,說是鎖,但其實(shí)并沒有用到關(guān)鍵字synchronized,這么神奇?其實(shí)其內(nèi)部就是基于同步器來實(shí)現(xiàn)的,本文結(jié)合ReentrantLock的使用來分析同步器獨(dú)占鎖的原理。

      AQS的兩種功能

      從使用上來說,AQS的功能可以分為兩種:獨(dú)占和共享。對(duì)于這兩種功能,有一個(gè)很常用的類:ReentrantReadWriteLock,其就是通過兩個(gè)內(nèi)部類來分別實(shí)現(xiàn)了這兩種功能,提供了讀鎖和寫鎖的功能。但子類實(shí)現(xiàn)時(shí),只能實(shí)現(xiàn)其中的一種功能,即要么是獨(dú)占功能,要么是共享功能。

      對(duì)于獨(dú)占功能,例如如下代碼:

      ReentrantLock lock = new ReentrantLock(); ... public void function(){ lock.lock(); try { // do something... } finally { lock.unlock(); } }

      這個(gè)很好理解,通過ReentrantLock來保證在lock.lock()之后的代碼在同一時(shí)刻只能有一個(gè)線程來執(zhí)行,其余的線程將會(huì)被阻塞,直到該線程執(zhí)行了lock.unlock()。這就是一個(gè)獨(dú)占鎖的功能。

      對(duì)于共享功能,例如如下代碼:

      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      ...
      
      public void function(){
          
          lock.readLock().lock();
          try {
              
          // do something...
              
              
          } finally {
              lock.readLock().unlock();
          }
          
      }
      
      

      代碼中的lockReentrantReadWriteLock類的實(shí)例,而lock.readLock()為獲取其中的讀鎖,即共享鎖,使用方式并無差別,但和獨(dú)占鎖是有區(qū)別的:

      • 讀鎖與讀鎖可以共享
      • 讀鎖與寫鎖不可以共享(排他)
      • 寫鎖與寫鎖不可以共享(排他)

      AQS獨(dú)占鎖的內(nèi)部實(shí)現(xiàn)

      AQS的主要數(shù)據(jù)結(jié)構(gòu)

      由于使用AQS可以實(shí)現(xiàn)鎖的功能,那么下面就要分析一下究竟是如何實(shí)現(xiàn)的。

      AQS內(nèi)部維護(hù)著一個(gè)FIFO的隊(duì)列,該隊(duì)列就是用來實(shí)現(xiàn)線程的并發(fā)訪問控制。隊(duì)列中的元素是一個(gè)Node類型的節(jié)點(diǎn),Node的主要屬性如下:

      static final class Node { int waitStatus; Node prev; Node next; Node nextWaiter; Thread thread; }
      • waitStatus:表示節(jié)點(diǎn)的狀態(tài),其中包含的狀態(tài)有:
        • CANCELLED:值為1,表示當(dāng)前節(jié)點(diǎn)被取消;
        • SIGNAL:值為-1,表示當(dāng)前節(jié)點(diǎn)的的后繼節(jié)點(diǎn)將要或者已經(jīng)被阻塞,在當(dāng)前節(jié)點(diǎn)釋放的時(shí)候需要unpark后繼節(jié)點(diǎn);
        • CONDITION:值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,即在condition隊(duì)列中;
        • PROPAGATE:值為-3,表示releaseShared需要被傳播給后續(xù)節(jié)點(diǎn)(僅在共享模式下使用);
        • 0:無狀態(tài),表示當(dāng)前節(jié)點(diǎn)在隊(duì)列中等待獲取鎖。
      • prev:前繼節(jié)點(diǎn);
      • next:后繼節(jié)點(diǎn);
      • nextWaiter:存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn);
      • thread:當(dāng)前線程。

      其中,隊(duì)列里還有一個(gè)head節(jié)點(diǎn)和一個(gè)tail節(jié)點(diǎn),分別表示頭結(jié)點(diǎn)和尾節(jié)點(diǎn),其中頭結(jié)點(diǎn)不存儲(chǔ)Thread,僅保存next結(jié)點(diǎn)的引用。

      AQS中有一個(gè)state變量,該變量對(duì)不同的子類實(shí)現(xiàn)具有不同的意義,對(duì)ReentrantLock來說,它表示加鎖的狀態(tài):

      • 無鎖時(shí)state=0,有鎖時(shí)state>0;
      • 第一次加鎖時(shí),將state設(shè)置為1;
      • 由于ReentrantLock是可重入鎖,所以持有鎖的線程可以多次加鎖,經(jīng)過判斷加鎖線程就是當(dāng)前持有鎖的線程時(shí)(即exclusiveOwnerThread==Thread.currentThread()),即可加鎖,每次加鎖都會(huì)將state的值 1,state等于幾,就代表當(dāng)前持有鎖的線程加了幾次鎖;
      • 解鎖時(shí)每解一次鎖就會(huì)將state減1,state減到0后,鎖就被釋放掉,這時(shí)其它線程可以加鎖;
      • 當(dāng)持有鎖的線程釋放鎖以后,如果是等待隊(duì)列獲取到了加鎖權(quán)限,則會(huì)在等待隊(duì)列頭部取出第一個(gè)線程去獲取鎖,獲取鎖的線程會(huì)被移出隊(duì)列;

      state變量定義如下:

      /**
       * The synchronization state.
       */
      private volatile int state;
      

      ReentrantLock類的結(jié)構(gòu)

      下面通過ReentrantLock的實(shí)現(xiàn)進(jìn)一步分析重入鎖的實(shí)現(xiàn)。

      首先看一下lock方法:

      public void lock() { sync.lock(); }

      該方法調(diào)用了sync實(shí)例的lock方法,這里要說明一下ReentrantLock中的幾個(gè)內(nèi)部類:

      • Sync
      • FairSync
      • NonfairSync

      對(duì)于ReentrantLock,有兩種獲取鎖的模式:公平鎖和非公平鎖。所以對(duì)應(yīng)有兩個(gè)內(nèi)部類,都繼承自Sync。而Sync繼承自AQS:

      本文主要通過公平鎖來介紹,看一下FairSync的定義:

      /**
       * Sync object for fair locks
       */
      static final class FairSync extends Sync {
          private static final long serialVersionUID = -3000897897090466540L;
      
          final void lock() {
              acquire(1);
          }
      
          /**
           * Fair version of tryAcquire.  Don't grant access unless
           * recursive call or no waiters or is first.
           */
          protected final boolean tryAcquire(int acquires) {
              final Thread current = Thread.currentThread();
              // 獲取state
              int c = getState();
              // state=0表示當(dāng)前隊(duì)列中沒有線程被加鎖
              if (c == 0) {
                  /*
                   * 首先判斷是否有前繼結(jié)點(diǎn),如果沒有則當(dāng)前隊(duì)列中還沒有其他線程;
                   * 設(shè)置狀態(tài)為acquires,即lock方法中寫死的1(這里為什么不直接setState?因?yàn)榭赡芡瑫r(shí)有多個(gè)線程同時(shí)在執(zhí)行到此處,所以用CAS來執(zhí)行);
                   * 設(shè)置當(dāng)前線程獨(dú)占鎖。
                   */
                  if (!hasQueuedPredecessors() &&
                      compareAndSetState(0, acquires)) {
                      setExclusiveOwnerThread(current);
                      return true;
                  }
              }
              /*
               * 如果state不為0,表示已經(jīng)有線程獨(dú)占鎖了,這時(shí)還需要判斷獨(dú)占鎖的線程是否是當(dāng)前的線程,原因是由于ReentrantLock為可重入鎖;
               * 如果獨(dú)占鎖的線程是當(dāng)前線程,則將狀態(tài)加1,并setState;
               * 這里為什么不用compareAndSetState?因?yàn)楠?dú)占鎖的線程已經(jīng)是當(dāng)前線程,不需要通過CAS來設(shè)置。
               */
              else if (current == getExclusiveOwnerThread()) {
                  int nextc = c   acquires;
                  if (nextc < 0)
                      throw new Error('Maximum lock count exceeded');
                  setState(nextc);
                  return true;
              }
              return false;
          }
      }
      

      AQS獲取獨(dú)占鎖的實(shí)現(xiàn)

      acquire方法

      acquire是AQS中的方法,代碼如下:

      public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

      該方法主要工作如下:

      1. 嘗試獲取獨(dú)占鎖;
      2. 獲取成功則返回,否則執(zhí)行步驟3;
      3. addWaiter方法將當(dāng)前線程封裝成Node對(duì)象,并添加到隊(duì)列尾部;
      4. 自旋獲取鎖,并判斷中斷標(biāo)志位。如果中斷標(biāo)志位為true,執(zhí)行步驟5,否則返回;
      5. 設(shè)置線程中斷。

      tryAcquire方法

      tryAcquire方法在FairSync中已經(jīng)說明,它重寫了AQS中的方法,在AQS中它的定義如下:

      protected boolean tryAcquire(int arg) {
          throw new UnsupportedOperationException();
      }
      

      既然該方法需要子類來實(shí)現(xiàn),為什么不使用abstract來修飾呢?上文中提到過,AQS有兩種功能:獨(dú)占和共享,而且子類只能實(shí)現(xiàn)其一種功能,所以,如果使用abstract來修飾,那么每個(gè)子類都需要同時(shí)實(shí)現(xiàn)兩種功能的方法,這對(duì)子類來說不太友好,所以沒有使用abstract來修飾。

      該方法是在ReentrantLock中的FairSync和NonfairSync的兩個(gè)內(nèi)部類來實(shí)現(xiàn)的,這里以FairSysc-公平鎖來說明:

      protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c acquires; if (nextc < 0) throw new Error('Maximum lock count exceeded'); setState(nextc); return true; } return false; }

      addWaiter方法

      看下addWaiter方法的定義:

      private Node addWaiter(Node mode) {
          // 根據(jù)當(dāng)前線程創(chuàng)建一個(gè)Node對(duì)象
          Node node = new Node(Thread.currentThread(), mode);
          // Try the fast path of enq; backup to full enq on failure
          Node pred = tail;
          // 判斷tail是否為空,如果為空表示隊(duì)列是空的,直接enq
          if (pred != null) {
              node.prev = pred;
              // 這里嘗試CAS來設(shè)置隊(duì)尾,如果成功則將當(dāng)前節(jié)點(diǎn)設(shè)置為tail,否則enq
              if (compareAndSetTail(pred, node)) {
                  pred.next = node;
                  return node;
              }
          }
          enq(node);
          return node;
      }
      

      該方法就是根據(jù)當(dāng)前線程創(chuàng)建一個(gè)Node,然后添加到隊(duì)列尾部。

      enq方法

      private Node enq(final Node node) { // 重復(fù)直到成功 for (;;) { Node t = tail; // 如果tail為null,則必須創(chuàng)建一個(gè)Node節(jié)點(diǎn)并進(jìn)行初始化 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 嘗試CAS來設(shè)置隊(duì)尾 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

      acquireQueued方法

      該方法的功能是循環(huán)的嘗試獲取鎖,直到成功為止,最后返回中斷標(biāo)志位。

      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              // 中斷標(biāo)志位
              boolean interrupted = false;
              for (;;) {
                  // 獲取前繼節(jié)點(diǎn)
                  final Node p = node.predecessor();
                  // 如果前繼節(jié)點(diǎn)是head,則嘗試獲取
                  if (p == head && tryAcquire(arg)) {
                      // 設(shè)置head為當(dāng)前節(jié)點(diǎn)(head中不包含thread)
                      setHead(node);
                      // 清除之前的head
                      p.next = null; // help GC
                      failed = false;
                      return interrupted;
                  }
                  // 如果p不是head或者獲取鎖失敗,判斷是否需要進(jìn)行park
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      parkAndCheckInterrupt())
                      interrupted = true;
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      

      這里有幾個(gè)問題很重要:

      • 什么條件下需要park?
      • 為什么要判斷中斷狀態(tài)?
      • 死循環(huán)不會(huì)引起CPU使用率飆升?

      下面分別來分析一下。

      什么條件下需要park?

      看下shouldParkAfterFailedAcquire方法的代碼:

      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
      • 如果前一個(gè)節(jié)點(diǎn)的狀態(tài)是SIGNAL,則需要park;
      • 如果ws > 0,表示已被取消,刪除狀態(tài)是已取消的節(jié)點(diǎn);
      • 其他情況,設(shè)置前繼節(jié)點(diǎn)的狀態(tài)為SIGNAL。

      可見,只有在前繼節(jié)點(diǎn)的狀態(tài)是SIGNAL時(shí),需要park。第二種情況稍后會(huì)詳細(xì)介紹。

      為什么要判斷中斷狀態(tài)?

      首先要知道,acquireQueued方法中獲取鎖的方式是死循環(huán),判斷是否中斷是在parkAndCheckInterrupt方法中實(shí)現(xiàn)的,看下該方法的代碼:

      private final boolean parkAndCheckInterrupt() {
          LockSupport.park(this);
          return Thread.interrupted();
      }
      

      非常簡(jiǎn)單,阻塞當(dāng)前線程,然后返回線程的中斷狀態(tài)并復(fù)位中斷狀態(tài)。

      注意interrupted()方法的作用,該方法是獲取線程的中斷狀態(tài),并復(fù)位,也就是說,如果當(dāng)前線程是中斷狀態(tài),則第一次調(diào)用該方法獲取的是true,第二次則是false。而isInterrupted()方法則只是返回線程的中斷狀態(tài),不執(zhí)行復(fù)位操作。

      如果acquireQueued執(zhí)行完畢,返回中斷狀態(tài),回到acquire方法中,根據(jù)返回的中斷狀態(tài)判斷是否需要執(zhí)行Thread.currentThread().interrupt()。

      為什么要多做這一步呢?先判斷中斷狀態(tài),然后復(fù)位,如果之前線程是中斷狀態(tài),再進(jìn)行中斷?

      這里就要介紹一下park方法了。park方法是Unsafe類中的方法,與之對(duì)應(yīng)的是unpark方法。簡(jiǎn)單來說,當(dāng)前線程如果執(zhí)行了park方法,也就是阻塞了當(dāng)前線程,反之,unpark就是喚醒一個(gè)線程。

      具體的說明請(qǐng)參考http://blog.csdn.net/hengyunabc/article/details/28126139

      park與wait的作用類似,但是對(duì)中斷狀態(tài)的處理并不相同。如果當(dāng)前線程不是中斷的狀態(tài),park與wait的效果是一樣的;如果一個(gè)線程是中斷的狀態(tài),這時(shí)執(zhí)行wait方法會(huì)報(bào)java.lang.IllegalMonitorStateException,而執(zhí)行park時(shí)并不會(huì)報(bào)異常,而是直接返回。

      所以,知道了這一點(diǎn),就可以知道為什么要進(jìn)行中斷狀態(tài)的復(fù)位了:

      • 如果當(dāng)前線程是非中斷狀態(tài),則在執(zhí)行park時(shí)被阻塞,這是返回中斷狀態(tài)是false;
      • 如果當(dāng)前線程是中斷狀態(tài),則park方法不起作用,會(huì)立即返回,然后parkAndCheckInterrupt方法會(huì)獲取中斷的狀態(tài),也就是true,并復(fù)位;
      • 再次執(zhí)行循環(huán)的時(shí)候,由于在前一步已經(jīng)把該線程的中斷狀態(tài)進(jìn)行了復(fù)位,則再次調(diào)用park方法時(shí)會(huì)阻塞。

      所以,這里判斷線程中斷的狀態(tài)實(shí)際上是為了不讓循環(huán)一直執(zhí)行,要讓當(dāng)前線程進(jìn)入阻塞的狀態(tài)。想象一下,如果不這樣判斷,前一個(gè)線程在獲取鎖之后執(zhí)行了很耗時(shí)的操作,那么豈不是要一直執(zhí)行該死循環(huán)?這樣就造成了CPU使用率飆升,這是很嚴(yán)重的后果。

      死循環(huán)不會(huì)引起CPU使用率飆升?

      上面已經(jīng)說明。

      cancelAcquire方法

      在acquireQueued方法的finally語句塊中,如果在循環(huán)的過程中出現(xiàn)了異常,則執(zhí)行cancelAcquire方法,用于將該節(jié)點(diǎn)標(biāo)記為取消狀態(tài)。該方法代碼如下:

      private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 設(shè)置該節(jié)點(diǎn)不再關(guān)聯(lián)任何線程 node.thread = null; // Skip cancelled predecessors // 通過前繼節(jié)點(diǎn)跳過取消狀態(tài)的node Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 獲取過濾后的前繼節(jié)點(diǎn)的后繼節(jié)點(diǎn) Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 設(shè)置狀態(tài)為取消狀態(tài) node.waitStatus = Node.CANCELLED; /* * If we are the tail, remove ourselves. * 1.如果當(dāng)前節(jié)點(diǎn)是tail: * 嘗試更新tail節(jié)點(diǎn),設(shè)置tail為pred; * 更新失敗則返回,成功則設(shè)置tail的后繼節(jié)點(diǎn)為null */ if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; /* * 2.如果當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn): * 判斷當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)的狀態(tài)是否是SIGNAL,如果不是則嘗試設(shè)置前繼節(jié)點(diǎn)的狀態(tài)為SIGNAL; * 上面兩個(gè)條件如果有一個(gè)返回true,則再判斷前繼節(jié)點(diǎn)的thread是否不為空; * 若滿足以上條件,則嘗試設(shè)置當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),也就是相當(dāng)于將當(dāng)前節(jié)點(diǎn)從隊(duì)列中刪除 */ if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 3.如果是head的后繼節(jié)點(diǎn)或者狀態(tài)判斷或設(shè)置失敗,則喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn) unparkSuccessor(node); } node.next = node; // help GC } }

      該方法中執(zhí)行的過程有些復(fù)雜,首先是要獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn),如果前繼節(jié)點(diǎn)的狀態(tài)不是取消狀態(tài)(即pred.waitStatus > 0),則向前遍歷隊(duì)列,直到遇到第一個(gè)waitStatus <= 0的節(jié)點(diǎn),并把當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)設(shè)置為該節(jié)點(diǎn),然后設(shè)置當(dāng)前節(jié)點(diǎn)的狀態(tài)為取消狀態(tài)。

      接下來的工作可以分為3種情況:

      • 當(dāng)前節(jié)點(diǎn)是tail;
      • 當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn)(即隊(duì)列的第一個(gè)節(jié)點(diǎn),不包括head),也不是tail;
      • 當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn)。

      我們依次來分析一下:

      當(dāng)前節(jié)點(diǎn)是tail

      這種情況很簡(jiǎn)單,因?yàn)閠ail是隊(duì)列的最后一個(gè)節(jié)點(diǎn),如果該節(jié)點(diǎn)需要取消,則直接把該節(jié)點(diǎn)的前繼節(jié)點(diǎn)的next指向null,也就是把當(dāng)前節(jié)點(diǎn)移除隊(duì)列。出隊(duì)的過程如下:

      注意:經(jīng)驗(yàn)證,這里并沒有設(shè)置node的prev為null。

      當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn),也不是tail

      這里將node的前繼節(jié)點(diǎn)的next指向了node的后繼節(jié)點(diǎn),真正執(zhí)行的代碼就是如下一行:

      compareAndSetNext(pred, predNext, next);

      當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn)

      這里直接unpark后繼節(jié)點(diǎn)的線程,然后將next指向了自己。

      這里可能會(huì)有疑問,既然要?jiǎng)h除節(jié)點(diǎn),為什么都沒有對(duì)prev進(jìn)行操作,而僅僅是修改了next?

      要明確的一點(diǎn)是,這里修改指針的操作都是CAS操作,在AQS中所有以compareAndSet開頭的方法都是嘗試更新,并不保證成功,圖中所示的都是執(zhí)行成功的情況。

      那么在執(zhí)行cancelAcquire方法時(shí),當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)有可能已經(jīng)執(zhí)行完并移除隊(duì)列了(參見setHead方法),所以在這里只能用CAS來嘗試更新,而就算是嘗試更新,也只能更新next,不能更新prev,因?yàn)閜rev是不確定的,否則有可能會(huì)導(dǎo)致整個(gè)隊(duì)列的不完整,例如把prev指向一個(gè)已經(jīng)移除隊(duì)列的node。

      什么時(shí)候修改prev呢?其實(shí)prev是由其他線程來修改的?;厝タ聪聅houldParkAfterFailedAcquire方法,該方法有這樣一段代碼:

      do {
          node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
      

      該段代碼的作用就是通過prev遍歷到第一個(gè)不是取消狀態(tài)的node,并修改prev。

      這里為什么可以更新prev?因?yàn)閟houldParkAfterFailedAcquire方法是在獲取鎖失敗的情況下才能執(zhí)行,因此進(jìn)入該方法時(shí),說明已經(jīng)有線程獲得鎖了,并且在執(zhí)行該方法時(shí),當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)不會(huì)變化(因?yàn)橹挥挟?dāng)下一個(gè)節(jié)點(diǎn)獲得鎖的時(shí)候才會(huì)設(shè)置head),所以這里可以更新prev,而且不必用CAS來更新。

      AQS釋放獨(dú)占鎖的實(shí)現(xiàn)

      釋放通過unlock方法來實(shí)現(xiàn):

      public void unlock() { sync.release(1); }

      該方法調(diào)用了release方法,release是在AQS中定義的,看下release代碼:

      public final boolean release(int arg) {
          // 嘗試釋放鎖
          if (tryRelease(arg)) {
              // 釋放成功后unpark后繼節(jié)點(diǎn)的線程
              Node h = head;
              if (h != null && h.waitStatus != 0)
                  unparkSuccessor(h);
              return true;
          }
          return false;
      }
      

      這里首先嘗試著去釋放鎖,成功了之后要去喚醒后繼節(jié)點(diǎn)的線程,這樣其他的線程才有機(jī)會(huì)去執(zhí)行。

      tryRelease代碼如下:

      protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }

      是不是和tryAcquire方法類似?該方法也需要被重寫,在Sync類中的代碼如下:

      protected final boolean tryRelease(int releases) {
          // 這里是將鎖的數(shù)量減1
          int c = getState() - releases;
          // 如果釋放的線程和獲取鎖的線程不是同一個(gè),拋出非法監(jiān)視器狀態(tài)異常
          if (Thread.currentThread() != getExclusiveOwnerThread())
              throw new IllegalMonitorStateException();
          boolean free = false;
          // 由于重入的關(guān)系,不是每次釋放鎖c都等于0,
          // 直到最后一次釋放鎖時(shí),才會(huì)把當(dāng)前線程釋放
          if (c == 0) {
              free = true;
              setExclusiveOwnerThread(null);
          }
          // 記錄鎖的數(shù)量
          setState(c);
          return free;
      }
      

      當(dāng)前線程被釋放之后,需要喚醒下一個(gè)節(jié)點(diǎn)的線程,通過unparkSuccessor方法來實(shí)現(xiàn):

      private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

      主要功能就是要喚醒下一個(gè)線程,這里s == null || s.waitStatus > 0判斷后繼節(jié)點(diǎn)是否為空或者是否是取消狀態(tài),然后從隊(duì)列尾部向前遍歷找到最前面的一個(gè)waitStatus小于0的節(jié)點(diǎn),至于為什么從尾部開始向前遍歷,回想一下cancelAcquire方法的處理過程,cancelAcquire只是設(shè)置了next的變化,沒有設(shè)置prev的變化,在最后有這樣一行代碼:node.next = node,如果這時(shí)執(zhí)行了unparkSuccessor方法,并且向后遍歷的話,就成了死循環(huán)了,所以這時(shí)只有prev是穩(wěn)定的。

      到這里,通過ReentrantLock的lock和unlock來分析AQS獨(dú)占鎖的實(shí)現(xiàn)已經(jīng)基本完成了,但ReentrantLock還有一個(gè)非公平鎖NonfairSync。

      其實(shí)NonfairSync和FairSync主要就是在獲取鎖的方式上不同,公平鎖是按順序去獲取,而非公平鎖是搶占式的獲取,lock的時(shí)候先去嘗試修改state變量,如果搶占成功,則獲取到鎖:

      final void lock() {
          if (compareAndSetState(0, 1))
              setExclusiveOwnerThread(Thread.currentThread());
          else
              acquire(1);
      }
      

      非公平鎖的tryAcquire方法調(diào)用了nonfairTryAcquire方法:

      final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c acquires; if (nextc < 0) // overflow throw new Error('Maximum lock count exceeded'); setState(nextc); return true; } return false; }

      該方法比公平鎖的tryAcquire方法在第二個(gè)if判斷中少了一個(gè)是否存在前繼節(jié)點(diǎn)判斷,F(xiàn)airSync中的tryAcquire代碼中的這個(gè)if語句塊如下:

      if (!hasQueuedPredecessors() &&
          compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
      }
      

      總結(jié)

      本文從ReentrantLock出發(fā),比較完整的分析了AQS內(nèi)部獨(dú)占鎖的實(shí)現(xiàn),總體來說實(shí)現(xiàn)的思路很清晰,就是使用了標(biāo)志位 隊(duì)列的方式來處理鎖的狀態(tài),包括鎖的獲取,鎖的競(jìng)爭(zhēng)以及鎖的釋放。在AQS中,state可以表示鎖的數(shù)量,也可以表示其他狀態(tài),state的含義由子類去定義,自己只是提供了對(duì)state的維護(hù)。AQS通過state來實(shí)現(xiàn)線程對(duì)資源的訪問控制,而state具體的含義要在子類中定義。

      AQS在隊(duì)列的維護(hù)上的實(shí)現(xiàn)比較復(fù)雜,尤其是節(jié)點(diǎn)取消時(shí)隊(duì)列的維護(hù),這里并不是通過一個(gè)線程來完成的。同時(shí),AQS中大量的使用CAS來實(shí)現(xiàn)更新,這種更新能夠保證狀態(tài)和隊(duì)列的完整性。


        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多