var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});

很贊,因?yàn)槲覀兊捻憫?yīng)事件流是根據(jù)請(qǐng)求事件流定義的,如果我們以后有更多事件發(fā)生在請(qǐng)求事件流的話,我們也將會(huì)在相應(yīng)的響應(yīng)事件流收到響應(yīng)事件,就如所期待的那樣:
requestStream: --a-----b--c------------|->
responseStream: -----A--------B-----C---|->
(小寫的是請(qǐng)求事件流, 大寫的是響應(yīng)事件流)
現(xiàn)在,我們終于有響應(yīng)的事件流了,并且可以用我們收到的數(shù)據(jù)來(lái)渲染了:
responseStream.subscribe(function(response) {
// render `response` to the DOM however you wish
});
讓我們把所有代碼合起來(lái),看一下:
var requestStream = Rx.Observable.just('https://api.github.com/users');
var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
responseStream.subscribe(function(response) {
// render `response` to the DOM however you wish
});
刷新按鈕
我還沒(méi)提到本次響應(yīng)的JSON數(shù)據(jù)是含有100個(gè)用戶數(shù)據(jù)的list,這個(gè)API只允許指定頁(yè)面偏移量(page offset),而不能指定每頁(yè)大小(page size),我們只用到了3個(gè)用戶數(shù)據(jù)而浪費(fèi)了其他97個(gè),現(xiàn)在可以先忽略這個(gè)問(wèn)題,稍后我們將學(xué)習(xí)如何緩存響應(yīng)的數(shù)據(jù)。
每當(dāng)刷新按鈕被點(diǎn)擊,請(qǐng)求事件流就會(huì)發(fā)出一個(gè)新的URL值,這樣我們就可以獲取新的響應(yīng)數(shù)據(jù)。這里我們需要兩個(gè)東西:點(diǎn)擊刷新按鈕的事件流(準(zhǔn)則:一切都能作為事件流),我們需要將點(diǎn)擊刷新按鈕的事件流作為請(qǐng)求事件流的依賴(即點(diǎn)擊刷新事件流會(huì)引起請(qǐng)求事件流)。幸運(yùn)的是,RxJS已經(jīng)有了可以從事件監(jiān)聽者轉(zhuǎn)換成被觀察者的方法了。
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
因?yàn)樗⑿掳粹o點(diǎn)擊事件不會(huì)攜帶將要請(qǐng)求的API的URL,我們需要將每次的點(diǎn)擊映射到一個(gè)實(shí)際的URL上,現(xiàn)在我們將請(qǐng)求事件流轉(zhuǎn)換成了一個(gè)點(diǎn)擊事件流,并將每次的點(diǎn)擊映射成一個(gè)隨機(jī)的頁(yè)面偏移量(offset)參數(shù)來(lái)組成API的URL。
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
因?yàn)槲冶容^笨而且也沒(méi)有使用自動(dòng)化測(cè)試,所以我剛把之前做好的一個(gè)功能搞爛了。這樣,請(qǐng)求在一開始的時(shí)候就不會(huì)執(zhí)行,而只有在點(diǎn)擊事件發(fā)生時(shí)才會(huì)執(zhí)行。我們需要的是兩種情況都要執(zhí)行:剛開始打開網(wǎng)頁(yè)和點(diǎn)擊刷新按鈕都會(huì)執(zhí)行的請(qǐng)求。
我們知道如何為每一種情況做一個(gè)單獨(dú)的事件流:
var requestOnRefreshStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
但是我們是否可以將這兩個(gè)合并成一個(gè)呢?沒(méi)錯(cuò),是可以的,我們可以使用merge()
方法來(lái)實(shí)現(xiàn)。下圖可以解釋merge()
函數(shù)的用處:
stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
vvvvvvvvv merge vvvvvvvvv
---a-B---C--e--D--o----->
現(xiàn)在做起來(lái)應(yīng)該很簡(jiǎn)單:
var requestOnRefreshStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
var requestStream = Rx.Observable.merge(
requestOnRefreshStream, startupRequestStream
);
還有一個(gè)更干凈的寫法,省去了中間事件流變量:
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.merge(Rx.Observable.just('https://api.github.com/users'));
甚至可以更簡(jiǎn)短,更具有可讀性:
var requestStream = refreshClickStream
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
})
.startWith('https://api.github.com/users');
startWith()
函數(shù)做的事和你預(yù)期的完全一樣。無(wú)論你的輸入事件流是怎樣的,使用startWith(x)
函數(shù)處理過(guò)后輸出的事件流一定是一個(gè)x
開頭的結(jié)果。但是我沒(méi)有總是重復(fù)代碼( DRY),我只是在重復(fù)API的URL字符串,改進(jìn)的方法是將startWith()
函數(shù)挪到refreshClickStream
那里,這樣就可以在啟動(dòng)時(shí),模擬一個(gè)刷新按鈕的點(diǎn)擊事件了。
var requestStream = refreshClickStream.startWith('startup click')
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
不錯(cuò),如果你倒回到"搞爛了的自動(dòng)測(cè)試"的地方,然后再對(duì)比這兩個(gè)地方,你會(huì)發(fā)現(xiàn)我僅僅是加了一個(gè)startWith()
函數(shù)而已。
用事件流將3個(gè)推薦的用戶數(shù)據(jù)模型化
直到現(xiàn)在,在響應(yīng)事件流(responseStream)的訂閱(subscribe()
)函數(shù)發(fā)生的渲染步驟里,我們只是稍微提及了一下推薦關(guān)注的UI?,F(xiàn)在有了刷新按鈕,我們就會(huì)出現(xiàn)一個(gè)問(wèn)題:當(dāng)你點(diǎn)擊了刷新按鈕,當(dāng)前的三個(gè)推薦關(guān)注用戶沒(méi)有被清楚,而只要響應(yīng)的數(shù)據(jù)達(dá)到后我們就拿到了新的推薦關(guān)注的用戶數(shù)據(jù),為了讓UI看起來(lái)更漂亮,我們需要在點(diǎn)擊刷新按鈕的事件發(fā)生的時(shí)候清楚當(dāng)前的三個(gè)推薦關(guān)注的用戶。
refreshClickStream.subscribe(function() {
// clear the 3 suggestion DOM elements
});
不,老兄,還沒(méi)那么快。我們又出現(xiàn)了新的問(wèn)題,因?yàn)槲覀儸F(xiàn)在有兩個(gè)訂閱者在影響著推薦關(guān)注的UI DOM元素(另一個(gè)是responseStream.subscribe()
),這看起來(lái)并不符合關(guān)注分離(Separation of concerns)原則,還記得響應(yīng)式編程的原則么?

現(xiàn)在,讓我們把推薦關(guān)注的用戶數(shù)據(jù)模型化成事件流形式,每個(gè)被發(fā)出的值是一個(gè)包含了推薦關(guān)注用戶數(shù)據(jù)的JSON對(duì)象。我們將把這三個(gè)用戶數(shù)據(jù)分開處理,下面是推薦關(guān)注的1號(hào)用戶數(shù)據(jù)的事件流:
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
});
其他的,如推薦關(guān)注的2號(hào)用戶數(shù)據(jù)的事件流suggestion2Stream
和推薦關(guān)注的3號(hào)用戶數(shù)據(jù)的事件流suggestion3Stream
都可以方便的從suggestion1Stream
復(fù)制粘貼就好。這里并不是重復(fù)代碼,只是為讓我們的示例更加簡(jiǎn)單,而且我認(rèn)為這是一個(gè)思考如何避免重復(fù)代碼的好案例。
Instead of having the rendering happen in responseStream's subscribe(), we do that here:
suggestion1Stream.subscribe(function(suggestion) {
// render the 1st suggestion to the DOM
});
我們不在responseStream的subscribe()中處理渲染了,我們這樣處理:
suggestion1Stream.subscribe(function(suggestion) {
// render the 1st suggestion to the DOM
});
回到"當(dāng)刷新時(shí),清楚掉當(dāng)前的推薦關(guān)注的用戶",我們可以很簡(jiǎn)單的把刷新點(diǎn)擊映射為沒(méi)有推薦數(shù)據(jù)(null
suggestion data),并且在suggestion1Stream
中包含進(jìn)來(lái),如下:
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; })
);
當(dāng)渲染時(shí),我們將 null
解釋為"沒(méi)有數(shù)據(jù)",然后把UI元素隱藏起來(lái)。
suggestion1Stream.subscribe(function(suggestion) {
if (suggestion === null) {
// hide the first suggestion DOM element
}
else {
// show the first suggestion DOM element
// and render the data
}
});
現(xiàn)在我們大概的示意圖如下:
refreshClickStream: ----------o--------o---->
requestStream: -r--------r--------r---->
responseStream: ----R---------R------R-->
suggestion1Stream: ----s-----N---s----N-s-->
suggestion2Stream: ----q-----N---q----N-q-->
suggestion3Stream: ----t-----N---t----N-t-->
N
代表null
作為一種補(bǔ)充,我們可以在一開始的時(shí)候就渲染空的推薦內(nèi)容。這通過(guò)把startWith(null)添加到推薦關(guān)注的事件流就可以了:
var suggestion1Stream = responseStream
.map(function(listUsers) {
// get one random user from the list
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
結(jié)果是這樣的:
refreshClickStream: ----------o---------o---->
requestStream: -r--------r---------r---->
responseStream: ----R----------R------R-->
suggestion1Stream: -N--s-----N----s----N-s-->
suggestion2Stream: -N--q-----N----q----N-q-->
suggestion3Stream: -N--t-----N----t----N-t-->
推薦關(guān)注的關(guān)閉和使用已緩存的響應(yīng)數(shù)據(jù)(responses)
只剩這一個(gè)功能沒(méi)有實(shí)現(xiàn)了,每個(gè)推薦關(guān)注的用戶UI會(huì)有一個(gè)'x'按鈕來(lái)關(guān)閉自己,然后在當(dāng)前的用戶數(shù)據(jù)UI中加載另一個(gè)推薦關(guān)注的用戶。最初的想法是:點(diǎn)擊任何關(guān)閉按鈕時(shí)都需要發(fā)起一個(gè)新的請(qǐng)求:
var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// and the same for close2Button and close3Button
var requestStream = refreshClickStream.startWith('startup click')
.merge(close1ClickStream) // we added this
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
這樣沒(méi)什么效果,這樣會(huì)關(guān)閉和重新加載全部的推薦關(guān)注用戶,而不僅僅是處理我們點(diǎn)擊的那一個(gè)。這里有幾種方式來(lái)解決這個(gè)問(wèn)題,并且讓它變得有趣,我們將重用之前的請(qǐng)求數(shù)據(jù)來(lái)解決這個(gè)問(wèn)題。這個(gè)API響應(yīng)的每頁(yè)數(shù)據(jù)大小是100個(gè)用戶數(shù)據(jù),而我們只使用了其中三個(gè),所以還有一大堆未使用的數(shù)據(jù)可以拿來(lái)用,不用去請(qǐng)求更多數(shù)據(jù)了。
ok,再來(lái),我們繼續(xù)用事件流的方式來(lái)思考。當(dāng)'close1'點(diǎn)擊事件發(fā)生時(shí),我們想要使用最近發(fā)出的響應(yīng)數(shù)據(jù),并執(zhí)行responseStream
函數(shù)來(lái)從響應(yīng)列表里隨機(jī)的抽出一個(gè)用戶數(shù)據(jù)來(lái),就像下面這樣:
requestStream: --r--------------->
responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->
在Rx中一個(gè)組合函數(shù)叫做combineLatest
,應(yīng)該是我們需要的。這個(gè)函數(shù)會(huì)把數(shù)據(jù)流A和數(shù)據(jù)流B作為輸入,并且無(wú)論哪一個(gè)數(shù)據(jù)流發(fā)出一個(gè)值了,combineLatest
函數(shù)就會(huì)將從兩個(gè)數(shù)據(jù)流最近發(fā)出的值a
和b
作為f
函數(shù)的輸入,計(jì)算后返回一個(gè)輸出值(c = f(x,y)
),下面的圖表會(huì)讓這個(gè)函數(shù)的過(guò)程看起來(lái)會(huì)更加清晰:
stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
vvvvvvvv combineLatest(f) vvvvvvv
----AB---AC--EC---ED--ID--IQ---->
f是轉(zhuǎn)換成大寫的函數(shù)
這樣,我們就可以把combineLatest()
函數(shù)用在close1ClickStream
和 responseStream
上了,只要關(guān)閉按鈕被點(diǎn)擊,我們就可以獲得最近的響應(yīng)數(shù)據(jù),并在suggestion1Stream
上產(chǎn)生出一個(gè)新值。另一方面,combineLatest()
函數(shù)也是相對(duì)的:每當(dāng)在responseStream
上發(fā)出一個(gè)新的響應(yīng),它將會(huì)結(jié)合一次新的點(diǎn)擊關(guān)閉按鈕事件
來(lái)產(chǎn)生一個(gè)新的推薦關(guān)注的用戶數(shù)據(jù),這非常有趣,因?yàn)樗梢越o我們的suggestion1Stream
簡(jiǎn)化代碼:
var suggestion1Stream = close1ClickStream
.combineLatest(responseStream,
function(click, listUsers) {
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
現(xiàn)在,我們的拼圖還缺一小塊地方。combineLatest()
函數(shù)使用了最近的兩個(gè)數(shù)據(jù)源,但是如果某一個(gè)數(shù)據(jù)源還沒(méi)有發(fā)出任何東西,combineLatest()
函數(shù)就不能在輸出流上產(chǎn)生一個(gè)數(shù)據(jù)事件。如果你看了上面的ASCII圖表(文章中第一個(gè)圖表),你會(huì)明白當(dāng)?shù)谝粋€(gè)數(shù)據(jù)流發(fā)出一個(gè)值a
時(shí)并沒(méi)有任何的輸出,只有當(dāng)?shù)诙€(gè)數(shù)據(jù)流發(fā)出一個(gè)值b
的時(shí)候才會(huì)產(chǎn)生一個(gè)輸出值。
這里有很多種方法來(lái)解決這個(gè)問(wèn)題,我們使用最簡(jiǎn)單的一種,也就是在啟動(dòng)的時(shí)候模擬'close 1'的點(diǎn)擊事件:
var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this
.combineLatest(responseStream,
function(click, listUsers) {l
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
封裝起來(lái)
我們完成了,下面是封裝好的完整示例代碼:
var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3
var requestStream = refreshClickStream.startWith('startup click')
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
return 'https://api.github.com/users?since=' + randomOffset;
});
var responseStream = requestStream
.flatMap(function (requestUrl) {
return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
});
var suggestion1Stream = close1ClickStream.startWith('startup click')
.combineLatest(responseStream,
function(click, listUsers) {
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
refreshClickStream.map(function(){ return null; })
)
.startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream
suggestion1Stream.subscribe(function(suggestion) {
if (suggestion === null) {
// hide the first suggestion DOM element
}
else {
// show the first suggestion DOM element
// and render the data
}
});
你可以在這里看到可演示的示例工程
以上的代碼片段雖小但做到很多事:它適當(dāng)?shù)氖褂藐P(guān)注分離(separation of concerns)原則的實(shí)現(xiàn)了對(duì)多個(gè)事件流的管理,甚至做到了響應(yīng)數(shù)據(jù)的緩存。這種函數(shù)式的風(fēng)格使得代碼看起來(lái)更像是聲明式編程而非命令式編程:我們并不是在給一組指令去執(zhí)行,只是定義了事件流之間關(guān)系來(lái)告訴它這是什么。例如,我們用Rx來(lái)告訴計(jì)算機(jī)suggestion1Stream
是'close 1'事件結(jié)合從最新的響應(yīng)數(shù)據(jù)中拿到的一個(gè)用戶數(shù)據(jù)的數(shù)據(jù)流,除此之外,當(dāng)刷新事件發(fā)生時(shí)和程序啟動(dòng)時(shí),它就是null
。
留意一下代碼中并未出現(xiàn)例如if
, for
, while
等流程控制語(yǔ)句,或者像JavaScript那樣典型的基于回調(diào)(callback-based)的流程控制。如果可以的話(稍候會(huì)給你留一些實(shí)現(xiàn)細(xì)節(jié)來(lái)作為練習(xí)),你甚至可以在subscribe()
上使用 filter()
函數(shù)來(lái)擺脫if
和else
。在Rx里,我們有例如: map
, filter
, scan
, merge
, combineLatest
, startWith
等數(shù)據(jù)流的函數(shù),還有很多函數(shù)可以用來(lái)控制事件驅(qū)動(dòng)編程(event-driven program)的流程。這些函數(shù)的集合可以讓你使用更少的代碼實(shí)現(xiàn)更強(qiáng)大的功能。
接下來(lái)
如果你認(rèn)為Rx將會(huì)成為你首選的響應(yīng)式編程庫(kù),接下來(lái)就需要花一些時(shí)間來(lái)熟悉一大批的函數(shù)用來(lái)變形、聯(lián)合和創(chuàng)建被觀察者。如果你想在事件流的圖表當(dāng)中熟悉這些函數(shù),那就來(lái)看一下這個(gè):。請(qǐng)記住,無(wú)論何時(shí)你遇到問(wèn)題,可以畫一下這些圖,思考一下,看一看這一大串函數(shù),然后繼續(xù)思考。以我個(gè)人經(jīng)驗(yàn),這樣效果很有效。
一旦你開始使用了Rx編程,請(qǐng)記住,理解Cold vs Hot Observables的概念是非常必要的,如果你忽視了這一點(diǎn),它就會(huì)反彈回來(lái)并殘忍的反咬你一口。我這里已經(jīng)警告你了,學(xué)習(xí)函數(shù)式編程可以提高你的技能,熟悉一些常見(jiàn)問(wèn)題,例如Rx會(huì)帶來(lái)的副作用
但是響應(yīng)式編程庫(kù)并不僅僅是Rx,還有相對(duì)容易理解的,沒(méi)有Rx那些怪癖的Bacon.js。Elm Language則以它自己的方式支持響應(yīng)式編程:它是一門會(huì)編譯成Javascript + HTML + CSS的響應(yīng)式編程語(yǔ)言,并有一個(gè)time travelling debugger功能,很棒吧。
而Rx對(duì)于像前端和App這樣需要處理大量的編程效果是非常棒的。但是它不只是可以用在客戶端,還可以用在后端或者接近數(shù)據(jù)庫(kù)的地方。事實(shí)上,RxJava就是Netflix服務(wù)端API用來(lái)處理并行的組件。Rx并不是局限于某種應(yīng)用程序或者編程語(yǔ)言的框架,它真的是你編寫任何事件驅(qū)動(dòng)程序,可以遵循的一個(gè)非常棒的編程范式。