在Merlin之前,編寫Socket程序是比較繁瑣的工作.因為輸入輸出都必須同步.這樣,對于多客戶端客戶/服務(wù)器模式,不得不使用多線程.即為每個連接的客戶都分配一個線程來處理輸入輸出.由此而帶來的問題是可想而知的.程序員不得不為了避免死鎖,線程安全等問題,進行大量的編碼和測試.很多人都在抱怨為什么不在Java中引入異步輸入輸出機制.比較官方的解釋是,任何一種應(yīng)用程序接口的引入,都必須兼容任何操作平臺.因為Java是跨平臺的.而當(dāng)時支持異步輸入輸出機制的操作平臺顯然不可能是全部.自Java
2
Platform以后,分離出J2SE,J2ME,J2EE三種不同類型的應(yīng)用程序接口,以適應(yīng)不同的應(yīng)用開發(fā).Java標(biāo)準(zhǔn)的制訂者們意識到了這個問題,并且支持異步輸入輸出機制的操作平臺在當(dāng)今操作平臺中處于主流地位.于是,Jdk(J2SE)
的第五次發(fā)布中引入了異步輸入輸出機制.
以前的Socket進程通信程序設(shè)計中,一般客戶端和服務(wù)器端程序設(shè)計如下: 服務(wù)器端: //服務(wù)器端監(jiān)聽線程 while (true) { ............. Socket clientSocket; clientSocket = socket.accept(); //取得客戶請求Socket,如果沒有//客戶請求連接,線程在此處阻塞 //用取得的Socket構(gòu)造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //創(chuàng)建客戶會話線程,進行輸入輸出控制,為同步機制 new ClientSession(); ....... } 客戶端: ............ clientSocket = new Socket(HOSTNAME, LISTENPORT);//連接服務(wù)器套接字 //用取得的Socket構(gòu)造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //進行輸入輸出控制 ....... 以上代碼段只是用同步機制編寫Socket進程通信的一個框架,實際上要考慮的問題要復(fù)雜的多(有興趣的讀者可以參考我的一篇文章《Internet 實時通信系統(tǒng)設(shè)計與實現(xiàn)》)。將這樣一個框架列出來,只是為了與用異步機制實現(xiàn)的Socket進程通信進行比較。下面將介紹使用異步機制的程序設(shè)計。 回頁首 用異步輸入輸出流編寫Socket進程通信程序 在Merlin中加入了用于實現(xiàn)異步輸入輸出機制的應(yīng)用程序接口包:java.nio(新的輸入輸出包,定義了很多基本類型緩沖(Buffer)), java.nio.channels(通道及選擇器等,用于異步輸入輸出),java.nio.charset(字符的編碼解碼)。通道 (Channel)首先在選擇器(Selector)中注冊自己感興趣的事件,當(dāng)相應(yīng)的事件發(fā)生時,選擇器便通過選擇鍵(SelectionKey)通知已注冊的通道。然后通道將需要處理的信息,通過緩沖(Buffer)打包,編碼/解碼,完成輸入輸出控制。 通道介紹: 這里主要介紹ServerSocketChannel和 SocketChannel.它們都是可選擇的(selectable)通道,分別可以工作在同步和異步兩種方式下(注意,這里的可選擇不是指可以選擇兩種工作方式,而是指可以有選擇的注冊自己感興趣的事件)??梢杂胏hannel.configureBlocking(Boolean )來設(shè)置其工作方式。與以前版本的API相比較,ServerSocketChannel就相當(dāng)于ServerSocket (ServerSocketChannel封裝了ServerSocket),而SocketChannel就相當(dāng)于Socket (SocketChannel封裝了Socket)。當(dāng)通道工作在同步方式時,編程方法與以前的基本相似,這里主要介紹異步工作方式。 所謂異步輸入輸出機制,是指在進行輸入輸出處理時,不必等到輸入輸出處理完畢才返回。所以異步的同義語是非阻塞(None Blocking)。在服務(wù)器端,ServerSocketChannel通過靜態(tài)函數(shù)open()返回一個實例serverChl。然后該通道調(diào)用 serverChl.socket().bind()綁定到服務(wù)器某端口,并調(diào)用register(Selector sel, SelectionKey.OP_ACCEPT)注冊O(shè)P_ACCEPT事件到一個選擇器中(ServerSocketChannel只可以注冊 OP_ACCEPT事件)。當(dāng)有客戶請求連接時,選擇器就會通知該通道有客戶連接請求,就可以進行相應(yīng)的輸入輸出控制了;在客戶端,clientChl實例注冊自己感興趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的組合),調(diào)用clientChl.connect (InetSocketAddress )連接服務(wù)器然后進行相應(yīng)處理。注意,這里的連接是異步的,即會立即返回而繼續(xù)執(zhí)行后面的代碼。 選擇器和選擇鍵介紹: 選擇器(Selector)的作用是:將通道感興趣的事件放入隊列中,而不是馬上提交給應(yīng)用程序,等已注冊的通道自己來請求處理這些事件。換句話說,就是選擇器將會隨時報告已經(jīng)準(zhǔn)備好了的通道,而且是按照先進先出的順序。那么,選擇器是通過什么來報告的呢?選擇鍵(SelectionKey)。選擇鍵的作用就是表明哪個通道已經(jīng)做好了準(zhǔn)備,準(zhǔn)備干什么。你也許馬上會想到,那一定是已注冊的通道感興趣的事件。不錯,例如對于服務(wù)器端serverChl來說,可以調(diào)用key.isAcceptable()來通知serverChl有客戶端連接請求。相應(yīng)的函數(shù)還有: SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一個循環(huán)中輪詢感興趣的事件(具體可參照下面的代碼)。如果選擇器中尚無通道已注冊事件發(fā)生,調(diào)用Selector.select()將阻塞,直到有事件發(fā)生為止。另外,可以調(diào)用 selectNow()或者select(long timeout)。前者立即返回,沒有事件時返回0值;后者等待timeout時間后返回。一個選擇器最多可以同時被63個通道一起注冊使用。 應(yīng)用實例: 下面是用異步輸入輸出機制實現(xiàn)的客戶/服務(wù)器實例程序?D?D程序清單1(限于篇幅,只給出了服務(wù)器端實現(xiàn),讀者可以參照著實現(xiàn)客戶端代碼): 程序類圖 程序清單1 public class NBlockingServer { int port = 8000; int BUFFERSIZE = 1024; Selector selector = null; ServerSocketChannel serverChannel = null; HashMap clientChannelMap = null;//用來存放每一個客戶連接對應(yīng)的套接字和通道 public NBlockingServer( int port ) { this.clientChannelMap = new HashMap(); this.port = port; } public void initialize() throws IOException { //初始化,分別實例化一個選擇器,一個服務(wù)器端可選擇通道 this.selector = Selector.open(); this.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); InetAddress localhost = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(localhost, this.port ); this.serverChannel.socket().bind(isa);//將該套接字綁定到服務(wù)器某一可用端口 } //結(jié)束時釋放資源 public void finalize() throws IOException { this.serverChannel.close(); this.selector.close(); } //將讀入字節(jié)緩沖的信息解碼 public String decode( ByteBuffer byteBuffer ) throws CharacterCodingException { Charset charset = Charset.forName( "ISO-8859-1" ); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode( byteBuffer ); String result = charBuffer.toString(); return result; } //監(jiān)聽端口,當(dāng)通道準(zhǔn)備好時進行相應(yīng)操作 public void portListening() throws IOException, InterruptedException { //服務(wù)器端通道注冊O(shè)P_ACCEPT事件 SelectionKey acceptKey =this.serverChannel.register( this.selector, SelectionKey.OP_ACCEPT ); //當(dāng)有已注冊的事件發(fā)生時,select()返回值將大于0 while (acceptKey.selector().select() > 0 ) { System.out.println("event happened"); //取得所有已經(jīng)準(zhǔn)備好的所有選擇鍵 Set readyKeys = this.selector.selectedKeys(); //使用迭代器對選擇鍵進行輪詢 Iterator i = readyKeys.iterator(); while (i.hasNext()) { SelectionKey key = (SelectionKey)i.next(); i.remove();//刪除當(dāng)前將要處理的選擇鍵 if ( key.isAcceptable() ) {//如果是有客戶端連接請求 System.out.println("more client connect in!"); ServerSocketChannel nextReady = (ServerSocketChannel)key.channel(); //獲取客戶端套接字 Socket s = nextReady.accept(); //設(shè)置對應(yīng)的通道為異步方式并注冊感興趣事件 s.getChannel().configureBlocking( false ); SelectionKey readWriteKey = s.getChannel().register( this.selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE ); //將注冊的事件與該套接字聯(lián)系起來 readWriteKey.attach( s ); //將當(dāng)前建立連接的客戶端套接字及對應(yīng)的通道存放在哈希表//clientChannelMap中 this.clientChannelMap.put( s, new ClientChInstance( s.getChannel() ) ); } else if ( key.isReadable() ) {//如果是通道讀準(zhǔn)備好事件 System.out.println("Readable"); //取得選擇鍵對應(yīng)的通道和套接字 SelectableChannel nextReady = (SelectableChannel) key.channel(); Socket socket = (Socket) key.attachment(); //處理該事件,處理方法已封裝在類ClientChInstance中 this.readfromChannel( socket.getChannel(), (ClientChInstance) this.clientChannelMap.get( socket ) ); } else if ( key.isWritable() ) {//如果是通道寫準(zhǔn)備好事件 System.out.println("writeable"); //取得套接字后處理,方法同上 Socket socket = (Socket) key.attachment(); SocketChannel channel = (SocketChannel) socket.getChannel(); this.writeToChannel( channel,"This is from server!"); } } } } //對通道的寫操作 public void writeToChannel( SocketChannel channel, String message ) throws IOException { ByteBuffer buf = ByteBuffer.wrap( message.getBytes() ); int nbytes = channel.write( buf ); } //對通道的讀操作 public void readfromChannel( SocketChannel channel, ClientChInstance clientInstance ) throws IOException, InterruptedException { ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE ); int nbytes = channel.read( byteBuf |
|