通常,Java I/O 框架用途極其廣泛。同一個框架支持文件存取、網(wǎng)絡(luò)訪問、字符轉(zhuǎn)換、壓縮和加密等等。不過,有時它不是十分靈活。例如,壓縮流允許您將數(shù)據(jù)寫成壓縮格式,但它們不能讓您讀取壓縮格式的數(shù)據(jù)。同樣地,某些第三方模塊被構(gòu)建成寫出數(shù)據(jù),而沒有考慮應(yīng)用程序需要讀取數(shù)據(jù)的情形。本文是兩部分系列文章的第一部分,Java 密碼專家和作家 Merlin Hughes 介紹了使應(yīng)用程序從僅支持將數(shù)據(jù)寫至輸出流的源中有效讀取數(shù)據(jù)的框架。
自早期基于瀏覽器的 applet 和簡單應(yīng)用程序以來,Java 平臺已有了巨大的發(fā)展?,F(xiàn)在,我們有多個平臺和概要及許多新的 API,并且還在制作的差不多有數(shù)百種之多。盡管 Java 語言的復(fù)雜程度在不斷增加,但它對于日常的編程任務(wù)而言仍是一個出色的工具。雖然有時您會陷入那些日復(fù)一日的編程問題中,但偶爾您也能夠回過頭去,發(fā)現(xiàn)一個很棒的解決方案來處理您以前曾多次遇到過的問題。
就在前幾天,我想要壓縮一些通過網(wǎng)絡(luò)連接讀取的數(shù)據(jù)(我以壓縮格式將 TCP 數(shù)據(jù)中繼到一個 UDP 套接字)。記得 Java 平臺自版本 1.1 開始就支持壓縮,所以我直接求助于 java.util.zip
包,希望能找到一個適合于我的解決方案。然而,我發(fā)現(xiàn)一個問題:構(gòu)造的類都適用于常規(guī)情況,即在讀取時對數(shù)據(jù)解壓縮而在寫入時壓縮它們,沒有其它變通方法。雖然繞過 I/O 類是可能的,但我希望構(gòu)建一個基于流的解決方案,而不想偷懶直接使用壓縮程序。
不久以前,我在另一種情況下也遇到過完全相同的問題。我有一個 base-64 轉(zhuǎn)碼庫,與使用壓縮包一樣,它支持對從流中讀取的數(shù)據(jù)進(jìn)行譯碼,并對寫入流中的數(shù)據(jù)進(jìn)行編碼。然而,我需要的是一個在我從流中讀取數(shù)據(jù)的同時可以進(jìn)行編碼的庫。
在我著手解決該問題時,我認(rèn)識到我在另一種情況下也遇到過該問題:當(dāng)序列化 XML 文檔時,通常會循環(huán)遍歷整個文檔,將節(jié)點(diǎn)寫入流中。然而,我遇到的情況是需要讀取序列化格式的文檔,以便將子集重新解析成一個新文檔。
回過頭想一下,我意識到這些孤立事件表示了一個共性的問題:如果有一個遞增地將數(shù)據(jù)寫入輸出流的數(shù)據(jù)源,那么我需要一個輸入流使我能夠讀取這些數(shù)據(jù),每當(dāng)需要更多數(shù)據(jù)時,都能透明地訪問數(shù)據(jù)源。
在本文中,我們將研究對這一問題的三種可能的解決方案,同時決定一個實(shí)現(xiàn)最佳解決方案的新框架。然后,我們將針對上面列出的每個問題,檢驗(yàn)該框架。我們將扼要地談及性能方面的問題,而把對此的大量討論留到下一篇文章中。
I/O 流基礎(chǔ)知識
首先,讓我們簡單回顧一下 Java 平臺的基本流類,如圖 1 所示。 OutputStream
表示對其寫入數(shù)據(jù)的流。通常,該流將直接連接至諸如文件或網(wǎng)絡(luò)連接之類的設(shè)備,或連接至另一個輸出流(在這種情況下,它稱為 過濾器(filter))。通常,輸出流過濾器在轉(zhuǎn)換了寫入其中的數(shù)據(jù)之后,才將轉(zhuǎn)換后產(chǎn)生的數(shù)據(jù)寫入相連的流中。 InputStream
表示可以從中讀取數(shù)據(jù)的流。同樣,該流也直接連接至設(shè)備或其它流。輸入流過濾器從相連的流中讀取數(shù)據(jù),轉(zhuǎn)換該數(shù)據(jù),然后允許從中讀取轉(zhuǎn)換后的數(shù)據(jù)。
圖 1. I/O 流基礎(chǔ)知識
就我最初的問題看, GZIPOutputStream
類是一個輸出流過濾器,它壓縮寫入其中的數(shù)據(jù),然后將該壓縮數(shù)據(jù)寫入相連的流。我需要的輸入流過濾器應(yīng)該能從流中讀取數(shù)據(jù),壓縮數(shù)據(jù),然后讓我讀取結(jié)果。
Java 平臺,版本 1.4 已引入了一個新的 I/O 框架 java.nio
。不過,該框架在很大程度上與提供對操作系統(tǒng) I/O 資源的有效訪問有關(guān);而且,雖然它確實(shí)為一些傳統(tǒng)的 java.io
類提供了類似功能,并可以表示同時支持輸入和輸出的雙重用途的資源,但它并不能完全替代標(biāo)準(zhǔn)流類,并且不能直接處理我需要解決的問題。
蠻力解決方案
在著手尋找解決我問題的工程方案前,我根據(jù)標(biāo)準(zhǔn) Java API 類的精致和有效性,研究了基于這些類的解決方案。
該問題的蠻力解決方案就是簡單地從輸入源中讀取所有數(shù)據(jù),然后通過轉(zhuǎn)換程序(即,壓縮流、編碼流或 XML 序列化器)將它們推進(jìn)內(nèi)存緩沖區(qū)中。然后,我可以從該內(nèi)存緩沖區(qū)中打開要讀取的流,這樣我就解決了問題。
首先,我需要一個通用的 I/O 方法。清單 1 中的方法利用一個小緩沖區(qū)將 InputStream
中的所有數(shù)據(jù)復(fù)制到 OutputStream
。當(dāng)?shù)竭_(dá)輸入的結(jié)尾( read()
函數(shù)的返回值小于零)時,該方法就返回,但不關(guān)閉這兩個流。
清單 1. 通用的 I/O 方法
public static void io (InputStream in, OutputStream out)
throws IOException {
byte[] buffer = new byte[8192];
int amount;
while ((amount = in.read (buffer)) >= 0)
out.write (buffer, 0, amount);
}
|
清單 2 顯示蠻力解決方案如何使我讀取壓縮格式的輸入流。我打開寫入內(nèi)存緩沖區(qū)的 GZIPOutputStream
(使用 ByteArrayOutputStream
)。接著,將輸入流復(fù)制到壓縮流中,這樣將壓縮數(shù)據(jù)填入內(nèi)存緩沖區(qū)中。然后,我返回 ByteArrayInputStream
,它讓我從輸入流中讀取,如圖 2 所示。
圖 2. 蠻力解決方案
清單 2. 蠻力解決方案
public static InputStream bruteForceCompress (InputStream in)
throws IOException {
ByteArrayOutputStream sink = new ByteArrayOutputStream ():
OutputStream out = new GZIPOutputStream (sink);
io (in, out);
out.close ();
byte[] buffer = sink.toByteArray ();
return new ByteArrayInputStream (buffer);
}
|
這個解決方案有一個明顯的缺點(diǎn),它將整個壓縮文檔都存儲在內(nèi)存中。如果文檔很大,那么這種方法將不必要地浪費(fèi)系統(tǒng)資源。使用流的主要特性之一是它們允許您操作比所用系統(tǒng)內(nèi)存要大的數(shù)據(jù):您可以在讀取數(shù)據(jù)時處理它們,或在寫入數(shù)據(jù)時生成數(shù)據(jù),而無需始終將所有數(shù)據(jù)保存在內(nèi)存中。
從效率上,讓我們對在緩沖區(qū)之間復(fù)制數(shù)據(jù)進(jìn)行更深入研究。
通過 io()
方法,將數(shù)據(jù)從輸入源讀入至一個緩沖區(qū)中。然后,將數(shù)據(jù)從緩沖區(qū)寫入 ByteArrayOutputStream
中的緩沖區(qū)(通過我忽略的壓縮過程)。然而, ByteArrayOutputStream
類對擴(kuò)展的內(nèi)部緩沖區(qū)進(jìn)行操作;每當(dāng)緩沖區(qū)變滿時,就會分配一個大小是原來兩倍的新緩沖區(qū),接著將現(xiàn)有的數(shù)據(jù)復(fù)制到該緩沖區(qū)中。平均下來,這一過程每個字節(jié)復(fù)制兩次。(算術(shù)計算很簡單:當(dāng)進(jìn)入 ByteArrayOutputStream
時,對數(shù)據(jù)平均復(fù)制兩次;所有數(shù)據(jù)至少復(fù)制一次;有一半數(shù)據(jù)至少復(fù)制兩次;四分之一的數(shù)據(jù)至少復(fù)制三次,依次類推。)然后,將數(shù)據(jù)從該緩沖區(qū)復(fù)制到 ByteArrayInputStream
的一個新緩沖區(qū)中?,F(xiàn)在,應(yīng)用程序可以讀取數(shù)據(jù)了??傊?,這個解決方案將通過四個緩沖區(qū)寫數(shù)據(jù)。這對于估計其它技術(shù)的效率是一個有用的基準(zhǔn)。
管道式流解決方案
管道式流 PipedOutputStream
和 PipedInputStream
在 Java 虛擬機(jī)的線程之間提供了基于流的連接。一個線程將數(shù)據(jù)寫入 PipedOutputStream
中的同時,另一個線程可以從相關(guān)聯(lián)的 PipedInputStream
中讀取該數(shù)據(jù)。
就這樣,這些類提供了一個針對我問題的解決方案。清單 3 顯示了使用一個線程通過 GZIPOutputStream
將數(shù)據(jù)從輸入流復(fù)制到 PipedOutputStream
的代碼。然后,相關(guān)聯(lián)的 PipedInputStream
將提供對來自另一個線程的壓縮數(shù)據(jù)的讀取權(quán),如圖 3 所示:
圖 3. 管道式流解決方案
清單 3. 管道式流解決方案
private static InputStream pipedCompress (final InputStream in)
throws IOException {
PipedInputStream source = new PipedInputStream ();
final OutputStream out =
new GZIPOutputStream (new PipedOutputStream (source));
new Thread () {
public void run () {
try {
Streams.io (in, out);
out.close ();
} catch (IOException ex) {
ex.printStackTrace ();
}
}
}.start ();
return source;
}
|
理論上,這可能是個好技術(shù):通過使用線程(一個執(zhí)行壓縮,另一個處理產(chǎn)生的數(shù)據(jù)),應(yīng)用程序可以從硬件 SMP(對稱多處理)或 SMT(對稱多線程)中受益。另外,這一解決方案僅涉及兩個緩沖區(qū)寫操作:I/O 循環(huán)將數(shù)據(jù)從輸入流讀入緩沖區(qū),然后通過壓縮流寫入 PipedOutputStream
。接著,輸出流將數(shù)據(jù)存儲在內(nèi)部緩沖區(qū)中,與 PipedInputStream
共享緩沖區(qū)以供應(yīng)用程序讀取。而且,因?yàn)閿?shù)據(jù)通過固定緩沖區(qū)流動,所以從不需要將它們完全讀入內(nèi)存中。事實(shí)上,在任何給定時刻,緩沖區(qū)都只存儲小部分的工作集。
不過,實(shí)際上,它的性能很糟糕。管道式流需要利用同步,從而引起兩個線程之間激烈爭奪同步。它們的內(nèi)部緩沖區(qū)太小,無法有效地處理大量數(shù)據(jù)或隱藏鎖爭用。其次,持久共享緩沖區(qū)會阻礙許多簡單的高速緩存策略共享 SMP 機(jī)器上的工作負(fù)載。最后,線程的使用使得異常處理極其困難:沒有辦法將可能出現(xiàn)的任何 IOException
下推到管道中以便閱讀器處理。總之,這一解決方案太難處理,根本不實(shí)際。
 |
同步問題
本文中提供的代碼都不能同步;也就是說,兩個線程并發(fā)地訪問其中一個類的共享實(shí)例是不安全的。 因?yàn)槿?NIO 框架和 Collections API 之類的庫已經(jīng)公認(rèn)是實(shí)用的,所以使得同步成為應(yīng)用程序中的一種負(fù)擔(dān)。如果應(yīng)用程序希望對一個對象進(jìn)行并發(fā)訪問,應(yīng)用程序必須采取必要的步驟來同步訪問。 雖然最近的 JVM 已在其線程安全性機(jī)制的性能上有了很大的改進(jìn),但同步仍是一個開銷很大的操作。在 I/O 的情況下,對單個流的并發(fā)訪問幾乎必定是一個錯誤;結(jié)果數(shù)據(jù)流的次序是不確定的,這不是理想的情形。正因?yàn)檫@樣,要同步這些類會強(qiáng)加一些不必要的費(fèi)用,又沒有確實(shí)的收益。 我們將在這一系列文章的第 2 部分更詳細(xì)討論多線程的考慮事項(xiàng);目前,只要注意:對我所提供的對流的并發(fā)訪問將導(dǎo)致不確定錯誤。
|
|
工程解決方案
現(xiàn)在,我們將研究另一種解決該問題的工程方案。這種解決方案提供了一個特地為解決這類問題而設(shè)計的框架,該框架提供了對數(shù)據(jù)的 InputStream
訪問,這些數(shù)據(jù)是從遞增地向 OutputStream
寫入數(shù)據(jù)的源中產(chǎn)生的。遞增地寫入數(shù)據(jù)這一事實(shí)很重要。如果源在單個原子操作中將所有數(shù)據(jù)都寫入 OutputStream
,而且如果不使用線程,則我們基本上又回到了蠻力技術(shù)的老路上。不過,如果可以訪問源以遞增地寫入其數(shù)據(jù),則我們就實(shí)現(xiàn)了在蠻力和管道式流解決方案之間的良好平衡。該解決方案不僅提供了在任何時候只在內(nèi)存中保存少量數(shù)據(jù)的管道式優(yōu)點(diǎn),同時也提供了避免線程的蠻力技術(shù)的優(yōu)點(diǎn)。
圖 4 演示了完整的解決方案。我們將在本文的剩余部分研究 該解決方案的源代碼。
圖 4. 工程解決方案
輸出引擎
清單 4 提供了一個描述數(shù)據(jù)源的接口 OutputEngine
。正如我所說的,這些源遞增地將數(shù)據(jù)寫入輸出流:
清單 4. 輸出引擎
package org.merlin.io;
import java.io.*;
/**
* An incremental data source that writes data to an OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute
* it and/or modify it under the terms of the GNU
* General Public License as published by the Free
* Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public interface OutputEngine {
public void initialize (OutputStream out) throws IOException;
public void execute () throws IOException;
public void finish () throws IOException;
}
|
initialize()
方法向該引擎提供一個流,應(yīng)該向這個流寫入數(shù)據(jù)。然后,重復(fù)調(diào)用 execute()
方法將數(shù)據(jù)寫入該流中。當(dāng)數(shù)據(jù)寫完時,引擎會關(guān)閉該流。最后,當(dāng)引擎應(yīng)該關(guān)閉時,將調(diào)用 finish()
。這會發(fā)生在引擎關(guān)閉其輸出流的前后。
I/O 流引擎
輸出引擎解決了讓我費(fèi)力處理的問題,它是一個通過輸出流過濾器將數(shù)據(jù)從輸入流復(fù)制到目標(biāo)輸出流的引擎。這滿足了遞增性的特性,因?yàn)樗梢砸淮巫x寫單個緩沖區(qū)。
清單 5 到 10 中的代碼實(shí)現(xiàn)了這樣的一個引擎。通過輸入流和輸入流工廠來構(gòu)造它。清單 11 是一個生成過濾后的輸出流的工廠;例如,它會返回包裝了目標(biāo)輸出流的 GZIPOutputStream
。
清單 5. I/O 流引擎
package org.merlin.io;
import java.io.*;
/**
* An output engine that copies data from an InputStream through
* a FilterOutputStream to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class IOStreamEngine implements OutputEngine {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private InputStream in;
private OutputStreamFactory factory;
private byte[] buffer;
private OutputStream out;
|
該類的構(gòu)造器只初始化各種變量和將用于傳輸數(shù)據(jù)的緩沖區(qū)。
清單 6. 構(gòu)造器
public IOStreamEngine (InputStream in, OutputStreamFactory factory) {
this (in, factory, DEFAULT_BUFFER_SIZE);
}
public IOStreamEngine
(InputStream in, OutputStreamFactory factory, int bufferSize) {
this.in = in;
this.factory = factory;
buffer = new byte[bufferSize];
}
|
在 initialize()
方法中,該引擎調(diào)用其工廠來封裝與其一起提供的 OutputStream
。該工廠通常將一個過濾器連接至 OutputStream
。
清單 7. initialize() 方法
public void initialize (OutputStream out) throws IOException {
if (this.out != null) {
throw new IOException ("Already initialised");
} else {
this.out = factory.getOutputStream (out);
}
}
|
在 execute()
方法中,引擎從 InputStream
中讀取一個緩沖區(qū)的數(shù)據(jù),然后將它們寫入已封裝的 OutputStream
;或者,如果輸入結(jié)束,它會關(guān)閉 OutputStream
。
清單 8. execute() 方法
public void execute () throws IOException {
if (out == null) {
throw new IOException ("Not yet initialised");
} else {
int amount = in.read (buffer);
if (amount < 0) {
out.close ();
} else {
out.write (buffer, 0, amount);
}
}
}
|
最后,當(dāng)關(guān)閉引擎時,它就關(guān)閉其 InputStream
。
清單 9. 關(guān)閉 InputStream
public void finish () throws IOException {
in.close ();
}
|
內(nèi)部 OutputStreamFactory
接口(下面清單 10 中所示)描述可以返回過濾后的 OutputStream
的類。
清單 10. 內(nèi)部輸出流工廠接口
public static interface OutputStreamFactory {
public OutputStream getOutputStream (OutputStream out)
throws IOException;
}
}
|
清單 11 顯示將提供的流封裝到 GZIPOutputStream
中的一個示例工廠:
清單 11. GZIP 輸出流工廠
public class GZIPOutputStreamFactory
implements IOStreamEngine.OutputStreamFactory {
public OutputStream getOutputStream (OutputStream out)
throws IOException {
return new GZIPOutputStream (out);
}
}
|
該 I/O 流引擎及其輸出流工廠框架通常足以支持大多數(shù)的輸出流過濾需要。
輸出引擎輸入流
最后,我們還需要一小段代碼來完成這個解決方案。清單 12 到 16 中的代碼提供了讀取由輸出引擎所寫數(shù)據(jù)的輸入流。事實(shí)上,這段代碼有兩個部分:主類是一個從內(nèi)部緩沖區(qū)讀取數(shù)據(jù)的輸入流。與此緊密耦合的是一個輸出流(如清單 17 所示),它把輸出引擎所寫的數(shù)據(jù)填充到內(nèi)部讀緩沖區(qū)。
主輸入流類將用其內(nèi)部輸出流來初始化輸出引擎。然后,每當(dāng)它的緩沖區(qū)為空時,它會自動執(zhí)行該引擎來接收更多數(shù)據(jù)。輸出引擎將數(shù)據(jù)寫入其輸出流中,這將重新填充輸入流的內(nèi)部緩沖區(qū),以允許需要內(nèi)部緩沖區(qū)數(shù)據(jù)的應(yīng)用程序高效地讀取數(shù)據(jù)。
清單 12. 輸出引擎輸入流
package org.merlin.io;
import java.io.*;
/**
* An input stream that reads data from an OutputEngine.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class OutputEngineInputStream extends InputStream {
private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192;
private OutputEngine engine;
private byte[] buffer;
private int index, limit, capacity;
private boolean closed, eof;
|
該輸入流的構(gòu)造器獲取一個輸出引擎以從中讀取數(shù)據(jù)和一個可選的緩沖區(qū)大小。該流首先初始化其本身,然后初始化輸出引擎。
清單 13. 構(gòu)造器
public OutputEngineInputStream (OutputEngine engine) throws IOException {
this (engine, DEFAULT_INITIAL_BUFFER_SIZE);
}
public OutputEngineInputStream (OutputEngine engine, int initialBufferSize)
throws IOException {
this.engine = engine;
capacity = initialBufferSize;
buffer = new byte[capacity];
engine.initialize (new OutputStreamImpl ());
}
|
代碼的主要讀部分是一個相對簡單的基于字節(jié)數(shù)組的輸入流,與 ByteArrayInputStream
類非常相似。然而,每當(dāng)需要數(shù)據(jù)而該流為空時,它都會調(diào)用輸出引擎的 execute()
方法來重新填寫讀緩沖區(qū)。然后,將這些新數(shù)據(jù)返回給調(diào)用程序。因而,這個類將對輸出引擎所寫的數(shù)據(jù)反復(fù)讀取,直到它讀完為止,此時將設(shè)置 eof
標(biāo)志并且該流將返回已到達(dá)文件末尾的信息。
清單 14. 讀取數(shù)據(jù)
private byte[] one = new byte[1];
public int read () throws IOException {
int amount = read (one, 0, 1);
return (amount < 0) ? -1 : one[0] & 0xff;
}
public int read (byte data[], int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if
((offset < 0) || (length < 0) || (offset + length > data.length)) {
throw new IndexOutOfBoundsException ();
} else if (closed) {
throw new IOException ("Stream closed");
} else {
while (index >= limit) {
if (eof)
return -1;
engine.execute ();
}
if (limit - index < length)
length = limit - index;
System.arraycopy (buffer, index, data, offset, length);
index += length;
return length;
}
}
public long skip (long amount) throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else if (amount <= 0) {
return 0;
} else {
while (index >= limit) {
if (eof)
return 0;
engine.execute ();
}
if (limit - index < amount)
amount = limit - index;
index += (int) amount;
return amount;
}
}
public int available () throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else {
return limit - index;
}
}
|
當(dāng)操作數(shù)據(jù)的應(yīng)用程序關(guān)閉該流時,它調(diào)用輸出引擎的 finish()
方法,以便可以釋放其正在使用的任何資源。
清單 15. 釋放資源
public void close () throws IOException {
if (!closed) {
closed = true;
engine.finish ();
}
}
|
當(dāng)輸出引擎將數(shù)據(jù)寫入其輸出流時,調(diào)用 writeImpl()
方法。它將這些數(shù)據(jù)復(fù)制到讀緩沖區(qū),并更新讀限制索引;這將使新數(shù)據(jù)可自動地用于讀方法。
在單次循環(huán)中,如果輸出引擎寫入的數(shù)據(jù)比緩沖區(qū)中可以保存的數(shù)據(jù)多,則緩沖區(qū)的容量會翻倍。然而,這不能頻繁發(fā)生;緩沖區(qū)應(yīng)該快速擴(kuò)展到足夠的大小,以便進(jìn)行狀態(tài)穩(wěn)定的操作。
清單 16. writeImpl() 方法
private void writeImpl (byte[] data, int offset, int length) {
if (index >= limit)
index = limit = 0;
if (limit + length > capacity) {
capacity = capacity * 2 + length;
byte[] tmp = new byte[capacity];
System.arraycopy (buffer, index, tmp, 0, limit - index);
buffer = tmp;
limit -= index;
index = 0;
}
System.arraycopy (data, offset, buffer, limit, length);
limit += length;
}
|
下面清單 17 中顯示的內(nèi)部輸出流實(shí)現(xiàn)表示了一個流將數(shù)據(jù)寫入內(nèi)部輸出流緩沖區(qū)。該代碼驗(yàn)證參數(shù)都是可接受的,并且如果是這樣的話,它調(diào)用 writeImpl()
方法。
清單 17. 內(nèi)部輸出流實(shí)現(xiàn)
private class OutputStreamImpl extends OutputStream {
public void write (int datum) throws IOException {
one[0] = (byte) datum;
write (one, 0, 1);
}
public void write (byte[] data, int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if
((offset < 0) || (length < 0) || (offset + length > data.length)) {
throw new IndexOutOfBoundsException ();
} else if (eof) {
throw new IOException ("Stream closed");
} else {
writeImpl (data, offset, length);
}
}
|
最后,當(dāng)輸出引擎關(guān)閉其輸出流,表明它已寫入了所有的數(shù)據(jù)時,該輸出流設(shè)置輸入流的 eof
標(biāo)志,表明已經(jīng)讀取了所有的數(shù)據(jù)。
清單 18. 設(shè)置輸入流的 eof 標(biāo)志
public void close () {
eof = true;
}
}
}
|
敏感的讀者可能注意到我應(yīng)該將 writeImpl()
方法的主體直接放在輸出流實(shí)現(xiàn)中:內(nèi)部類有權(quán)訪問所有包含類的私有成員。然而,對這些字段的內(nèi)部類訪問比由包含類的直接方法的訪問在效率方面稍許差一些。所以,考慮到效率以及為了使類之間的相關(guān)性最小化,我使用額外的助手方法。
應(yīng)用工程解決方案:在讀取期間壓縮數(shù)據(jù)
清單 19 演示了這個類框架的使用來解決我最初的問題:在我讀取數(shù)據(jù)時壓縮它們。該解決方案歸結(jié)為創(chuàng)建一個與輸入流相關(guān)聯(lián)的 IOStreamEngine
和一個 GZIPOutputStreamFactory
,然后將 OutputEngineInputStream
與這個 GZIPOutputStreamFactory
相連。自動執(zhí)行流的初始化和連接,然后可以直接從結(jié)果流中讀取壓縮數(shù)據(jù)。當(dāng)處理完成且關(guān)閉流時,輸出引擎自動關(guān)閉,并且它關(guān)閉初始輸入流。
清單 19. 應(yīng)用工程解決方案
private static InputStream engineCompress (InputStream in)
throws IOException {
return new OutputEngineInputStream
(new IOStreamEngine (in, new GZIPOutputStreamFactory ()));
}
|
雖然為解決這類問題而設(shè)計的解決方案應(yīng)該產(chǎn)生十分清晰的代碼,這一點(diǎn)沒有什么可驚奇的,但是通常要充分留意以下教訓(xùn):無論問題大小,應(yīng)用良好的設(shè)計技術(shù)都幾乎肯定會產(chǎn)生更為清晰、更便于維護(hù)的代碼。
測試性能
從效率看, IOStreamEngine
將數(shù)據(jù)讀入其內(nèi)部緩沖區(qū),然后通過壓縮過濾器將它們寫入 OutputStreamImpl
。這將數(shù)據(jù)直接寫入 OutputEngineInputStream
,以便它們可供讀取。總共只執(zhí)行兩次緩沖區(qū)復(fù)制,這意味著我應(yīng)該從管道式流解決方案的緩沖區(qū)復(fù)制效率和蠻力解決方案的無線程效率的結(jié)合中獲益。
要測試實(shí)際的性能,我編寫了一個簡單的測試工具(請參閱所附 資源中的 test.PerformanceTest
),它使用這三個推薦的解決方案,通過使用一個空過濾器來讀取一塊啞元數(shù)據(jù)。在運(yùn)行 Java 2 SDK,版本 1.4.0 的 800 MHz Linux 機(jī)器上,達(dá)到了下列性能:
管道式流解決方案
15KB:23ms;15MB:22100ms
蠻力解決方案
15KB:0.35ms;15MB:745ms
工程解決方案
15KB:0.16ms;15MB:73ms
該問題的工程解決方案很明顯比基于標(biāo)準(zhǔn) Java API 的另兩個方法都更有效。
順便提一下,考慮到如果輸出引擎能夠遵守這樣的約定:在將數(shù)據(jù)寫入其輸出流后,它不修改從中寫入數(shù)據(jù)的數(shù)組而返回,那么我能提供一個只使用一次緩沖區(qū)復(fù)制操作的解決方案??墒?,輸出引擎很少會遵守這種約定。如果需要,輸出引擎只要通過實(shí)現(xiàn)適當(dāng)?shù)臉?biāo)記程序接口,就能宣稱它支持這種方式的操作。
應(yīng)用工程解決方案:讀取編碼的字符數(shù)據(jù)
任何可以用“提供對將數(shù)據(jù)反復(fù)寫入 OutputStream
的實(shí)體的讀訪問權(quán)”表述的問題,都可以用這一框架解決。在這一節(jié)和下一節(jié)中,我們將研究這樣的問題示例及其有效的解決方案。
首先,考慮要讀取 UTF-8 編碼格式的字符流的情況: InputStreamReader
類讓您將以二進(jìn)制編碼的字符數(shù)據(jù)作為一系列 Unicode 字符讀?。凰硎玖藦淖止?jié)輸入流到字符輸入流的關(guān)口。 OutputStreamWriter
類讓您將一系列二進(jìn)制編碼格式的 Unicode 字符寫入輸出流;它表示從字符輸出流到字節(jié)輸入流的關(guān)口。 String
類的 getBytes()
方法將字符串轉(zhuǎn)換成經(jīng)編碼的字節(jié)數(shù)組。然而,這些類中沒有一個能直接讓您讀取 UTF-8 編碼格式的字符流。
清單 20 到 24 中的代碼演示了以與 IOStreamEngine
類極其相似的方式使用 OutputEngine
框架的一種解決方案。我們并不是從輸入流讀取和通過輸出流過濾器進(jìn)行寫操作,而是從字符流讀取,并通過所選的字符進(jìn)行編碼的 OutputStreamWriter
進(jìn)行寫操作。
清單 20. 讀取編碼的字符數(shù)據(jù)
package org.merlin.io;
import java.io.*;
/**
* An output engine that copies data from a Reader through
* a OutputStreamWriter to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class ReaderWriterEngine implements OutputEngine {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private Reader reader;
private String encoding;
private char[] buffer;
private Writer writer;
|
該類的構(gòu)造器接受要從中讀取的字符流、要使用的編碼以及可選的緩沖區(qū)大小。
清單 21. 構(gòu)造器
public ReaderWriterEngine (Reader in, String encoding) {
this (in, encoding, DEFAULT_BUFFER_SIZE);
}
public ReaderWriterEngine
(Reader reader, String encoding, int bufferSize) {
this.reader = reader;
this.encoding = encoding;
buffer = new char[bufferSize];
}
|
當(dāng)該引擎初始化時,它將以所選編碼格式寫字符的 OutputStreamWriter
連接至提供的輸出流。
清單 22. 初始化輸出流寫程序
public void initialize (OutputStream out) throws IOException {
if (writer != null) {
throw new IOException ("Already initialised");
} else {
writer = new OutputStreamWriter (out, encoding);
}
}
|
當(dāng)執(zhí)行該引擎時,它從輸入字符流中讀取數(shù)據(jù),然后將它們寫入 OutputStreamWriter
,接著 OutputStreamWriter 將它們以所選的編碼格式傳遞給相連的輸出流。至此,該框架使數(shù)據(jù)可供讀取。
清單 23. 讀取數(shù)據(jù)
public void execute () throws IOException {
if (writer == null) {
throw new IOException ("Not yet initialised");
} else {
int amount = reader.read (buffer);
if (amount < 0) {
writer.close ();
} else {
writer.write (buffer, 0, amount);
}
}
}
|
當(dāng)引擎執(zhí)行完時,它關(guān)閉其輸入。
清單 24. 關(guān)閉輸入
public void finish () throws IOException {
reader.close ();
}
}
|
在這種與壓縮不同的情況中,Java I/O 包不提供對 OutputStreamWriter
之下的字符編碼類的低級別訪問。因此,這是在 Java 平臺 1.4 之前的發(fā)行版上讀取編碼格式的字符流的唯一有效解決方案。從版本 1.4 開始, java.nio.charset
包確實(shí)提供了與流無關(guān)的字符編碼和譯碼能力。然而,這個包不能滿足我們對基于輸入流的解決方案的要求。
應(yīng)用工程解決方案:讀取序列化的 DOM 文檔
最后,讓我們研究該框架的最后一種用法。清單 25 到 29 中的代碼提供了一個用來讀取序列化格式的 DOM 文檔或文檔子集的解決方案。這一代碼的潛在用途可能是對部分 DOM 文檔執(zhí)行確認(rèn)性重新解析。
清單 25. 讀取序列化的 DOM 文檔
package org.merlin.io;
import java.io.*;
import java.util.*;
import org.w3c.dom.*;
import org.w3c.dom.traversal.*;
/**
* An output engine that serializes a DOM tree using a specified
* character encoding to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class DOMSerializerEngine implements OutputEngine {
private NodeIterator iterator;
private String encoding;
private OutputStreamWriter writer;
|
構(gòu)造器獲取要在上面進(jìn)行循環(huán)的 DOM 節(jié)點(diǎn),或預(yù)先構(gòu)造的節(jié)點(diǎn)迭代器(這是 DOM 2 的一部分),以及一個用于序列化格式的編碼。
清單 26. 構(gòu)造器
public DOMSerializerEngine (Node root) {
this (root, "UTF-8");
}
public DOMSerializerEngine (Node root, String encoding) {
this (getIterator (root), encoding);
}
private static NodeIterator getIterator (Node node) {
DocumentTraversal dt= (DocumentTraversal)
(node.getNodeType () ==
Node.DOCUMENT_NODE) ? node : node.getOwnerDocument ();
return dt.createNodeIterator (node, NodeFilter.SHOW_ALL, null, false);
}
public DOMSerializerEngine (NodeIterator iterator, String encoding) {
this.iterator = iterator;
this.encoding = encoding;
}
|
初始化期間,該引擎將適當(dāng)?shù)?OutputStreamWriter
連接至目標(biāo)輸出流。
清單 27. initialize() 方法
public void initialize (OutputStream out) throws IOException {
if (writer != null) {
throw new IOException ("Already initialised");
} else {
writer = new OutputStreamWriter (out, encoding);
}
}
|
在執(zhí)行階段,該引擎從節(jié)點(diǎn)迭代器中獲得下一個節(jié)點(diǎn),然后將其序列化至 OutputStreamWriter
。當(dāng)獲取了所有節(jié)點(diǎn)后,引擎關(guān)閉它的流。
清單 28. execute() 方法
public void execute () throws IOException {
if (writer == null) {
throw new IOException ("Not yet initialised");
} else {
Node node = iterator.nextNode ();
closeElements (node);
if (node == null) {
writer.close ();
} else {
writeNode (node);
writer.flush ();
}
}
}
|
當(dāng)該引擎關(guān)閉時,沒有要釋放的資源。
清單 29. 關(guān)閉
public void finish () throws IOException {
}
// private void closeElements (Node node) throws IOException ...
// private void writeNode (Node node) throws IOException ...
}
|
序列化每個節(jié)點(diǎn)的其它內(nèi)部細(xì)節(jié)不太有趣;這一過程主要涉及根據(jù)節(jié)點(diǎn)的類型和 XML 1.0 規(guī)范寫出節(jié)點(diǎn),所以我將在本文中省略這一部分的代碼。請參閱附帶的 源代碼,獲取完整的詳細(xì)信息。
結(jié)束語
我所提供的是一個有用的框架,它利用標(biāo)準(zhǔn)輸入流 API 讓您能有效讀取由只能寫入輸出流的系統(tǒng)產(chǎn)生的數(shù)據(jù)。它讓我們讀取經(jīng)壓縮或編碼的數(shù)據(jù)及序列化文檔等。雖然可以使用標(biāo)準(zhǔn) Java API 實(shí)現(xiàn)這一功能,但使用這些類的效率根本不行。應(yīng)該充分注意到,這種解決方案比最簡單的蠻力解決方案更有效(即使在數(shù)據(jù)不大的情況下)。將數(shù)據(jù)寫入 ByteArrayOutputStream
以便進(jìn)行后續(xù)處理的任何應(yīng)用程序都可能從這一框架中受益。
字節(jié)數(shù)組流的拙劣性能和管道式流難以置信的蹩腳性能,實(shí)際上都是我下一篇文章的主題。在那篇文章中,我將研究重新實(shí)現(xiàn)這些類,并比這些類的原創(chuàng)者更加關(guān)注它們的性能。只要 API 約定稍微寬松一點(diǎn),性能就可能改進(jìn)一百倍了。
我討厭洗碗。不過,正如大多數(shù)我自認(rèn)為是較好(雖然常常還是微不足道)的想法一樣,這些類背后的想法都是在我洗碗時冒出來的。我時常發(fā)現(xiàn)撇開實(shí)際代碼,回頭看看并且把問題的范圍考慮得更廣些,可能會得出一個更好的解決方案,它最終為您提供的方法可能比您找出的容易方法更好。這些解決方案常常會產(chǎn)生更清晰、更有效而且更可維護(hù)的代碼。
我真的擔(dān)心我們有了洗碗機(jī)的那一天。
參考資料
關(guān)于作者