1.什么是消息隊(duì)列
消息隊(duì)列允許應(yīng)用間通過(guò)消息的發(fā)送與接收的方式進(jìn)行通信,當(dāng)消息接收方服務(wù)忙或不可用時(shí),其提供了一個(gè)消息暫存的功能。 消息隊(duì)列中的幾個(gè)基本概念: Producer: 消息發(fā)送方,也叫做消息生產(chǎn)者 Consumer: 消息接收方,也叫做消息消費(fèi)者 Broker: 消息投遞的代理 2.應(yīng)用場(chǎng)景低耦合、可靠投遞、廣播、流量控制、最終一致性。 1)解耦(為面向服務(wù)的架構(gòu)(SOA)提供基本的最終一致性實(shí)現(xiàn))基于消息的模型,關(guān)心的是“通知”,而非“處理”。 短信、郵件通知、緩存刷新等操作使用消息隊(duì)列進(jìn)行通知。 系統(tǒng)性能評(píng)價(jià)標(biāo)準(zhǔn): 非消息系統(tǒng)隊(duì)列系統(tǒng):系統(tǒng)中最慢的組件運(yùn)行時(shí)間(短板效應(yīng)) 消息隊(duì)列系統(tǒng):異步通知減少短板效應(yīng)的影響 消息隊(duì)列和RPC的區(qū)別與比較: RPC: 異步調(diào)用,及時(shí)獲得調(diào)用結(jié)果,具有強(qiáng)一致性結(jié)果,關(guān)心業(yè)務(wù)調(diào)用處理結(jié)果。 消息隊(duì)列:兩次異步RPC調(diào)用,將調(diào)用內(nèi)容在隊(duì)列中進(jìn)行轉(zhuǎn)儲(chǔ),并選擇合適的時(shí)機(jī)進(jìn)行投遞(錯(cuò)峰流控) 2)廣播(發(fā)布/訂閱模型,基于消息格式進(jìn)行編程)如果沒(méi)有消息隊(duì)列,每當(dāng)一個(gè)新的業(yè)務(wù)方接入,我們都要聯(lián)調(diào)一次新接口。有了消息隊(duì)列,我們只需要關(guān)心消息是否送達(dá)了隊(duì)列,至于誰(shuí)希望訂閱,是下游的事情,無(wú)疑極大地減少了開發(fā)和聯(lián)調(diào)的工作量。 3) 錯(cuò)峰及流控不同服務(wù)間的處理能力不同。比如: WEB前端上千萬(wàn)次/s的負(fù)載,數(shù)據(jù)庫(kù)tps跟不上。 沒(méi)有消息隊(duì)列時(shí),通過(guò)滑動(dòng)窗口、擁塞控制等一系列措施,可以做到服務(wù)調(diào)用間的流量控制,但是不具有通用性,且維護(hù)復(fù)雜。 通過(guò)控制消息隊(duì)列的分發(fā)速度可以實(shí)現(xiàn)通用的流量控制。利用中間系統(tǒng)轉(zhuǎn)儲(chǔ)兩個(gè)系統(tǒng)的通信內(nèi)容,并在下游系統(tǒng)有能力處理這些消息的時(shí)候,再處理這些消息,是一套相對(duì)較通用的方式。 總結(jié): 1. 消息隊(duì)列適用于處理非強(qiáng)一致性事務(wù)要求,非延遲敏感的業(yè)務(wù)場(chǎng)景。RPC可滿足強(qiáng)一致性和延遲敏感要求; 2.可以用支持最終一致性的消息隊(duì)列來(lái)實(shí)現(xiàn)輕量級(jí)、非延遲敏感的分布式事務(wù); 3.上下游業(yè)務(wù)系統(tǒng)處理能力存在較大差距,且不包含在業(yè)務(wù)主流程的功能,可以交給消息隊(duì)列; 4.有多個(gè)下游業(yè)務(wù)關(guān)心系統(tǒng)發(fā)出的通知消息時(shí),基于標(biāo)準(zhǔn)的消息內(nèi)容格式,使用消息隊(duì)列可減少重復(fù)開發(fā)和聯(lián)調(diào)。 3. 消息隊(duì)列的基本特性數(shù)據(jù)流轉(zhuǎn)過(guò)程:producer發(fā)送給broker, broker發(fā)送給consumer, consumer回復(fù)消費(fèi)確認(rèn),broker刪除消息 1)消息丟失(可靠性)會(huì)導(dǎo)致消息重復(fù)及延遲。 當(dāng)有可能發(fā)生消息丟失風(fēng)險(xiǎn)的操作前,先將消息數(shù)據(jù)落地,然后發(fā)送。失敗或超時(shí)時(shí),不斷輪詢所有待發(fā)送消息重新發(fā)送,保證最終一定送達(dá)。消息消費(fèi)者收到消息后給服務(wù)端一個(gè)確認(rèn),當(dāng)所有訂閱者都確認(rèn)收到消息后,刪除消息。 消息重復(fù)和丟失必須得面對(duì)一個(gè),只能在兩者間做平衡。 2)順序投遞1.允許消息丟失。 2.從發(fā)送方到服務(wù)方到接受者都是單點(diǎn)單線程。 3)重復(fù)投遞如何鑒別重復(fù)消息? 重復(fù)消息如何冪等處理? i) 版本號(hào) 在一個(gè)回話周期內(nèi)維護(hù)一個(gè)單調(diào)遞增的消息版本號(hào),檢測(cè)到小于最近接收到的版本號(hào)時(shí)判別為 重復(fù)投遞的消息。 發(fā)送方必須攜帶消息版本號(hào); 對(duì)于嚴(yán)格要求消息順序的業(yè)務(wù),接收方必須存儲(chǔ)消息版本號(hào)。 ii)狀態(tài)機(jī) 主流消息隊(duì)列的設(shè)計(jì)范式里,在不丟消息的前提下,盡量減少重復(fù)消息,不保證消息的投遞順序。 4)push or pullpush 模式, 即 broker 主動(dòng)推送消息給消費(fèi)者 pull 模式, 即消費(fèi)者主動(dòng)從 broker 中拉取消息 4. rabbitmq(以下內(nèi)容均基于AMQP協(xié)議)1)基本特性* 可靠性 消息持久化、消息發(fā)送和投遞確認(rèn)機(jī)制、集群高可用方案 * 靈活路由 消息通過(guò)exchange的方式路由到不同的queue中,提供了包括fanout, direct, topic等多種exchange實(shí)現(xiàn),并且支持通過(guò)編寫exchange插件的方式自實(shí)現(xiàn)路由方案 * 支持集群 同網(wǎng)段下的rabbitmq節(jié)點(diǎn)可以通過(guò)集群的方式,組成一個(gè)邏輯上的單一broker * Federation 通過(guò)Federation可以在跨網(wǎng)段節(jié)點(diǎn)間組件集群 * 高可用消息隊(duì)列 通過(guò)設(shè)置鏡像隊(duì)列的方式,消息可以在鏡像隊(duì)列間進(jìn)行復(fù)制,使節(jié)點(diǎn)down機(jī)或硬件損壞的情況下保證隊(duì)列服務(wù)的高可用 * 多協(xié)議支持 包括AMQP, STOMP, MQTT, HTTP等多種消息交換協(xié)議 * 多客戶端支持 JAVA, .NET, Ruby, Python, PHP, Node, Go...... * 可視化管理界面 * 豐富的插件支持 tracing, managment-plugin, and you can also write your own. 2)基礎(chǔ)模型基礎(chǔ)組件:Publisher, Exchange, Queue, Consumer 基礎(chǔ)動(dòng)作:Publish, Binding, Route, Consume i) Queue的基本屬性: Name Durable (the queue will survive a broker restart) Exclusive (used by only one connection and the queue will be deleted when that connection closes) Auto-delete (queue is deleted when last consumer unsubscribes) Arguments (some brokers use it to implement additional features like message TTL) ii) Exchange路由策略 direct(default): 路由到完全匹配的routing_key對(duì)應(yīng)綁定的queue中 Fanout: 將消息路由轉(zhuǎn)發(fā)到所有綁定到該exchange上的queue Topic: 通過(guò)比對(duì)routing_key是否匹配來(lái)路由消息。 *(star) can substitute for exactly one word. #(hash) can substitute for zero or more words. Header: 忽略routingKey,通過(guò)匹配header中的鍵值對(duì)方式來(lái)路由,有all(全部匹配)和any(部分匹配)兩種模式。 iii) Binding 綁定動(dòng)作是將發(fā)送到exchange上的消息路由到特定queue的規(guī)則的綁定。 3)消息投遞的可靠性保證 Consumet Acknowledgements & Publisher Confirms i) (Consumer) Delivery Acknowledgements 當(dāng)broker投遞一條消息給consumer后,broker需要獲取客戶端是否正確處理了該條消息的狀態(tài),當(dāng)所有消息訂閱者都成功確認(rèn)了該條消息時(shí),證明該條消息投遞成功,broker會(huì)從隊(duì)列里面刪除該條消息。 Delivery Identifiers: Delivery Tags 當(dāng)一個(gè)consumer向服務(wù)器注冊(cè)了一個(gè)連接(建立起了一個(gè)channel),broker向consumer投遞消息時(shí)會(huì)攜帶一個(gè)作用域?yàn)閏hannel,單調(diào)遞增,long類型的delivery tag,consumer通過(guò)收到的delivery tag作為標(biāo)識(shí),向broker發(fā)起消息確認(rèn)流程。 (注:delivery tag是一個(gè)64位的long類型數(shù),其最大值為:9223372036854775807,channel作用域內(nèi)超過(guò)其最大值的可能性很?。?/p> Acknowledgements Models basic.ack消息被成功處理 basic.nack客戶端無(wú)法處理該消息,支持一次拒絕多條消息(multiple表示reject basic.reject客戶端無(wú)法處理該消息 Channel Prefetch Setting (QoS) Consumer可以同時(shí)接收多少條消息。 ex: tag=5,6,7,8都處于未確認(rèn)狀態(tài),QoS=4,那么broker將不會(huì)再向該Consumer投遞任何消息,直到有一條消息被確認(rèn)后。 ii) Publisher Confirms 保證消息被準(zhǔn)確送達(dá)到broker的兩種方式: transaction & Publisher Acknowledgements transaction 當(dāng)channel處于事務(wù)模式下時(shí),publisher投遞消息至broker,broker的消息落地在同一個(gè)事務(wù)中,保證了消息從publisher到broker的確定性。 事務(wù)模式相較于普通異步模式,性能上有250倍的下降。 Publisher Acknowledgements 通過(guò)delivery tag來(lái)作為消息的唯一標(biāo)識(shí),來(lái)對(duì)publisher的消息做異步確認(rèn),對(duì)于basic.reject & nack的情況需要業(yè)務(wù)方自己對(duì)消息做暫存重試。 官方示例:http://hg./rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java 消息在什么時(shí)候被確認(rèn) 消息沒(méi)有找到對(duì)應(yīng)的路由策略,broker會(huì)返回basic.nack表示broker不能正確處理該條消息; 對(duì)于持久化隊(duì)列的消息,當(dāng)消息被成功寫入磁盤后,會(huì)返回basic.ack的確認(rèn)消息,表示消息broker正常收到消息且數(shù)據(jù)已落地; 對(duì)于鏡像隊(duì)列,當(dāng)所有鏡像結(jié)點(diǎn)的對(duì)應(yīng)queue已經(jīng)成功接收到數(shù)據(jù)時(shí),會(huì)返回broker.ack表示數(shù)據(jù)已成功同步; rabbitmq的異步publisher confirm策略會(huì)將一段時(shí)間間隔內(nèi)(a few hundred milliseconds)的所有message寫入到磁盤,所有異步confirm大概會(huì)有幾百毫秒的延遲,但是這樣卻帶來(lái)了吞吐量的極大提高。 4)消息投遞的順序當(dāng)消息從單publisher產(chǎn)生,通過(guò)單個(gè)exchange路由到單個(gè)queue,并被單個(gè)consumer消費(fèi)時(shí)保證消息是嚴(yán)格按照順序消費(fèi)的。 當(dāng)消息被多個(gè)consumer訂閱時(shí),c1消費(fèi)成功,c2消費(fèi)失敗且進(jìn)行了requeue操作,此時(shí)就打亂了原來(lái)的順序。 5)TTL(time to live)消息存活時(shí)間,當(dāng)一個(gè)消息從入隊(duì)開始計(jì)時(shí),過(guò)了設(shè)定的TTL時(shí)間后,就認(rèn)為該消息為dead message. 6)DLX(Dead Letter Exchange)死信隊(duì)列。當(dāng)發(fā)生如下情況時(shí),消息會(huì)進(jìn)入DLX: The message is rejected (basic.rejectorbasic.nack) withrequeue=false, The TTL for the message expires; or The queue length limit is exceeded. 總結(jié)rabbitmq保證如下特性: 消息不丟失 消息至少被成功消費(fèi)一次 不保證消息不重復(fù),需業(yè)務(wù)方自行進(jìn)行去重邏輯(版本號(hào)&狀態(tài)機(jī)&msgid) 不保證完全按照順序消費(fèi) 5)高可用方案cluster + ha policycluster機(jī)制多個(gè)全聯(lián)通節(jié)點(diǎn)之間元信息(exchange、queue、binding等)保持強(qiáng)一致,但是隊(duì)列消息只會(huì)存儲(chǔ)在其中一個(gè)節(jié)點(diǎn)。 優(yōu)點(diǎn):提高吞吐量,部分解決擴(kuò)展性問(wèn)題。 缺點(diǎn):不能提升數(shù)據(jù)可靠性和系統(tǒng)可用性。 ha policy機(jī)制在cluster機(jī)制基礎(chǔ)上可以指定集群內(nèi)任意數(shù)量隊(duì)列組成鏡像隊(duì)列,隊(duì)列消息會(huì)在多節(jié)點(diǎn)間復(fù)制。實(shí)現(xiàn)數(shù)據(jù)高可靠和系統(tǒng)高可用。 設(shè)置參數(shù):ha-mode和ha-params可以細(xì)粒度(哪些節(jié)點(diǎn),哪些隊(duì)列)設(shè)置鏡像隊(duì)列。 設(shè)置參數(shù):ha-sync-mode=manual(默認(rèn))/automatic可以指定集群中新節(jié)點(diǎn)的數(shù)據(jù)同步策略。 home node:rabbitmq中每一個(gè)queue都有一個(gè)home node,稱之為queue master,所有對(duì)queue的操作都是首先通過(guò)queue master向其他節(jié)點(diǎn)進(jìn)行復(fù)制的,該機(jī)制保證了隊(duì)列的FIFO特性。 我們可以通過(guò)設(shè)置queue的x-queue-master-locator設(shè)置queue master策略 Pick the node hosting the minimum number of masters:min-masters Pick the node the client that declares the queue is connected to:client-local Pick a random node:random 消息節(jié)點(diǎn)故障的幾種情況slave節(jié)點(diǎn)故障 slave節(jié)點(diǎn)故障時(shí),集群會(huì)自動(dòng)關(guān)將其剔除,不再將master的信息同步至該節(jié)點(diǎn)。 new node joining the cluster. 節(jié)點(diǎn)可在任意時(shí)間加入集群,初始化加入集群的slave隊(duì)列信息為空,但是可以接收從master新同步過(guò)來(lái)的消息。master隊(duì)列頭部消息不斷被消費(fèi),尾部不斷新增消息,當(dāng)某一時(shí)刻,在slave加入之前的歷史消息都被消費(fèi)完畢時(shí),master隊(duì)列的size和slave的size相等,認(rèn)為slave節(jié)點(diǎn)從master的數(shù)據(jù)同步完成。 新加入的slave不能為加入之前的數(shù)據(jù)增加額外的冗余和可用性。因?yàn)閳?zhí)行明確的同步操作會(huì)使queue無(wú)響應(yīng),因此只允許非活躍的queues進(jìn)行明確的同步操作而活躍的queues進(jìn)行自然的同步操作是一種好的策略。 也可以在新的slave加入集群后,手動(dòng)或自動(dòng)觸發(fā)同步歷史數(shù)據(jù),但是在同步過(guò)程中,隊(duì)列queue將處于不可用狀態(tài)(類似于MySQL在更改表結(jié)構(gòu)的時(shí)候的鎖表)。 rabbitmqctl sync_queuename 手動(dòng)觸發(fā)歷史消息同步 rabbitmqctl cancel_sync_queuename 取消消息同步操作 rabbitmqctl list_queues name slave_pids synchronised_slave_pids 查看正在同步的隊(duì)列 通過(guò)設(shè)置隊(duì)列的ha-sync-mode屬性automatic-新slave加入自動(dòng)同步,manual-通過(guò)增量的方式同步 Stopping nodes and synchronisation master節(jié)點(diǎn)故障,最先加入集群的slave將被提升為master,假設(shè)此時(shí)集群中存在一個(gè)已經(jīng)完全同步的節(jié)點(diǎn),即最先加入集群的slave。 集群中的節(jié)點(diǎn)繼續(xù)故障,到最終只剩下唯一的一個(gè)節(jié)點(diǎn),該節(jié)點(diǎn)為master,對(duì)于該節(jié)點(diǎn)上持久化隊(duì)列,會(huì)在其故障或者重啟前不斷持久化消息,當(dāng)故障節(jié)點(diǎn)恢復(fù)后,重新加入節(jié)點(diǎn),因?yàn)槠錈o(wú)法得知其故障之前持久化的鏡像隊(duì)列的歷史消息是否被正確投遞,rabbitmq采取的策略是清空其持久化信息,作為一個(gè)空的新slave加入集群。 Stopping master nodes with only unsynchronised slaves 當(dāng)master節(jié)點(diǎn)故障時(shí),cluster中只有未同步完成的節(jié)點(diǎn)時(shí),有以下兩種情況: 當(dāng)在可控條件下故障:rabbitmqctl stop, 優(yōu)雅地關(guān)閉OS時(shí),故障節(jié)點(diǎn)將不會(huì)轉(zhuǎn)移到鏡像隊(duì)列上,此時(shí)鏡像隊(duì)列處于不可用狀態(tài)。(defalut setting) 當(dāng)不可控條件下故障,機(jī)器斷電損壞等,master將會(huì)轉(zhuǎn)移到未同步完成的節(jié)點(diǎn)上。 Loss of a master while all slaves are stopped 通常情況下,我們希望最后的節(jié)點(diǎn)在重啟后繼續(xù)接管master權(quán)限,因?yàn)樵撉闆r下,最后的master節(jié)點(diǎn)保存了最全的信息,而這些信息其他之前故障的slave節(jié)點(diǎn)并不能接收到。 然而,我們?cè)赟topping nodes and synchronisation一節(jié)中知道,新啟動(dòng)加入集群的slave會(huì)首先清空其持久化的消息信息,作為一個(gè)新節(jié)點(diǎn)加入集群,所以在這里我們需要先執(zhí)行rabbitmqctl forget_cluster_node,此時(shí)rabbitmq會(huì)嘗試尋找當(dāng)前node上的所有queue的master節(jié)點(diǎn)機(jī)器,并在master機(jī)器再次重啟加入集群后重新提升其為master節(jié)點(diǎn)。 master節(jié)點(diǎn)故障時(shí): 一個(gè)slave被提升為新的master。被提升的slave是“最舊”的slave。因?yàn)樵搒lave與原master中內(nèi)容完全同步的幾率最大。然而,也有可能所有的salve都未與master完全同步,此時(shí)只有master中存在而slaves中不存在的message將丟失。 slave認(rèn)為所有之前的consumers的連接突然斷開了。因此,它重新將已經(jīng)投遞但還未被確認(rèn)的messages重新排隊(duì)。這些“未被確認(rèn)的”message可能包含client已發(fā)出確認(rèn)但確認(rèn)在到達(dá)master前丟失了的情形,也包含client已發(fā)出確認(rèn)且確認(rèn)已到達(dá)master但在master廣播給slaves時(shí)丟失的情形。在上述任意一種情況下,新的master都必須為他認(rèn)為未收到確認(rèn)的message重新排隊(duì)。 之前請(qǐng)求原master的clients被取消。 由于重新排隊(duì),從queue重新consume的clients需要知道此時(shí)可能接收到之前已經(jīng)被處理過(guò)的message。 x-cancel-on-ha-failover=true時(shí),consuming過(guò)程將被取消,consumer cancellation notification會(huì)被發(fā)出,由業(yè)務(wù)方確認(rèn)是否重新消費(fèi)。 當(dāng)master節(jié)點(diǎn)故障時(shí),如果publisher使用的時(shí)noAck模式,消息可能會(huì)丟失,因?yàn)閚oAck模式下,消息一旦進(jìn)入broker的處理流程,就代表消息已經(jīng)處理完成,不需要任何確認(rèn)過(guò)程。此時(shí)master發(fā)生故障,將不會(huì)requeue該消息,消息有丟失的風(fēng)險(xiǎn)。 4.消息隊(duì)列的應(yīng)用場(chǎng)景舉例可靠的延遲隊(duì)列 RPC: Our RPC will work like this: When the Client starts up, it creates an anonymous exclusive callback queue. For an RPC request, the Client sends a message with two properties:replyTo, which is set to the callback queue andcorrelationId, which is set to a unique value for every request. The request is sent to anrpc_queuequeue. The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from thereplyTofield. The client waits for data on the callback queue. When a message appears, it checks thecorrelationIdproperty. If it matches the value from the request it returns the response to the application.
|