AbstractQueuedSynchronizer簡(jiǎn)介AbstractQueuedSynchronizer提供了一個(gè)FIFO隊(duì)列,可以看做是一個(gè)可以用來實(shí)現(xiàn)鎖以及其他需要同步功能的框架。這里簡(jiǎn)稱該類為AQS。AQS的使用依靠繼承來完成,子類通過繼承自AQS并實(shí)現(xiàn)所需的方法來管理同步狀態(tài)。例如ReentrantLock,CountDownLatch等。 本篇文章基于JDK1.8來介紹,該類有許多實(shí)現(xiàn)類: ![]() 其中,我們最常用的大概就是ReentrantLock和CountDownLatch了。ReentrantLock提供了對(duì)代碼塊的并發(fā)訪問控制,也就是鎖,說是鎖,但其實(shí)并沒有用到關(guān)鍵字 AQS的兩種功能從使用上來說,AQS的功能可以分為兩種:獨(dú)占和共享。對(duì)于這兩種功能,有一個(gè)很常用的類:ReentrantReadWriteLock,其就是通過兩個(gè)內(nèi)部類來分別實(shí)現(xiàn)了這兩種功能,提供了讀鎖和寫鎖的功能。但子類實(shí)現(xiàn)時(shí),只能實(shí)現(xiàn)其中的一種功能,即要么是獨(dú)占功能,要么是共享功能。 對(duì)于獨(dú)占功能,例如如下代碼: ReentrantLock lock = new ReentrantLock();
...
public void function(){
lock.lock();
try {
// do something...
} finally {
lock.unlock();
}
}
這個(gè)很好理解,通過ReentrantLock來保證在 對(duì)于共享功能,例如如下代碼:
代碼中的
AQS獨(dú)占鎖的內(nèi)部實(shí)現(xiàn)AQS的主要數(shù)據(jù)結(jié)構(gòu)由于使用AQS可以實(shí)現(xiàn)鎖的功能,那么下面就要分析一下究竟是如何實(shí)現(xiàn)的。 AQS內(nèi)部維護(hù)著一個(gè)FIFO的隊(duì)列,該隊(duì)列就是用來實(shí)現(xiàn)線程的并發(fā)訪問控制。隊(duì)列中的元素是一個(gè)Node類型的節(jié)點(diǎn),Node的主要屬性如下: static final class Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}
其中,隊(duì)列里還有一個(gè) AQS中有一個(gè)
ReentrantLock類的結(jié)構(gòu)下面通過ReentrantLock的實(shí)現(xiàn)進(jìn)一步分析重入鎖的實(shí)現(xiàn)。 首先看一下lock方法: public void lock() {
sync.lock();
}
該方法調(diào)用了
對(duì)于ReentrantLock,有兩種獲取鎖的模式:公平鎖和非公平鎖。所以對(duì)應(yīng)有兩個(gè)內(nèi)部類,都繼承自Sync。而Sync繼承自AQS: ![]() 本文主要通過公平鎖來介紹,看一下FairSync的定義:
AQS獲取獨(dú)占鎖的實(shí)現(xiàn)acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
該方法主要工作如下:
tryAcquire方法
既然該方法需要子類來實(shí)現(xiàn),為什么不使用 該方法是在ReentrantLock中的FairSync和NonfairSync的兩個(gè)內(nèi)部類來實(shí)現(xiàn)的,這里以FairSysc-公平鎖來說明: protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error('Maximum lock count exceeded');
setState(nextc);
return true;
}
return false;
}
addWaiter方法看下addWaiter方法的定義:
該方法就是根據(jù)當(dāng)前線程創(chuàng)建一個(gè)Node,然后添加到隊(duì)列尾部。 enq方法private Node enq(final Node node) {
// 重復(fù)直到成功
for (;;) {
Node t = tail;
// 如果tail為null,則必須創(chuàng)建一個(gè)Node節(jié)點(diǎn)并進(jìn)行初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 嘗試CAS來設(shè)置隊(duì)尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法該方法的功能是循環(huán)的嘗試獲取鎖,直到成功為止,最后返回中斷標(biāo)志位。
這里有幾個(gè)問題很重要:
下面分別來分析一下。 什么條件下需要park? 看下 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
可見,只有在前繼節(jié)點(diǎn)的狀態(tài)是SIGNAL時(shí),需要park。第二種情況稍后會(huì)詳細(xì)介紹。 為什么要判斷中斷狀態(tài)? 首先要知道,acquireQueued方法中獲取鎖的方式是死循環(huán),判斷是否中斷是在parkAndCheckInterrupt方法中實(shí)現(xiàn)的,看下該方法的代碼:
非常簡(jiǎn)單,阻塞當(dāng)前線程,然后返回線程的中斷狀態(tài)并復(fù)位中斷狀態(tài)。
如果acquireQueued執(zhí)行完畢,返回中斷狀態(tài),回到acquire方法中,根據(jù)返回的中斷狀態(tài)判斷是否需要執(zhí)行 為什么要多做這一步呢?先判斷中斷狀態(tài),然后復(fù)位,如果之前線程是中斷狀態(tài),再進(jìn)行中斷? 這里就要介紹一下park方法了。park方法是Unsafe類中的方法,與之對(duì)應(yīng)的是unpark方法。簡(jiǎn)單來說,當(dāng)前線程如果執(zhí)行了park方法,也就是阻塞了當(dāng)前線程,反之,unpark就是喚醒一個(gè)線程。 具體的說明請(qǐng)參考http://blog.csdn.net/hengyunabc/article/details/28126139 park與wait的作用類似,但是對(duì)中斷狀態(tài)的處理并不相同。如果當(dāng)前線程不是中斷的狀態(tài),park與wait的效果是一樣的;如果一個(gè)線程是中斷的狀態(tài),這時(shí)執(zhí)行wait方法會(huì)報(bào) 所以,知道了這一點(diǎn),就可以知道為什么要進(jìn)行中斷狀態(tài)的復(fù)位了:
所以,這里判斷線程中斷的狀態(tài)實(shí)際上是為了不讓循環(huán)一直執(zhí)行,要讓當(dāng)前線程進(jìn)入阻塞的狀態(tài)。想象一下,如果不這樣判斷,前一個(gè)線程在獲取鎖之后執(zhí)行了很耗時(shí)的操作,那么豈不是要一直執(zhí)行該死循環(huán)?這樣就造成了CPU使用率飆升,這是很嚴(yán)重的后果。 死循環(huán)不會(huì)引起CPU使用率飆升? 上面已經(jīng)說明。 cancelAcquire方法在acquireQueued方法的finally語句塊中,如果在循環(huán)的過程中出現(xiàn)了異常,則執(zhí)行cancelAcquire方法,用于將該節(jié)點(diǎn)標(biāo)記為取消狀態(tài)。該方法代碼如下: private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 設(shè)置該節(jié)點(diǎn)不再關(guān)聯(lián)任何線程
node.thread = null;
// Skip cancelled predecessors
// 通過前繼節(jié)點(diǎn)跳過取消狀態(tài)的node
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 獲取過濾后的前繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 設(shè)置狀態(tài)為取消狀態(tài)
node.waitStatus = Node.CANCELLED;
/*
* If we are the tail, remove ourselves.
* 1.如果當(dāng)前節(jié)點(diǎn)是tail:
* 嘗試更新tail節(jié)點(diǎn),設(shè)置tail為pred;
* 更新失敗則返回,成功則設(shè)置tail的后繼節(jié)點(diǎn)為null
*/
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
/*
* 2.如果當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn):
* 判斷當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)的狀態(tài)是否是SIGNAL,如果不是則嘗試設(shè)置前繼節(jié)點(diǎn)的狀態(tài)為SIGNAL;
* 上面兩個(gè)條件如果有一個(gè)返回true,則再判斷前繼節(jié)點(diǎn)的thread是否不為空;
* 若滿足以上條件,則嘗試設(shè)置當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),也就是相當(dāng)于將當(dāng)前節(jié)點(diǎn)從隊(duì)列中刪除
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 3.如果是head的后繼節(jié)點(diǎn)或者狀態(tài)判斷或設(shè)置失敗,則喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
該方法中執(zhí)行的過程有些復(fù)雜,首先是要獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn),如果前繼節(jié)點(diǎn)的狀態(tài)不是取消狀態(tài)(即 接下來的工作可以分為3種情況:
我們依次來分析一下: 當(dāng)前節(jié)點(diǎn)是tail 這種情況很簡(jiǎn)單,因?yàn)閠ail是隊(duì)列的最后一個(gè)節(jié)點(diǎn),如果該節(jié)點(diǎn)需要取消,則直接把該節(jié)點(diǎn)的前繼節(jié)點(diǎn)的next指向null,也就是把當(dāng)前節(jié)點(diǎn)移除隊(duì)列。出隊(duì)的過程如下: ![]() 注意:經(jīng)驗(yàn)證,這里并沒有設(shè)置node的prev為null。 當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn),也不是tail ![]() 這里將node的前繼節(jié)點(diǎn)的next指向了node的后繼節(jié)點(diǎn),真正執(zhí)行的代碼就是如下一行:
當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn) ![]() 這里直接unpark后繼節(jié)點(diǎn)的線程,然后將next指向了自己。 這里可能會(huì)有疑問,既然要?jiǎng)h除節(jié)點(diǎn),為什么都沒有對(duì)prev進(jìn)行操作,而僅僅是修改了next? 要明確的一點(diǎn)是,這里修改指針的操作都是CAS操作,在AQS中所有以 那么在執(zhí)行cancelAcquire方法時(shí),當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)有可能已經(jīng)執(zhí)行完并移除隊(duì)列了(參見 什么時(shí)候修改prev呢?其實(shí)prev是由其他線程來修改的?;厝タ聪聅houldParkAfterFailedAcquire方法,該方法有這樣一段代碼:
該段代碼的作用就是通過prev遍歷到第一個(gè)不是取消狀態(tài)的node,并修改prev。 這里為什么可以更新prev?因?yàn)閟houldParkAfterFailedAcquire方法是在獲取鎖失敗的情況下才能執(zhí)行,因此進(jìn)入該方法時(shí),說明已經(jīng)有線程獲得鎖了,并且在執(zhí)行該方法時(shí),當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)不會(huì)變化(因?yàn)橹挥挟?dāng)下一個(gè)節(jié)點(diǎn)獲得鎖的時(shí)候才會(huì)設(shè)置head),所以這里可以更新prev,而且不必用CAS來更新。 AQS釋放獨(dú)占鎖的實(shí)現(xiàn)釋放通過unlock方法來實(shí)現(xiàn): public void unlock() {
sync.release(1);
}
該方法調(diào)用了release方法,release是在AQS中定義的,看下release代碼:
這里首先嘗試著去釋放鎖,成功了之后要去喚醒后繼節(jié)點(diǎn)的線程,這樣其他的線程才有機(jī)會(huì)去執(zhí)行。 tryRelease代碼如下: protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
是不是和tryAcquire方法類似?該方法也需要被重寫,在Sync類中的代碼如下:
當(dāng)前線程被釋放之后,需要喚醒下一個(gè)節(jié)點(diǎn)的線程,通過unparkSuccessor方法來實(shí)現(xiàn): private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
主要功能就是要喚醒下一個(gè)線程,這里 到這里,通過ReentrantLock的lock和unlock來分析AQS獨(dú)占鎖的實(shí)現(xiàn)已經(jīng)基本完成了,但ReentrantLock還有一個(gè)非公平鎖NonfairSync。 其實(shí)NonfairSync和FairSync主要就是在獲取鎖的方式上不同,公平鎖是按順序去獲取,而非公平鎖是搶占式的獲取,lock的時(shí)候先去嘗試修改state變量,如果搶占成功,則獲取到鎖:
非公平鎖的tryAcquire方法調(diào)用了nonfairTryAcquire方法: final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error('Maximum lock count exceeded');
setState(nextc);
return true;
}
return false;
}
該方法比公平鎖的tryAcquire方法在第二個(gè)if判斷中少了一個(gè)是否存在前繼節(jié)點(diǎn)判斷,F(xiàn)airSync中的tryAcquire代碼中的這個(gè)if語句塊如下:
總結(jié)本文從ReentrantLock出發(fā),比較完整的分析了AQS內(nèi)部獨(dú)占鎖的實(shí)現(xiàn),總體來說實(shí)現(xiàn)的思路很清晰,就是使用了標(biāo)志位 隊(duì)列的方式來處理鎖的狀態(tài),包括鎖的獲取,鎖的競(jìng)爭(zhēng)以及鎖的釋放。在AQS中,state可以表示鎖的數(shù)量,也可以表示其他狀態(tài),state的含義由子類去定義,自己只是提供了對(duì)state的維護(hù)。AQS通過state來實(shí)現(xiàn)線程對(duì)資源的訪問控制,而state具體的含義要在子類中定義。 AQS在隊(duì)列的維護(hù)上的實(shí)現(xiàn)比較復(fù)雜,尤其是節(jié)點(diǎn)取消時(shí)隊(duì)列的維護(hù),這里并不是通過一個(gè)線程來完成的。同時(shí),AQS中大量的使用CAS來實(shí)現(xiàn)更新,這種更新能夠保證狀態(tài)和隊(duì)列的完整性。 |
|