乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      學(xué)習(xí)JUC源碼(1)——AQS同步隊(duì)列(源碼分析結(jié)合圖文理解)

       路人甲Java 2021-06-25

      前言

        最近結(jié)合書籍《Java并發(fā)編程藝術(shù)》一直在看AQS的源碼,發(fā)現(xiàn)AQS核心就是:利用內(nèi)置的FIFO雙向隊(duì)列結(jié)構(gòu)來實(shí)現(xiàn)線程排隊(duì)獲取int變量的同步狀態(tài),以此奠定了很多并發(fā)包中大部分實(shí)現(xiàn)基礎(chǔ),比如ReentranLock等。今天又是周末,便來總結(jié)下最近看的消化后的內(nèi)容。

        主要參考資料《Java并發(fā)編程藝術(shù)》(有需要的小伙伴可以找我,我這里只有電子PDF)結(jié)合ReentranLock、AQS等源碼。


      一、同步隊(duì)列的結(jié)構(gòu)與實(shí)現(xiàn)

      1、同步隊(duì)列的結(jié)構(gòu)

      (1)結(jié)構(gòu)介紹

        AQS使用的同步隊(duì)列是基于一種CLH鎖算法來實(shí)現(xiàn)(引用網(wǎng)上資料對CLH簡單介紹):

        CLH鎖也是一種基于鏈表的可擴(kuò)展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋;

        結(jié)點(diǎn)之間是通過隱形的鏈表相連,之所以叫隱形的鏈表是由于這些結(jié)點(diǎn)之間沒有明顯的next指針,而是通過myPred所指向的結(jié)點(diǎn)的變化情況來影響myNode的行為;

        當(dāng)一個(gè)線程須要獲取鎖時(shí),會(huì)創(chuàng)建一個(gè)新的QNode。將當(dāng)中的locked設(shè)置為true表示須要獲取鎖。然后線程對tail域調(diào)用getAndSet方法,使自己成為隊(duì)列的尾部。同一時(shí)候獲取一個(gè)指向其前趨的引用myPred,然后該線程就在前趨結(jié)點(diǎn)的locked字段上旋轉(zhuǎn)。直到前趨結(jié)點(diǎn)釋放鎖。

        當(dāng)一個(gè)線程須要釋放鎖時(shí),將當(dāng)前結(jié)點(diǎn)的locked域設(shè)置為false,同一時(shí)候回收前趨結(jié)點(diǎn)。線程A須要獲取鎖。其myNode域?yàn)閠rue。些時(shí)tail指向線程A的結(jié)點(diǎn),然后線程B也增加到線程A后面。tail指向線程B的結(jié)點(diǎn)。然后線程A和B都在它的myPred域上旋轉(zhuǎn),一旦它的myPred結(jié)點(diǎn)的locked字段變?yōu)閒alse,它就能夠獲取鎖。

      而在源碼中也有這樣的介紹:

      /**
       * Wait queue node class.
       *
       * <p>The wait queue is a variant of a "" (Craig, Landin, and
       * Hagersten) lock queue. CLH locks are normally used for
       * spinlocks. 
       * ...........
       * <p>To enqueue into a CLH lock, you atomically splice it in as new
       * tail. To dequeue, you just set the head field.
       * <pre>
       *      +------+  prev +-----+       +-----+
       * head |      | <---- |     | <---- |     |  tail
       *      +------+       +-----+       +-----+
       * </pre>
       * ..............

      在AQS中的同步隊(duì)列結(jié)構(gòu)以及獲取/釋放鎖都是基于此實(shí)現(xiàn)的,這里我們先放一個(gè)我畫的基本結(jié)構(gòu)來理解AQS同步隊(duì)列,再進(jìn)一步介紹一些細(xì)節(jié)。

               

      根據(jù)以上圖我們看到:

      • 該隊(duì)列是雙向FIFO隊(duì)列,每個(gè)節(jié)點(diǎn)都有pre、next域;

      • 同步器包含了兩個(gè)節(jié)點(diǎn)類型的引用,一個(gè)指向頭結(jié)點(diǎn),一個(gè)指向尾節(jié)點(diǎn);

      • 新加入線程被構(gòu)造成Node通過調(diào)用compareAndSetTail加入同步隊(duì)列中;

      • 使用setHead(Node node)設(shè)置頭結(jié)點(diǎn),指向隊(duì)列頭。使用compareAndSetTail(Node exceptNode, Node updateNode)指向隊(duì)列尾節(jié)點(diǎn)。

      在源碼中我們可以看到:

          // 內(nèi)部類Node節(jié)點(diǎn)static final class Node{...}// 同步隊(duì)列的head引用private transient volatile Node head;// 同步隊(duì)列的tail引用private transient volatile Node tail;

      (2)節(jié)點(diǎn)構(gòu)成

      那么Node結(jié)構(gòu)的具體構(gòu)成是什么呢?我們具體看內(nèi)部類Node的源碼:

          static final class Node {/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED =  1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL    = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/** * waitStatus value to indicate the next acquireShared should
               * unconditionally propagate         */static final int PROPAGATE = -3;/** 等待狀態(tài):
               * 0 INITAIL: 初始狀態(tài)
               * 1 CANCELLED: 由于等待超時(shí)或者被中斷,需要從同步隊(duì)列中取消等待,節(jié)點(diǎn)進(jìn)入該狀態(tài)不會(huì)被改變
               * -1 SIGNAL: 當(dāng)前節(jié)點(diǎn)釋放同步狀態(tài)或被取消,則等待狀態(tài)的后繼節(jié)點(diǎn)被通知
               * -2 CONDITION: 節(jié)點(diǎn)在等待隊(duì)列中,線程在Condition上,需要其它線程調(diào)用Condition的signal()方法才能從等待隊(duì)轉(zhuǎn)移到同步隊(duì)列
               * -3 PROPAGATE: 表示下一個(gè)共享式同步狀態(tài)將會(huì)無條件被傳播下去         */volatile int waitStatus;/** 前驅(qū)結(jié)點(diǎn) */volatile Node prev;/** 后繼節(jié)點(diǎn) */volatile Node next; /** 獲取同步狀態(tài)的線程 */volatile Thread thread;/** 等待隊(duì)列中的后繼節(jié)點(diǎn) */Node nextWaiter;      /** 判斷Node是否是共享模式 */final boolean isShared() {return nextWaiter == SHARED;
              } /** 返回前驅(qū)結(jié)點(diǎn) */final Node predecessor() throws NullPointerException {
                  Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;
              }
      
              Node() {    // Used to establish initial head or SHARED marker        }
      
              Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;
              }
      
              Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;
              }
          }

      從源碼中可以發(fā)現(xiàn):同步隊(duì)列中的節(jié)點(diǎn)Node用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點(diǎn)。

      節(jié)點(diǎn)是構(gòu)成同步隊(duì)列的基礎(chǔ),沒有成功獲取同步狀態(tài)的線程將成為節(jié)點(diǎn)加入該隊(duì)列的尾部。當(dāng)一個(gè)線程無法獲取同步狀態(tài)時(shí),會(huì)被構(gòu)造成節(jié)點(diǎn)并加入同步隊(duì)列中,通過CAS保證設(shè)置尾節(jié)點(diǎn)這一步是線程安全的,此時(shí)才能認(rèn)為當(dāng)前節(jié)點(diǎn)(線程)成功加入同步隊(duì)列與尾節(jié)點(diǎn)建立聯(lián)系。具體的實(shí)現(xiàn)邏輯請看下面介紹!

      2、同步狀態(tài)獲取與釋放

      (1)獨(dú)占式同步狀態(tài)獲取與釋放

      通過調(diào)用同步器acquire(int arg)方法可以獲取同步狀態(tài),該方法中斷不敏感,也就是由于線程獲取同步狀態(tài)失敗后進(jìn)入同步隊(duì)列中,后序線程對進(jìn)行中斷操作時(shí),線程不會(huì)從同步隊(duì)列中移出

        public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                  selfInterrupt();
          }

      同步狀態(tài)獲取主要的流程步驟:

      1)首先調(diào)用自定義同步器實(shí)現(xiàn)tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態(tài)

      2)如果獲取失敗則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式Node.EXCLUSIVE)并通過addWaiter(Node ndoe)方法將該節(jié)點(diǎn)加入到同步隊(duì)列的尾部,同時(shí)enq(node)通過for(;;)循環(huán)保證安全設(shè)置尾節(jié)點(diǎn)。

       private Node addWaiter(Node mode) {// 根據(jù)給定模式構(gòu)造NodeNode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail; // 嘗試在尾部添加if (pred != null) {
                  node.prev = pred;// cas方式保證正確添加尾節(jié)點(diǎn)if (compareAndSetTail(pred, node)) {
                      pred.next = node;return node;
                  }
              }// enq主要是通過for(;;)死循環(huán)來確保節(jié)點(diǎn)正確添加// 在for(;;)死循環(huán)中,通過cas將節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)時(shí),才返回;否則一直嘗試設(shè)置        enq(node);return node;
          } private Node enq(final Node node) {for (;;) {
                  Node t = tail;if (t == null) { // Must initialize  當(dāng)tail節(jié)點(diǎn)為null時(shí),必須初始化構(gòu)造好    head節(jié)點(diǎn)if (compareAndSetHead(new Node()))
                          tail = head;
                  } else { // 否則就通過cas開始添加尾節(jié)點(diǎn)node.prev = t;if (compareAndSetTail(t, node)) {
                          t.next = node;return t;
                      }
                  }
              }
          }

      假設(shè)原隊(duì)列中存在Node-1到Node-4節(jié)點(diǎn),此時(shí)某個(gè)線程獲取同步狀態(tài)失敗則構(gòu)成成Node-5通過CAS方式加入隊(duì)列(下圖忽略自旋環(huán)節(jié))。

            

      3)節(jié)點(diǎn)進(jìn)入同步隊(duì)列之后“自旋”,即acquireQueued(final Node node, int arg)方法,在這個(gè)方法中,當(dāng)前node死循環(huán)嘗試獲取鎖狀態(tài),但是只有node的前驅(qū)結(jié)點(diǎn)是Head才能嘗試獲取同步狀態(tài),取成功之后立即設(shè)置當(dāng)前節(jié)點(diǎn)為Head,并成功返回。否則就會(huì)一直自旋。

      final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 當(dāng)前node節(jié)點(diǎn)的前驅(qū)是Head時(shí)(p == head),才能有資格去嘗試獲取同步狀態(tài)(tryAcquire(arg))// 這是因?yàn)楫?dāng)前節(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)獲得同步狀態(tài),才能喚醒后繼節(jié)點(diǎn),即當(dāng)前節(jié)點(diǎn)if (p == head && tryAcquire(arg)) { // 以上條件滿足之后setHead(node); // 設(shè)置當(dāng)前節(jié)點(diǎn)為Headp.next = null; // help GC // 釋放ndoe的前驅(qū)節(jié)點(diǎn)failed = false;return interrupted;
                      }// 線程被中斷或者前驅(qū)結(jié)點(diǎn)被釋放,則繼續(xù)進(jìn)入檢查:p == head && tryAcquire(argif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                          interrupted = true;
                  }
              } finally {if (failed)
                      cancelAcquire(node);
              }
          }

      此時(shí)新加入的Node-5節(jié)點(diǎn)也開始自旋,此時(shí)的Head(Node-1)已經(jīng)獲取到了同步狀態(tài),而Node-2退出了自旋,成為了新的Head。

         

      文字總結(jié):

      1)同步器會(huì)維護(hù)一個(gè)雙向FIFO隊(duì)列,獲取同步失敗的線程將會(huì)被構(gòu)造成Node加入隊(duì)尾(并且做自旋檢查:檢查前驅(qū)結(jié)點(diǎn)是否是Head);

      2)當(dāng)前線程想要獲得同步狀態(tài),前提是其前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn),并且獲得了同步狀態(tài);

      3)當(dāng)Head調(diào)用release(int arg)釋放鎖的同時(shí)會(huì)喚醒后繼節(jié)點(diǎn)(即當(dāng)前節(jié)點(diǎn)),后繼節(jié)點(diǎn)結(jié)束自旋

      流程圖總結(jié):

                 

      同步器的release方法:釋放鎖的同時(shí),喚醒后繼節(jié)點(diǎn)(進(jìn)而時(shí)后繼節(jié)點(diǎn)重新獲取同步狀態(tài))

          public final boolean release(int arg) {if (tryRelease(arg)) {
                  Node h = head;if (h != null && h.waitStatus != 0)// 該方法會(huì)喚醒Head節(jié)點(diǎn)的后繼節(jié)點(diǎn),使其重試嘗試獲取同步狀態(tài)                unparkSuccessor(h);return true;
              }return false;
          }

      UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)喚醒處于等待狀態(tài)的線程(之后會(huì)慢慢看源碼介紹)。

      (2)共享式同步狀態(tài)獲取與釋放

      共享鎖跟獨(dú)占式鎖最大的不同就是:某一時(shí)刻有多個(gè)線程同時(shí)獲取到同步狀態(tài),獲取判斷是否獲取同步狀態(tài)成功的關(guān)鍵,獲取到的同步狀態(tài)要大于等于0。而其他步驟基本都是一致的,還是從源碼開始分析起:帶后綴Share都為共享式同步方法。

      1)acquireShared(int arg)獲取同步狀態(tài):如果獲取失敗則加入隊(duì)尾,并且檢查是否具備退出自旋的條件(前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)并且能成功獲取同步狀態(tài))

          public final void acquireShared(int arg) {// tryAcquireShared 獲取同步狀態(tài),大于0才是獲取狀態(tài)成功,否則就是失敗if (tryAcquireShared(arg) < 0)// 獲取狀態(tài)失敗則構(gòu)造共享Node,加入隊(duì)列;// 并且檢查是否具備退出自旋的條件:即preNode為head,并且能獲取到同步狀態(tài)            doAcquireShared(arg);
          }

      2)doAcquireShared(arg):獲取失敗的Node加入隊(duì)列,如果當(dāng)前節(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)的話,嘗試獲取同步狀態(tài),如果大于等于0則在for(;;)中退出(退出自旋)。

      private void doAcquireShared(int arg) {// 構(gòu)造共享模式的Nodefinal Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);// 前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),并且能獲取狀態(tài)成功,則return返回,退出死循環(huán)(自旋)if (r >= 0) {
                              setHeadAndPropagate(node, r);
                              p.next = null; // help GCif (interrupted)
                                  selfInterrupt();
                              failed = false;return;
                          }
                      }if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                          interrupted = true;
                  }
              } finally {if (failed)
                      cancelAcquire(node);
              }
          }

      3)releaseShared(int arg):釋放同步狀態(tài),通過loop+CAS方式釋放多個(gè)線程的同步狀態(tài)。

          public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {// 通過loop+CAS方式釋放多個(gè)線程的同步狀態(tài)            doReleaseShared();return true;
              }return false;
          }

      二、自定義同步組件(實(shí)現(xiàn)Lock,內(nèi)部類Sync繼承AQS)

      1、實(shí)現(xiàn)一個(gè)不可重入的互斥鎖Mutex

      2、實(shí)現(xiàn)指定共享數(shù)量的共享鎖MyShareLock

        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多