一、使用自定義鎖實(shí)現(xiàn)生成--消費(fèi)模型下面我們使用上節(jié)自定義的鎖實(shí)現(xiàn)一個簡單的生產(chǎn)--消費(fèi)模型,代碼如下:package com.ruigege.LockSourceAnalysis6;
import java.util.Queue; import java.util.concurrent.locks.Condition;
public class Test { final static NonReentrantLock lock = new NonReentrantLock(); final static Condition notFull = lock.newCondition(); final static Condition notEmpty = lock.newCondition(); final static Queue<String> queue = new LinkedBlockingQueue<String>(); final static int queueSize = 10; public static void main(String[] args) { Thread producer = new Thread(new Runnable() { public void run() { // 獲取獨(dú)占鎖 lock.lock(); try { // (1)如果隊(duì)列滿了,則等待 while(queue.size() == queueSize) { notEmpty.await(); } // (2)添加元素到隊(duì)列 queue.add("ele"); // (3)喚醒消費(fèi)線程 notFull.signalAll(); }catch(Exception e) { e.printStackTrace(); }finally { // 釋放鎖 lock.unlock(); } } }); Thread consumer = new Thread(new Runnable() { public void run() { // 獲取獨(dú)占鎖 lock.lock(); try { // 隊(duì)列空,則等待 while(0 == queue.size()) { notFull.await(); } // 消費(fèi)一個元素 String ele = queue.poll(); // 喚醒生產(chǎn)線程 notEmpty.signalAll(); }catch(Exception e) { e.printStackTrace(); }finally { // 釋放鎖 lock.unlock(); } } }); // 啟動線程 producer.start(); consumer.start(); }
}
- 如上代碼首先創(chuàng)建了一個NonReentrantLock的一個對象lock,然后調(diào)用lock.newCondition創(chuàng)建了兩個條件變量,用來進(jìn)行生產(chǎn)者和消費(fèi)者線程之間的同步。
- 在main函數(shù)中,首先創(chuàng)建了producer生產(chǎn)線程,在線程內(nèi)部首先調(diào)用lock.lock()獲取獨(dú)占鎖,然后判斷當(dāng)前隊(duì)列是否已經(jīng)滿了,如果滿了則調(diào)用notEmpty.await()阻塞掛起當(dāng)前線程,需要注意的是,這里使用了while而不是if是為了避免虛假喚醒,如果隊(duì)列不滿則直接向隊(duì)列里面添加元素,然后調(diào)用notFull.signalAll()喚醒所有因?yàn)橄M(fèi)元素而被i阻塞的消費(fèi)線程,最后釋放獲取的鎖。
二.使用自定義鎖實(shí)現(xiàn)一個消費(fèi)模型package com.ruigege.LockSourceAnalysis6;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;
public class NonReentrantLockME implements Lock,java.io.Serializable{ // 內(nèi)部幫助類 private static class Sync extends AbstractQueueSynchronizer { // 是否鎖已經(jīng)被持有 protected boolean isHeldExclusively() { return getState() == 1; } // 如果state為0,則嘗試獲取鎖 public boolean tryAcquire(int acquires) { assert acquires == 1; if(compareAndSetState(0,1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 嘗試釋放鎖,設(shè)置state為0 protected boolean tryRelease(int release) { assert releases == 1; if(getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // 提供條件變量接口 Condition newConditon() { return new ConditionObject(); } } // 創(chuàng)建一個Sync來做具體的工作 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newConditon(); } public boolean isLocked() { return sync.isHeldExclusively(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(timeout)); } }
- 使用NonReentrantLock創(chuàng)建一個實(shí)例,然后調(diào)用newCondition方法來生成兩個條件變量來進(jìn)行生產(chǎn)者和消費(fèi)者線程之間的同步。
- 在main函數(shù)中,首先創(chuàng)建了producer生產(chǎn)線程,在線程內(nèi)部先獲取了獨(dú)占鎖,然后看一下隊(duì)列是否滿了,如果滿了,那就阻塞當(dāng)前線程,如果沒有滿直接在隊(duì)列中加入隊(duì)列中,這里使用的while循環(huán)而不是使用if語句,這是為了避免虛假喚醒。然后調(diào)用notFull.sinalAll()喚醒所有因?yàn)橄M(fèi)元素而被阻塞的消費(fèi)線程,最后釋放了鎖。
- 在main函數(shù)中創(chuàng)建了consumer線程,先獲取獨(dú)占鎖,先判斷隊(duì)列有沒有元素,如果沒有元素,那么就先掛起當(dāng)前線程,這里使用了while是為了避免虛假喚醒,如果隊(duì)列中不為空,那么就拿出一個元素,然后喚醒因?yàn)殛?duì)列滿而被阻塞的生產(chǎn)線程,最后釋放獲取的鎖。
|