題記:這篇介紹發(fā)布訂閱構(gòu)建塊,這是對事件驅(qū)動架構(gòu)設(shè)計的一種實現(xiàn)落地。
注:對于“Building Blocks”這個詞組的翻譯,我之前使用了“構(gòu)件塊”,現(xiàn)在和官方文檔(Dapr中文社區(qū)的貢獻)保持一致,采用“構(gòu)建塊”。
原理
發(fā)布訂閱的概念來自于事件驅(qū)動架構(gòu)(EDA)的設(shè)計思想,這是一種讓程序(應用、服務(wù))之間解耦的主要方式,通過發(fā)布訂閱的思想也可以實現(xiàn)服務(wù)之間的異步調(diào)用。而大部分分布式應用都會依賴這樣的發(fā)布訂閱解耦模式。
整個發(fā)布訂閱的思想其實是比較簡單的:

如上圖所示,把需要解耦的程序分別設(shè)定為事件發(fā)布者或者事件訂閱者(理論上,對于某個事件,一個程序僅能作為一種角色;對于不同事件,一個程序可以既作為發(fā)布者又可以作為訂閱者)。同時利用消息代理(Message Broker)中間件把兩者對接起來,消息代理即作為事件消息的傳輸通道。
在Dapr中對這種發(fā)布訂閱模式進行了高度抽象的實現(xiàn),并提供了自由替換消息代理中間件的特性,如下圖所示:
Dapr的發(fā)布訂閱構(gòu)建塊也可以被看作一種事件總線(Event Bus)的實現(xiàn),只是你不需要使用特殊的協(xié)議,在發(fā)布端和訂閱端僅使用HTTP/gRPC即可。
在事件總線中,把發(fā)布訂閱兩者關(guān)聯(lián)在一起的是事件類型,那么在Dapr中也引入了一個類似的概念——主題(Topic)。如果對消息隊列中間件熟悉的人對于這個概念不會陌生。由此發(fā)布端和訂閱端的處理過程和針對Dapr的接口也就是圍繞主題來展開的。
能力
消息發(fā)送
既然Dapr的PubSub是一種事件總線,那么要發(fā)送消息,必然需要對代表主題(事件類型)的消息進行封裝。Dapr并沒有去創(chuàng)造一種獨有的格式,而是采用了目前業(yè)界比較流行的開放協(xié)議——云事件(CloudEvents)規(guī)范。這種格式把事件消息封裝為如下JSON數(shù)據(jù):
{
"specversion" : "1.0",
"type" : "xml.message",
"source" : "https:///message",
"subject" : "Test XML Message",
"id" : "id-1234-5678-9101",
"time" : "2020-09-23T06:23:21Z",
"datacontenttype" : "text/xml",
"data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}
當然對消息的封裝不需要應用程序本身去關(guān)心,你只需要給Dapr傳遞data的字符串即可,而這個字符串本身是以什么格式(不管xml還是json)去承載內(nèi)容都是由應用程序確定。具體如何發(fā)送消息,下面規(guī)范部分會介紹。
消息傳遞
Dapr會自動根據(jù)主題把消息發(fā)送給所有訂閱者,傳遞過程保證“至少一次”送達。送達的判斷標準是基于訂閱者的響應是否成功(即HTTP狀態(tài)碼為20X)。
當然,訂閱者也可以在響應體中設(shè)置 status
屬性來給出更為精細的處理指令,比如 RETRY
告知Dapr之前處理失敗了,現(xiàn)在是重試成功了;或者 DROP
告知Dapr應用程序?qū)@個消息處理出現(xiàn)問題,已經(jīng)記錄了告警日志,但是不打算繼續(xù)處理它了。
消息傳遞還有一個重要的特性需要理解,就是消息的生存期(Time-to-Live,TTL)。TTL規(guī)定了消息在Dapr(實際上是在消息代理中間件)里面的存活時間,如果TTL過期,那么消息就不會再被傳遞(即變成死信)。所有目前支持的發(fā)布訂閱組件都支持TTL的特性,Dapr會幫助你處理這方面的邏輯。
消息消費
為了消費消息,需要對主題進行注冊,可以通過聲明式和編程式來進行注冊。聲明式通過外部的yaml文件定義一個K8S的CRD,來描述服務(wù)需要訂閱什么主題,接收事件的HTTP API路由地址。編程式通過暴露特定的HTTP API路由地址或者特定的gRPC方法來讓Dapr運行時進行訪問,從而注冊需要訂閱什么主題和接收事件的地址。
發(fā)布訂閱構(gòu)建塊采用的是所謂競爭者消費模式,即同一個應用(AppId相同)的多個實例,只會有一個實例獲得消息,這些同個應用的多個實例稱之為一個消費組。如果你希望消息被多個應用得到,那么就需要使用多個消費組,也即多個AppId。
主題范圍限制
從上面所知,在發(fā)送消息和消費消息的時候,都需要針對某個主題。為了對消息的傳遞進行更加精細的控制,在發(fā)布訂閱構(gòu)建塊中可以對主題范圍進行限制,即某些主題只能由某些應用來發(fā)布,某些主題只能由某些應用來訂閱。
要進行范圍限制,需要對發(fā)布訂閱組件的配置yaml進行配置,設(shè)置 spec.metadata
下面的 publishingScopes
, subscriptionScopes
和 allowedTopics
配置。(詳細說明見未來的關(guān)于組件的文章)。
規(guī)范
Dapr給PubSub這一構(gòu)建塊提供了兩方面的規(guī)范:消息生產(chǎn)端和消息消費端。
消息生產(chǎn)端
通過POST如下地址,來發(fā)送消息到特定主題:
POST http://localhost:<daprPort>/v1.0/publish/<pubsubname>/<topic>[?<metadata>]
其中pubsubname代表了PubSub組件的名稱;topic代表了目標主題名稱。
在 Content-Type
請求頭中設(shè)置請求體的格式,比如 application/json
請求體根據(jù)請求頭里面的設(shè)置格式,傳入JSON或者XML,Dapr會自動把請求體封裝為CloudEvent格式。
如果是直接調(diào)用gRPC的接口的話,是調(diào)用 PublishEvent
接口并傳遞 PublishEventRequest
實體。
消息消費端
如果你的消費端是通過聲明式來向Dapr注冊需要訂閱什么主題的消息,那么在如下格式的yaml文件中進行定義:
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
route: /dsstatus
pubsubname: pubsub
scopes:
- app1
- app2
其中 spec.topic
代表了要訂閱的主題名稱,spec.route
代表了接收訂閱消息的HTTP路由地址,spec.route
代表了針對的PubSub組件是那個。尤為關(guān)鍵是 scopes
里面設(shè)置了這樣的訂閱聲明是針對那個(或那幾個)應用起作用(填入appid)。
如果你的消費端是通過編程式來向Dapr注冊需要訂閱什么主題的消息,那么暴露一個如下特殊HTTP路由地址的接口:
GET http://localhost:<appPort>/dapr/subscribe
并返回如下格式的響應體,讓Dapr知道你的應用需要訂閱什么主題,接收消息的接口路由地址是什么:
[
{
"pubsubname": "pubsub",
"topic": "newOrder",
"route": "/orders"
}
]
當然你的應用需要暴露注冊的接收路由接口,并支持POST謂詞,接口收到請求后返回2xx狀態(tài)碼告訴Dapr消息處理成功了,或者404告訴Dapr出現(xiàn)錯誤且消息已經(jīng)丟棄,或者其他狀態(tài)碼讓Dapr進行重試。
兩種訂閱注冊方式各有優(yōu)缺,聲明式方便一個主題注冊多個應用,編程式方便一個應用注冊多個主題。
注意:如果是使用gRPC來注冊和暴露消費接口,那么規(guī)范有所不同,做法見下面。
DOTNET SDK
Dapr的.NET SDK同樣針對消息生產(chǎn)端和消費端提供相關(guān)的函數(shù)庫。
在DaprClient這個客戶端類中提供了 PublishEventAsync
這個方法來用于發(fā)送事件消息到特定PubSub的特定主題上 (底層是請求了gRPC的接口)。比如:
using var client = new DaprClientBuilder().Build();
var eventData = new { Id = "17", Amount = 10m, };
await client.PublishEventAsync(pubsubName, "deposit", eventData, cancellationToken);
在消費端,目前針對ASP.NET Core提供了一個特殊的屬性標記 TopicAttribute
來簡化編程式訂閱注冊的過程。比如:
[Topic("pubsub", "deposit")]
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
如果你是使用gRPC來實現(xiàn)消費端,那么目前并沒有一個簡化方式來注冊(未來我會補上這個坑),只能遵循如下規(guī)范:
首先用ListTopicSubscriptions注冊:
public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context)
{
var result = new ListTopicSubscriptionsResponse();
result.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "deposit"
});
result.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "withdraw"
});
return Task.FromResult(result);
}
接著用OnTopicEvent接收:
public override async Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context)
{
if (request.PubsubName == "pubsub")
{
var input = JsonSerializer.Deserialize<Models.Transaction>(request.Data.ToStringUtf8(), this.jsonOptions);
var transaction = new GrpcServiceSample.Generated.Transaction() { Id = input.Id, Amount = (int)input.Amount, };
if (request.Topic == "deposit")
{
await Deposit(transaction, context);
}
else
{
await Withdraw(transaction, context);
}
}
return await Task.FromResult(default(TopicEventResponse));
}
具體見SDK的examples:https://github.com/heavenwing/dapr-dotnet-sdk/blob/master/examples/AspNetCore/GrpcServiceSample/Services/BankingService.cs
用法與例子
使用SDK來進行事件消息的發(fā)布和訂閱,可以直接參考SDK的examples中的消費端例子 ControllerSample 和生產(chǎn)端例子 PublishSubscribe。
如果是非SDK的用法,可以參考我的quickstarts,消費端 PubSubConsumer和生產(chǎn)端 PubSubProducer。
我的quickstarts的消費端同時使用聲明式和編程式兩種注冊方式。大致代碼如下:
[Route("dapr/subscribe")]
[ApiController]
public class DaprSubscribeController : ControllerBase
{
public ActionResult<DaprSubscribeOutput[]> Get()
{
return Ok(new DaprSubscribeOutput[]
{
new DaprSubscribeOutput
{
PubSubName="pubsub",
Topic="quickstarts/wakeup",
Route="/api/wakeup"
}
});
}
}
public async Task<IActionResult> PostAsync(TinyCloudEvent<MessageInput> model)
{
_logger.LogInformation(model.Data.Name);
return Ok();
}
using var httpClient = new HttpClient();
httpClient.BaseAddress = new Uri(pubsubUrl);
Console.WriteLine($"To {topicName1} ...");
var request1 = new HttpRequestMessage(HttpMethod.Post, topicName1);
request1.Content = new StringContent(JsonSerializer.Serialize(new { name = "Jack" }), Encoding.UTF8, "application/json");
await httpClient.SendAsync(request1);
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: quickstarts-subscription
spec:
topic: quickstarts/sleep
route: /api/sleep
pubsubname: pubsub
scopes:
- quickstarts-pbc