Java5增加了新的類庫并發(fā)集java.util.concurrent,該類庫為并發(fā)程序提供了豐富的API多線程編程在Java 5中更加容易,靈活。本文通過一個網(wǎng)絡(luò)服務(wù)器模型,來實踐Java5的多線程編程,該模型中使用了Java5中的線程池,阻塞隊列,可重入鎖等,還實踐了 Callable, Future等接口,并使用了Java 5的另外一個新特性泛型。 簡介 本文將實現(xiàn)一個網(wǎng)絡(luò)服務(wù)器模型,一旦有客戶端連接到該服務(wù)器,則啟動一個新線程為該連接服務(wù),服務(wù)內(nèi)容為往客戶端輸送一些字符信息。一個典型的網(wǎng)絡(luò)服務(wù)器模型如下: 1. 建立監(jiān)聽端口。 2. 發(fā)現(xiàn)有新連接,接受連接,啟動線程,執(zhí)行服務(wù)線程。 3. 服務(wù)完畢,關(guān)閉線程。 這個模型在大部分情況下運行良好,但是需要頻繁的處理用戶請求而每次請求需要的服務(wù)又是簡短的時候,系統(tǒng)會將大量的時間花費在線程的創(chuàng)建銷毀。Java 5的線程池克服了這些缺點。通過對重用線程來執(zhí)行多個任務(wù),避免了頻繁線程的創(chuàng)建與銷毀開銷,使得服務(wù)器的性能方面得到很大提高。因此,本文的網(wǎng)絡(luò)服務(wù)器模型將如下: 1. 建立監(jiān)聽端口,創(chuàng)建線程池。 2. 發(fā)現(xiàn)有新連接,使用線程池來執(zhí)行服務(wù)任務(wù)。 3. 服務(wù)完畢,釋放線程到線程池。 下面詳細介紹如何使用Java 5的concurrent包提供的API來實現(xiàn)該服務(wù)器。 初始化 初始化包括創(chuàng)建線程池以及初始化監(jiān)聽端口。創(chuàng)建線程池可以通過調(diào)用java.util.concurrent.Executors類里的靜態(tài)方法 newChahedThreadPool或是newFixedThreadPool來創(chuàng)建,也可以通過新建一個 java.util.concurrent.ThreadPoolExecutor實例來執(zhí)行任務(wù)。這里我們采用newFixedThreadPool方法來建立線程池。 ExecutorService pool = Executors.newFixedThreadPool(10); 表示新建了一個線程池,線程池里面有10個線程為任務(wù)隊列服務(wù)。 使用ServerSocket對象來初始化監(jiān)聽端口。 private static final int PORT = 19527; serverListenSocket = new ServerSocket(PORT); serverListenSocket.setReuseAddress(true); serverListenSocket.setReuseAddress(true); 服務(wù)新連接 當有新連接建立時,accept返回時,將服務(wù)任務(wù)提交給線程池執(zhí)行。 while(true){ Socket socket = serverListenSocket.accept(); pool.execute(new ServiceThread(socket)); } 這里使用線程池對象來執(zhí)行線程,減少了每次線程創(chuàng)建和銷毀的開銷。任務(wù)執(zhí)行完畢,線程釋放到線程池。 服務(wù)任務(wù) 服務(wù)線程ServiceThread維護一個count來記錄服務(wù)線程被調(diào)用的次數(shù)。每當服務(wù)任務(wù)被調(diào)用一次時,count的值自增1,因此 ServiceThread提供一個increaseCount和getCount的方法,分別將count值自增1和取得該count值。由于可能多個線程存在競爭,同時訪問count,因此需要加鎖機制,在Java 5之前,我們只能使用synchronized來鎖定。Java 5中引入了性能更加粒度更細的重入鎖ReentrantLock。我們使用ReentrantLock保證代碼線程安全。下面是具體代碼: private static ReentrantLock lock = new ReentrantLock (); private static int count = 0; private int getCount(){ int ret = 0; try{ lock.lock(); ret = count; }finally{ lock.unlock(); } return ret; } private void increaseCount(){ try{ lock.lock(); ++count; }finally{ lock.unlock(); } } 服務(wù)線程在開始給客戶端打印一個歡迎信息, increaseCount(); int curCount = getCount(); helloString = "hello, id = " + curCount+"\r\n"; dos = new DataOutputStream(connectedSocket.getOutputStream()); dos.write(helloString.getBytes()); 然后使用ExecutorService的submit方法提交一個Callable的任務(wù),返回一個Future接口的引用。這種做法對費時的任務(wù)非常有效,submit任務(wù)之后可以繼續(xù)執(zhí)行下面的代碼,然后在適當?shù)奈恢每梢允褂肍uture的get方法來獲取結(jié)果,如果這時候該方法已經(jīng)執(zhí)行完畢,則無需等待即可獲得結(jié)果,如果還在執(zhí)行,則等待到運行完畢。 ExecutorService executor = Executors.newSingleThreadExecutor(); Future dos.write("let‘s do soemthing other".getBytes()); String result = future.get(); dos.write(result.getBytes()); 其中TimeConsumingTask實現(xiàn)了Callable接口 class TimeConsumingTask implements Callable public String call() throws Exception { System.out.println("It‘s a time-consuming task, you‘d better retrieve your result in the furture"); return "ok, here‘s the result: It takes me lots of time to produce this result"; } } 這里使用了Java 5的另外一個新特性泛型,聲明TimeConsumingTask的時候使用了String做為類型參數(shù)。必須實現(xiàn)Callable接口的call函數(shù),其作用類似與Runnable中的run函數(shù),在call函數(shù)里寫入要執(zhí)行的代碼,其返回值類型等同于在類聲明中傳入的類型值。在這段程序中,我們提交了一個Callable的任務(wù),然后程序不會堵塞,而是繼續(xù)執(zhí)行dos.write("let‘s do soemthing other".getBytes());當程序執(zhí)行到String result = future.get()時如果call函數(shù)已經(jīng)執(zhí)行完畢,則取得返回值,如果還在執(zhí)行,則等待其執(zhí)行完畢。 |
|