本文整理自Apache RocketMQ&DataPipeline 在北京舉辦的Meetup 本文主要從三方面展開。首先介紹VIPKID使用MQ的歷史,即如何演化到現(xiàn)在的RocketMQ Kafka。其次,為RocketMQ在VIPKID的實(shí)踐,主要介紹如何解決在使用RocketMQ過(guò)程中出現(xiàn)的相關(guān)問(wèn)題。最后,提到VIPKID的MQ服務(wù)關(guān)于以后可能會(huì)如何變化和發(fā)展。 MQ History in VIPKID 關(guān)于VIPKID的需求: 第一性能。VIPKID是貫穿北美和中國(guó)的線上英語(yǔ)學(xué)習(xí)機(jī)構(gòu),數(shù)據(jù)、消息量龐大,對(duì)于MQ吞吐量、延遲等有極度的要求。 第二海量的topic。因?yàn)闃I(yè)務(wù)方的業(yè)務(wù)場(chǎng)景相對(duì)比較復(fù)雜,不同于常規(guī),所以對(duì)多topic的要求較高。目前在測(cè)試環(huán)節(jié),包含的topic應(yīng)該有3000 ,消費(fèi)者有超過(guò)10k 。 第三統(tǒng)一的管理平臺(tái)。由于各種topic,消費(fèi)者紛繁復(fù)雜, 需要一個(gè)像Operator那樣可以在平臺(tái)上“指指畫畫”即可管理topic,消費(fèi)者,和相關(guān)負(fù)責(zé)人的信息的平臺(tái)。 第四友好的API。對(duì)RD同學(xué)而言,不需花費(fèi)過(guò)多的精力學(xué)習(xí),只需看API便能上手接入。 另外對(duì)提供MQ服務(wù)的同學(xué)而言,方便二次開發(fā)。中間件大部分選擇用Java,如果使用其它語(yǔ)言,比如GO,Scala,對(duì)二次開發(fā)的難度會(huì)比較大,維護(hù)成本也會(huì)很高。 考慮到學(xué)習(xí)成本,以上為我們?cè)谥懊媾R的一些問(wèn)題和訴求,基于這些訴求,我們?cè)谥熬烷_始做選擇。 公司之前使用MQ服務(wù)時(shí),管理簡(jiǎn)單。這也讓我們面臨一些較為尷尬的問(wèn)題。 之后經(jīng)過(guò)層層篩選,各種把關(guān),選擇了RocketMQ+Kafka,RocketMQ主要提供給業(yè)務(wù)方使用,做一些業(yè)務(wù)數(shù)據(jù)的流轉(zhuǎn)。包括像削峰填谷,或是一些異步數(shù)據(jù)等同步,另外還需同步一些BI數(shù)據(jù)給業(yè)務(wù)方。 為什么會(huì)這么選?現(xiàn)在市面上有許多比較成熟的MQ產(chǎn)品,比如像目前使用的Kafka,第二個(gè)是RocketMQ,第三個(gè)為RabbitMQ,最后決定Kafka先保持現(xiàn)狀,把業(yè)務(wù)型的ActiveMQ替換成RocketMQ。 同時(shí),也積極和RocketMQ社區(qū)保持聯(lián)系,一來(lái)可以依靠社區(qū)力量將公司MQ服務(wù)做得更加穩(wěn)定。另外也可以向社區(qū)其他同學(xué)分享我們的實(shí)踐經(jīng)驗(yàn)。 為什么Kafka會(huì)保持,原因在于對(duì)于ELK生態(tài)而言,Kafka目前是不可或缺的,在性能上可以滿足要求。 為什么選擇RocketMQ,有以下原因:低延遲、百萬(wàn)級(jí)topic、二次開發(fā)容易、管理部署方便。 下圖為系統(tǒng)的簡(jiǎn)單結(jié)構(gòu)圖,大概描述了生產(chǎn)消費(fèi)的大概過(guò)程。還有Broker端的部署結(jié)構(gòu),最上面一層為生產(chǎn)過(guò)程,中間生產(chǎn)到了Broker的集群,下面是一個(gè)消費(fèi)者的過(guò)程,最右邊為retry的隊(duì)列,主要做容錯(cuò)。 比如發(fā)一個(gè)消息時(shí)不是一錘子買賣,當(dāng)消息發(fā)過(guò)去之后,由于網(wǎng)絡(luò)抖了一下,接下來(lái)第二第三API失敗了,此時(shí)可不可以允許我再發(fā)一次消息,像這種容錯(cuò)的情況是支持的,所以RocketMQ也是很重要的原因。 RocketMQ 最佳實(shí)踐 首先最重要的問(wèn)題,也是研發(fā)同學(xué)反饋過(guò)來(lái)的問(wèn)題中最難理解的一部分,即訂閱關(guān)系一致。 所謂的訂閱關(guān)系一致,指一個(gè)消費(fèi)者組在一個(gè)服務(wù)中,假設(shè)消費(fèi)者組消費(fèi)了3個(gè)topic,該服務(wù)部署到任何一臺(tái)機(jī)器上,消費(fèi)者組消費(fèi)到topic都必須一樣,這就是訂閱關(guān)系一致。因?yàn)闇y(cè)試環(huán)節(jié)是多環(huán)境的,在多環(huán)境情況下會(huì)遇到訂閱關(guān)系不一致的情況,這屬于使用問(wèn)題。 第二個(gè)是Docker,因?yàn)槎喹h(huán)境情況下,在測(cè)試環(huán)節(jié)走的是K8S,它會(huì)啟動(dòng)Docker容器,在Docker里會(huì)面臨一個(gè)問(wèn)題,消費(fèi)者客戶端在起寫時(shí),會(huì)將自己的訂閱信息上報(bào)給Broker,告訴它訂閱了哪些topic,以及消費(fèi)者客戶端ID是多少,以此來(lái)表明身份。當(dāng)在取客戶端ID時(shí),取了一個(gè)本地IP,但是在Docker里面取出來(lái)的本地大部分是相同的。 上述情況會(huì)出現(xiàn)類似于訂閱關(guān)系不一致的結(jié)果。會(huì)發(fā)現(xiàn)topic里面有的消息、隊(duì)列沒(méi)有消費(fèi)者去訂閱,實(shí)際是有的。因?yàn)锽roker在做rebalance時(shí),由于沒(méi)有辦法正常區(qū)分到底起了多少個(gè)消費(fèi)者時(shí)令,導(dǎo)致分配隊(duì)列時(shí)出現(xiàn)問(wèn)題。原因在于當(dāng)上報(bào)訂閱關(guān)系給Broker時(shí),數(shù)據(jù)有錯(cuò)誤,這是Broker從本地IP取出來(lái)導(dǎo)致的。 第三個(gè)是多環(huán)境問(wèn)題,假設(shè)有ABC三個(gè)服務(wù),A服務(wù)依賴于B,B依賴于C。通常假設(shè)在開發(fā)一個(gè)服務(wù)時(shí),會(huì)單獨(dú)拉一個(gè)分支進(jìn)行開發(fā),此時(shí)可能會(huì)與上一步的下一個(gè)服務(wù)做聯(lián)調(diào)。 同時(shí)在測(cè)試環(huán)節(jié),還會(huì)有一個(gè)穩(wěn)定的服務(wù)提供給其他人做聯(lián)調(diào),在沒(méi)有開發(fā)時(shí)需要給QA做集成測(cè)試。這個(gè)環(huán)境中稱為多環(huán)境,在多環(huán)境的情況就會(huì)遇到訂閱關(guān)系不一致的問(wèn)題。同時(shí),在多環(huán)境下如何做消息路由也是需要考慮的問(wèn)題。 首先解釋訂閱關(guān)系不一致,假設(shè)有兩個(gè)人開發(fā)兩個(gè)功能,其中一個(gè)是Host為1.1.1.1,消費(fèi)者組有3個(gè)topic,1、2、3,當(dāng)拿到一個(gè)新的需求后需要將topic3換成topic4,獲取消息。 在消費(fèi)寫完之后,需要發(fā)到一個(gè)環(huán)境進(jìn)行測(cè)試,假設(shè)將這兩個(gè)同時(shí)放到環(huán)境中,就會(huì)出現(xiàn)訂閱關(guān)系不一致,這時(shí)有可能會(huì)出現(xiàn)以下情況: 當(dāng)添加topic4去掉topic3之后,在測(cè)試topic4是否能收到消息時(shí),結(jié)果發(fā)現(xiàn)收不到?;蚴钱?dāng)發(fā)送一條消息之后,連續(xù)不斷地收到了100多萬(wàn)條消息。 如果打開console平臺(tái),發(fā)現(xiàn)有的隊(duì)列沒(méi)有消費(fèi)者去消費(fèi),這就是所謂的訂閱關(guān)系不一致。 比如,同樣的一個(gè)ConsumerGroup1,第一臺(tái)機(jī)器消費(fèi)了3個(gè)topic1、2、3,第二臺(tái)機(jī)器消費(fèi)了topic1、2、4,這就是訂閱關(guān)系不一致。即同一個(gè)消費(fèi)組部署的兩臺(tái)實(shí)例,消費(fèi)者的topic不一樣,稱為訂閱關(guān)系不一致。 為此,在多環(huán)境的情況下制定了一個(gè)簡(jiǎn)單規(guī)則。通常發(fā)這種分支時(shí),是在多環(huán)境的子環(huán)境,比如,當(dāng)發(fā)一個(gè)新功能時(shí),需要將開發(fā)分支放到A1環(huán)境進(jìn)行測(cè)試,這時(shí)就有一個(gè)簡(jiǎn)單的規(guī)則,即不能跨環(huán)境生產(chǎn)消費(fèi)。 假設(shè)一個(gè)開發(fā)分支是A1環(huán)鏡的,當(dāng)消費(fèi)消息時(shí),可用A1環(huán)境的consumerGroup去消費(fèi)A1環(huán)境的topic。如果要測(cè)試一下收的消息處理邏輯是否正確,就可以去控制臺(tái)模擬發(fā)送給A1的topic。 這時(shí)就會(huì)發(fā)現(xiàn),自己的開發(fā)分支跟正常的穩(wěn)定環(huán)境完全不影響。多環(huán)境時(shí)無(wú)需害怕環(huán)境,只需排查當(dāng)前處于哪個(gè)環(huán)境,哪些環(huán)境是不需要訂閱的,那就將其去掉,換成所處的環(huán)境即可。 針對(duì)訂閱關(guān)系不一致的情況,可以去客戶端中查看訂閱關(guān)系哪里不一致,哪個(gè)環(huán)境的topic或消費(fèi)者不應(yīng)該去訂閱,這時(shí)容易排查問(wèn)題。 另外關(guān)于Docker,當(dāng)消費(fèi)者集起來(lái)時(shí),該如何告訴Broker起了多少個(gè)實(shí)例,以此讓Broker來(lái)分配,哪些實(shí)例消費(fèi)哪些partition或哪些Q。在consumer集起來(lái)時(shí),可將訂閱關(guān)系還有能唯一標(biāo)識(shí)是我的客戶端ID發(fā)給Broker,讓Broker決定哪個(gè)客戶端ID消費(fèi)哪個(gè)隊(duì)列,進(jìn)而做好分配,這時(shí)就能保證正常消費(fèi)。 那如何標(biāo)識(shí)我是我?這時(shí)有一個(gè)字符串,字符串有三個(gè)值組成,第一是消費(fèi)者所在機(jī)器的IP,第二是at符號(hào),第三是instance name,即實(shí)例名。 之前的配置是一個(gè)consumer中的實(shí)例名是相同的,如果是Docker環(huán)境,取出的IP也是相同的。假設(shè)有10臺(tái)機(jī)器在Docker里集起來(lái),當(dāng)將消息匯報(bào)上去之后,Broker在拿到ID去重后發(fā)現(xiàn)只有一臺(tái)機(jī)器啟起來(lái),它便將所有的隊(duì)列只分配給這一臺(tái)機(jī)器,但這臺(tái)機(jī)器很隨機(jī),其它機(jī)器便收不到消息。此時(shí)本來(lái)有10臺(tái)機(jī)器,結(jié)果只有一臺(tái)機(jī)器收到消息,或會(huì)重復(fù)收到消息,結(jié)果就會(huì)很棘手。 為解決上述問(wèn)題,在配置時(shí),可去掉消費(fèi)者的instance name,這時(shí)它會(huì)變成一串?dāng)?shù)字,即當(dāng)前進(jìn)程啟起來(lái)的進(jìn)程號(hào)。大部分情況,容器中啟起來(lái)的進(jìn)程號(hào),我們認(rèn)為是不一樣的,但也有可能是相同的,這時(shí)可以重啟一下。但這種概率比較小,如果有兩套集群就有可能了,這樣就解決了Docker的問(wèn)題。 接下來(lái)簡(jiǎn)單解釋一下RocketMQ里面的訂閱關(guān)系,一條訂閱關(guān)系指一個(gè)消費(fèi)者組訂閱了一個(gè)topic的某一個(gè)tag,三者合起來(lái)稱為一個(gè)訂閱關(guān)系。 關(guān)于tag,指topic的一個(gè)子分類,topic是對(duì)消息邏輯的歸類,我可以為某一些消息打上特定的tag,這樣做的益處是會(huì)做一些服務(wù)端過(guò)濾,假設(shè)只訂閱某個(gè)tag消息,那就不會(huì)發(fā)送其它tag消息給你,中間沒(méi)有數(shù)據(jù)流量的,這對(duì)于網(wǎng)絡(luò)輿論而言會(huì)減少很大一部分。 另外,我們做了簡(jiǎn)單的MQ云平臺(tái),可以理解為與阿里云平臺(tái)是類似的,用戶有租戶隔離,不同的租戶之間看不到彼此的數(shù)據(jù),所有的管理維度是通過(guò)APP的維度。 簡(jiǎn)單介紹下,一個(gè)應(yīng)用有應(yīng)用的owner,應(yīng)用的member,這些人可以管理APP中的所有資源,包括APP中的生產(chǎn)者組,topic、消費(fèi)者組,通過(guò)這些可以完成以下事情: 一位研發(fā)同學(xué)如果想與另外一個(gè)團(tuán)隊(duì)進(jìn)行消息交互,作為生產(chǎn)消息的人,可以去平臺(tái)申請(qǐng)個(gè)topic,進(jìn)而發(fā)消息將topic告訴消費(fèi)者。比如,我的topic是這個(gè),你去消費(fèi)吧。在拿到之后,再去創(chuàng)建一個(gè)消費(fèi)者組,訂閱這個(gè)topic,全程無(wú)人干預(yù),目前需要的干預(yù)是我們會(huì)做一層審核。 現(xiàn)在很多人不是很熟悉,所以需要引導(dǎo)一段時(shí)間后,再將權(quán)限完全開放出來(lái),屆時(shí)審計(jì)的工作就給應(yīng)用的owner即可。 應(yīng)用的owner通常是團(tuán)隊(duì)的leader,這樣就可以在自己的小組中做到完全自助。對(duì)研發(fā)團(tuán)隊(duì)而言,可以省去許多繁雜流程。 目前整套MQ的工作過(guò)程是如果需要資源,先去VKMQ平臺(tái)申請(qǐng)資源,比如我是生產(chǎn)者,這時(shí)需要一個(gè)topic,一個(gè)生產(chǎn)者組來(lái)生產(chǎn)消息,此時(shí)可以在自己創(chuàng)建之后用平臺(tái)客戶端發(fā),無(wú)需擔(dān)心到底需要發(fā)到哪個(gè)Broker,哪個(gè)name server。 這時(shí)研發(fā)同學(xué)用起來(lái)會(huì)比較方便,不用關(guān)心需要哪些集群。如果沒(méi)有這樣的發(fā)現(xiàn)機(jī)制,可能還需要寫B(tài)roker地址或name server地址。 How about the future 提到以后可能的發(fā)展走向,之前與阿里中間件技術(shù)團(tuán)隊(duì)專家王昕聊過(guò)這個(gè)問(wèn)題,了解到會(huì)有OpenMessaging,OpenMessaging的目的是有一套公開的標(biāo)準(zhǔn),是MQ層面的標(biāo)準(zhǔn),會(huì)定義一些基本操作。就現(xiàn)在情況而言,個(gè)人覺(jué)得,這三者可以結(jié)合到一起。 目前我們有用Kafka和RocketMQ。這時(shí)該如何管理,如何統(tǒng)一,對(duì)人的要求就很高。 但具體處理問(wèn)題時(shí)會(huì)有差異,我可以大膽猜測(cè)一下,可以將兩個(gè)可能合而為一,依循OpenMessaging的標(biāo)準(zhǔn)。既然有一套比較好的標(biāo)準(zhǔn)可以遵循,如果能實(shí)現(xiàn)統(tǒng)一,我覺(jué)得對(duì)后來(lái)的人就會(huì)很方便。 |
|
來(lái)自: 劉振東 > 《RocketMQ實(shí)踐》