先上一個(gè)場(chǎng)景:假如你突然想做飯,但是沒(méi)有廚具,也沒(méi)有食材。網(wǎng)上購(gòu)買(mǎi)廚具比較方便,食材去超市買(mǎi)更放心。 實(shí)現(xiàn)分析:在快遞員送廚具的期間,我們肯定不會(huì)閑著,可以去超市買(mǎi)食材。所以,在主線程里面另起一個(gè)子線程去網(wǎng)購(gòu)廚具。 但是,子線程執(zhí)行的結(jié)果是要返回廚具的,而run方法是沒(méi)有返回值的。所以,這才是難點(diǎn),需要好好考慮一下。 模擬代碼1: package test; public class CommonCook { public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); // 第一步 網(wǎng)購(gòu)廚具 OnlineShopping thread = new OnlineShopping(); thread.start(); thread.join(); // 保證廚具送到 // 第二步 去超市購(gòu)買(mǎi)食材 Thread.sleep(2000); // 模擬購(gòu)買(mǎi)食材時(shí)間 Shicai shicai = new Shicai(); System.out.println("第二步:食材到位"); // 第三步 用廚具烹飪食材 System.out.println("第三步:開(kāi)始展現(xiàn)廚藝"); cook(thread.chuju, shicai); System.out.println("總共用時(shí)" + (System.currentTimeMillis() - startTime) + "ms"); } // 網(wǎng)購(gòu)廚具線程 static class OnlineShopping extends Thread { private Chuju chuju; @Override public void run() { System.out.println("第一步:下單"); System.out.println("第一步:等待送貨"); try { Thread.sleep(5000); // 模擬送貨時(shí)間 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一步:快遞送到"); chuju = new Chuju(); } } // 用廚具烹飪食材 static void cook(Chuju chuju, Shicai shicai) {} // 廚具類 static class Chuju {} // 食材類 static class Shicai {} } 運(yùn)行結(jié)果: 第一步:下單
第一步:等待送貨
第一步:快遞送到
第二步:食材到位
第三步:開(kāi)始展現(xiàn)廚藝
總共用時(shí)7013ms 可以看到,多線程已經(jīng)失去了意義。在廚具送到期間,我們不能干任何事。對(duì)應(yīng)代碼,就是調(diào)用join方法阻塞主線程。 有人問(wèn)了,不阻塞主線程行不行??? 不行?。?! 從代碼來(lái)看的話,run方法不執(zhí)行完,屬性chuju就沒(méi)有被賦值,還是null。換句話說(shuō),沒(méi)有廚具,怎么做飯。 Java現(xiàn)在的多線程機(jī)制,核心方法run是沒(méi)有返回值的;如果要保存run方法里面的計(jì)算結(jié)果,必須等待run方法計(jì)算完,無(wú)論計(jì)算過(guò)程多么耗時(shí)。 面對(duì)這種尷尬的處境,程序員就會(huì)想:在子線程run方法計(jì)算的期間,能不能在主線程里面繼續(xù)異步執(zhí)行??? Where there is a will,there is a way?。?! 這種想法的核心就是Future模式,下面先應(yīng)用一下Java自己實(shí)現(xiàn)的Future模式。 模擬代碼2: package test; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureCook { public static void main(String[] args) throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); // 第一步 網(wǎng)購(gòu)廚具 Callable<Chuju> onlineShopping = new Callable<Chuju>() { @Override public Chuju call() throws Exception { System.out.println("第一步:下單"); System.out.println("第一步:等待送貨"); Thread.sleep(5000); // 模擬送貨時(shí)間 System.out.println("第一步:快遞送到"); return new Chuju(); } }; FutureTask<Chuju> task = new FutureTask<Chuju>(onlineShopping); new Thread(task).start(); // 第二步 去超市購(gòu)買(mǎi)食材 Thread.sleep(2000); // 模擬購(gòu)買(mǎi)食材時(shí)間 Shicai shicai = new Shicai(); System.out.println("第二步:食材到位"); // 第三步 用廚具烹飪食材 if (!task.isDone()) { // 聯(lián)系快遞員,詢問(wèn)是否到貨 System.out.println("第三步:廚具還沒(méi)到,心情好就等著(心情不好就調(diào)用cancel方法取消訂單)"); } Chuju chuju = task.get(); System.out.println("第三步:廚具到位,開(kāi)始展現(xiàn)廚藝"); cook(chuju, shicai); System.out.println("總共用時(shí)" + (System.currentTimeMillis() - startTime) + "ms"); } // 用廚具烹飪食材 static void cook(Chuju chuju, Shicai shicai) {} // 廚具類 static class Chuju {} // 食材類 static class Shicai {} } 運(yùn)行結(jié)果: 第一步:下單
第一步:等待送貨
第二步:食材到位
第三步:廚具還沒(méi)到,心情好就等著(心情不好就調(diào)用cancel方法取消訂單)
第一步:快遞送到
第三步:廚具到位,開(kāi)始展現(xiàn)廚藝
總共用時(shí)5005ms 可以看見(jiàn),在快遞員送廚具的期間,我們沒(méi)有閑著,可以去買(mǎi)食材;而且我們知道廚具到?jīng)]到,甚至可以在廚具沒(méi)到的時(shí)候,取消訂單不要了。 好神奇,有沒(méi)有。 下面具體分析一下第二段代碼: 1)把耗時(shí)的網(wǎng)購(gòu)廚具邏輯,封裝到了一個(gè)Callable的call方法里面。 public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } Callable接口可以看作是Runnable接口的補(bǔ)充,call方法帶有返回值,并且可以拋出異常。
2)把Callable實(shí)例當(dāng)作參數(shù),生成一個(gè)FutureTask的對(duì)象,然后把這個(gè)對(duì)象當(dāng)作一個(gè)Runnable,作為參數(shù)另起線程。 public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V> public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 這個(gè)繼承體系中的核心接口是Future。Future的核心思想是:一個(gè)方法f,計(jì)算過(guò)程可能非常耗時(shí),等待f返回,顯然不明智。可以在調(diào)用f的時(shí)候,立馬返回一個(gè)Future,可以通過(guò)Future這個(gè)數(shù)據(jù)結(jié)構(gòu)去控制方法f的計(jì)算過(guò)程。 這里的控制包括: get方法:獲取計(jì)算結(jié)果(如果還沒(méi)計(jì)算完,也是必須等待的) cancel方法:還沒(méi)計(jì)算完,可以取消計(jì)算過(guò)程 isDone方法:判斷是否計(jì)算完 isCancelled方法:判斷計(jì)算是否被取消 這些接口的設(shè)計(jì)很完美,F(xiàn)utureTask的實(shí)現(xiàn)注定不會(huì)簡(jiǎn)單,后面再說(shuō)。
3)在第三步里面,調(diào)用了isDone方法查看狀態(tài),然后直接調(diào)用task.get方法獲取廚具,不過(guò)這時(shí)還沒(méi)送到,所以還是會(huì)等待3秒。對(duì)比第一段代碼的執(zhí)行結(jié)果,這里我們節(jié)省了2秒。這是因?yàn)樵诳爝f員送貨期間,我們?nèi)コ匈?gòu)買(mǎi)食材,這兩件事在同一時(shí)間段內(nèi)異步執(zhí)行。
通過(guò)以上3步,我們就完成了對(duì)Java原生Future模式最基本的應(yīng)用。下面具體分析下FutureTask的實(shí)現(xiàn),先看JDK8的,再比較一下JDK6的實(shí)現(xiàn)。 既然FutureTask也是一個(gè)Runnable,那就看看它的run方法 public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 這里的callable是從構(gòu)造方法里面?zhèn)魅说?/span> if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 保存call方法拋出的異常 } if (ran) set(result); // 保存call方法的執(zhí)行結(jié)果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 先看try語(yǔ)句塊里面的邏輯,發(fā)現(xiàn)run方法的主要邏輯就是運(yùn)行Callable的call方法,然后將保存結(jié)果或者異常(用的一個(gè)屬性result)。這里比較難想到的是,將call方法拋出的異常也保存起來(lái)了。 這里表示狀態(tài)的屬性state是個(gè)什么鬼 * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; 把FutureTask看作一個(gè)Future,那么它的作用就是控制Callable的call方法的執(zhí)行過(guò)程,在執(zhí)行的過(guò)程中自然會(huì)有狀態(tài)的轉(zhuǎn)換: 1)一個(gè)FutureTask新建出來(lái),state就是NEW狀態(tài);COMPETING和INTERRUPTING用的進(jìn)行時(shí),表示瞬時(shí)狀態(tài),存在時(shí)間極短(為什么要設(shè)立這種狀態(tài)???不解);NORMAL代表順利完成;EXCEPTIONAL代表執(zhí)行過(guò)程出現(xiàn)異常;CANCELED代表執(zhí)行過(guò)程被取消;INTERRUPTED被中斷 2)執(zhí)行過(guò)程順利完成:NEW -> COMPLETING -> NORMAL 3)執(zhí)行過(guò)程出現(xiàn)異常:NEW -> COMPLETING -> EXCEPTIONAL 4)執(zhí)行過(guò)程被取消:NEW -> CANCELLED 5)執(zhí)行過(guò)程中,線程中斷:NEW -> INTERRUPTING -> INTERRUPTED 代碼中狀態(tài)判斷、CAS操作等細(xì)節(jié),請(qǐng)讀者自己閱讀。 再看看get方法的實(shí)現(xiàn): public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } get方法的邏輯很簡(jiǎn)單,如果call方法的執(zhí)行過(guò)程已完成,就把結(jié)果給出去;如果未完成,就將當(dāng)前線程掛起等待。awaitDone方法里面死循環(huán)的邏輯,推演幾遍就能弄懂;它里面掛起線程的主要?jiǎng)?chuàng)新是定義了WaitNode類,來(lái)將多個(gè)等待線程組織成隊(duì)列,這是與JDK6的實(shí)現(xiàn)最大的不同。 掛起的線程何時(shí)被喚醒: private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); // 喚醒線程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } 以上就是JDK8的大體實(shí)現(xiàn)邏輯,像cancel、set等方法,也請(qǐng)讀者自己閱讀。 再來(lái)看看JDK6的實(shí)現(xiàn)。 JDK6的FutureTask的基本操作都是通過(guò)自己的內(nèi)部類Sync來(lái)實(shí)現(xiàn)的,而Sync繼承自AbstractQueuedSynchronizer這個(gè)出鏡率極高的并發(fā)工具類 /** State value representing that task is running */ private static final int RUNNING = 1; /** State value representing that task ran */ private static final int RAN = 2; /** State value representing that task was cancelled */ private static final int CANCELLED = 4; /** The underlying callable */ private final Callable<V> callable; /** The result to return from get() */ private V result; /** The exception to throw from get() */ private Throwable exception; 里面的狀態(tài)只有基本的幾個(gè),而且計(jì)算結(jié)果和異常是分開(kāi)保存的。 V innerGet() throws InterruptedException, ExecutionException { acquireSharedInterruptibly(0); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } 這個(gè)get方法里面處理等待線程隊(duì)列的方式是調(diào)用了acquireSharedInterruptibly方法,看過(guò)我之前幾篇博客文章的讀者應(yīng)該非常熟悉了。其中的等待線程隊(duì)列、線程掛起和喚醒等邏輯,這里不再贅述,如果不明白,請(qǐng)出門(mén)左轉(zhuǎn)。
最后來(lái)看看,F(xiàn)uture模式衍生出來(lái)的更高級(jí)的應(yīng)用。 再上一個(gè)場(chǎng)景:我們自己寫(xiě)一個(gè)簡(jiǎn)單的數(shù)據(jù)庫(kù)連接池,能夠復(fù)用數(shù)據(jù)庫(kù)連接,并且能在高并發(fā)情況下正常工作。 實(shí)現(xiàn)代碼1: package test; import java.util.concurrent.ConcurrentHashMap; public class ConnectionPool { private ConcurrentHashMap<String, Connection> pool = new ConcurrentHashMap<String, Connection>(); public Connection getConnection(String key) { Connection conn = null; if (pool.containsKey(key)) { conn = pool.get(key); } else { conn = createConnection(); pool.putIfAbsent(key, conn); } return conn; } public Connection createConnection() { return new Connection(); } class Connection {} } 我們用了ConcurrentHashMap,這樣就不必把getConnection方法置為synchronized(當(dāng)然也可以用Lock),當(dāng)多個(gè)線程同時(shí)調(diào)用getConnection方法時(shí),性能大幅提升。 貌似很完美了,但是有可能導(dǎo)致多余連接的創(chuàng)建,推演一遍: 某一時(shí)刻,同時(shí)有3個(gè)線程進(jìn)入getConnection方法,調(diào)用pool.containsKey(key)都返回false,然后3個(gè)線程各自都創(chuàng)建了連接。雖然ConcurrentHashMap的put方法只會(huì)加入其中一個(gè),但還是生成了2個(gè)多余的連接。如果是真正的數(shù)據(jù)庫(kù)連接,那會(huì)造成極大的資源浪費(fèi)。 所以,我們現(xiàn)在的難點(diǎn)是:如何在多線程訪問(wèn)getConnection方法時(shí),只執(zhí)行一次createConnection。 結(jié)合之前Future模式的實(shí)現(xiàn)分析:當(dāng)3個(gè)線程都要?jiǎng)?chuàng)建連接的時(shí)候,如果只有一個(gè)線程執(zhí)行createConnection方法創(chuàng)建一個(gè)連接,其它2個(gè)線程只需要用這個(gè)連接就行了。再延伸,把createConnection方法放到一個(gè)Callable的call方法里面,然后生成FutureTask。我們只需要讓一個(gè)線程執(zhí)行FutureTask的run方法,其它的線程只執(zhí)行g(shù)et方法就好了。 上代碼: package test; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class ConnectionPool { private ConcurrentHashMap<String, FutureTask<Connection>> pool = new ConcurrentHashMap<String, FutureTask<Connection>>(); public Connection getConnection(String key) throws InterruptedException, ExecutionException { FutureTask<Connection> connectionTask = pool.get(key); if (connectionTask != null) { return connectionTask.get(); } else { Callable<Connection> callable = new Callable<Connection>() { @Override public Connection call() throws Exception { return createConnection(); } }; FutureTask<Connection> newTask = new FutureTask<Connection>(callable); connectionTask = pool.putIfAbsent(key, newTask); if (connectionTask == null) { connectionTask = newTask; connectionTask.run(); } return connectionTask.get(); } } public Connection createConnection() { return new Connection(); } class Connection { } } 推演一遍:當(dāng)3個(gè)線程同時(shí)進(jìn)入else語(yǔ)句塊時(shí),各自都創(chuàng)建了一個(gè)FutureTask,但是ConcurrentHashMap只會(huì)加入其中一個(gè)。第一個(gè)線程執(zhí)行pool.putIfAbsent方法后返回null,然后connectionTask被賦值,接著就執(zhí)行run方法去創(chuàng)建連接,最后get。后面的線程執(zhí)行pool.putIfAbsent方法不會(huì)返回null,就只會(huì)執(zhí)行g(shù)et方法。 在并發(fā)的環(huán)境下,通過(guò)FutureTask作為中間轉(zhuǎn)換,成功實(shí)現(xiàn)了讓某個(gè)方法只被一個(gè)線程執(zhí)行。 就這么多吧,真是嘔心瀝血?。。。」?/span> |
|