原標(biāo)題:Spring認(rèn)證中國教育管理中心-了解如何使用 Spring 和 RabbitMQ 創(chuàng)建一個(gè)簡(jiǎn)單的發(fā)布和訂閱應(yīng)用程序。(內(nèi)容來源:Spring中國教育管理中心) 本指南將引導(dǎo)您完成設(shè)置發(fā)布和訂閱消息的 RabbitMQ AMQP 服務(wù)器以及創(chuàng)建 Spring Boot 應(yīng)用程序以與該 RabbitMQ 服務(wù)器交互的過程。 你將建造什么您將構(gòu)建一個(gè)應(yīng)用程序,該應(yīng)用程序使用 Spring AMQP 發(fā)布消息RabbitTemplate并使用MessageListenerAdapter. 你需要什么
如何完成本指南像大多數(shù) Spring入門指南一樣,您可以從頭開始并完成每個(gè)步驟,也可以繞過您已經(jīng)熟悉的基本設(shè)置步驟。無論哪種方式,您最終都會(huì)得到工作代碼。 要從頭開始,請(qǐng)繼續(xù)設(shè)置 RabbitMQ 代理。 要跳過基礎(chǔ)知識(shí),請(qǐng)執(zhí)行以下操作:
完成后,您可以對(duì)照中的代碼檢查結(jié)果 設(shè)置 RabbitMQ 代理在構(gòu)建消息傳遞應(yīng)用程序之前,您需要設(shè)置一個(gè)服務(wù)器來處理接收和發(fā)送消息。 RabbitMQ 是一個(gè) AMQP 服務(wù)器。該服務(wù)器可在 brew install rabbitmq 通過在終端窗口中運(yùn)行以下命令,解壓縮服務(wù)器并使用默認(rèn)設(shè)置啟動(dòng)它: rabbitmq-server 您應(yīng)該會(huì)看到類似于以下內(nèi)容的輸出: RabbitMQ 3.1.3. Copyright (C) 2007-2013 VMware, Inc.## ## Licensed under the MPL. See https://www./## ############ Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log########## Starting broker... completed with 6 plugins. 如果您在本地運(yùn)行 Docker,您還可以使用Docker Compose快速啟動(dòng) RabbitMQ 服務(wù)器。Github 項(xiàng)目docker-compose.yml的根目錄中有一個(gè)。complete這非常簡(jiǎn)單,如下面的清單所示: rabbitmq: image: rabbitmq:management ports: - "5672:5672" - "15672:15672" 使用當(dāng)前目錄中的此文件,您可以運(yùn)行docker-compose up以使 RabbitMQ 在容器中運(yùn)行。 從 Spring Initializr 開始您可以使用這個(gè)預(yù)先初始化的項(xiàng)目并單擊 Generate 下載 ZIP 文件。此項(xiàng)目配置為適合本教程中的示例。 手動(dòng)初始化項(xiàng)目:
如果您的 IDE 具有 Spring Initializr 集成,您可以從您的 IDE 完成此過程。 你也可以從 Github 上 fork 項(xiàng)目并在你的 IDE 或其他編輯器中打開它。 創(chuàng)建 RabbitMQ 消息接收器對(duì)于任何基于消息傳遞的應(yīng)用程序,您都需要?jiǎng)?chuàng)建一個(gè)響應(yīng)已發(fā)布消息的接收器。以下清單(來自 package com.example.messagingrabbitmq;import java.util.concurrent.CountDownLatch;import org.springframework.stereotype.Component;@Componentpublic class Receiver { private CountDownLatch latch = new CountDownLatch(1); public void receiveMessage(String message) { System.out.println("Received <" + message + ">"); latch.countDown(); } public CountDownLatch getLatch() { return latch; } } 這Receiver是一個(gè) POJO,它定義了接收消息的方法。當(dāng)您注冊(cè)它以接收消息時(shí),您可以將其命名為任何您想要的名稱。 為了方便起見,這個(gè) POJO 也有一個(gè)CountDownLatch. 這讓它發(fā)出已收到消息的信號(hào)。這是您不太可能在生產(chǎn)應(yīng)用程序中實(shí)現(xiàn)的東西。 注冊(cè)監(jiān)聽器并發(fā)送消息Spring AMQPRabbitTemplate提供了使用 RabbitMQ 發(fā)送和接收消息所需的一切。但是,您需要:
Spring Boot 會(huì)自動(dòng)創(chuàng)建連接工廠和 RabbitTemplate,從而減少您必須編寫的代碼量。 您將使用RabbitTemplate來發(fā)送消息,并且您將Receiver使用消息偵聽器容器注冊(cè)一個(gè)以接收消息。連接工廠驅(qū)動(dòng)兩者,讓它們連接到 RabbitMQ 服務(wù)器。以下清單(來自 package com.example.messagingrabbitmq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;@SpringBootApplicationpublic class MessagingRabbitmqApplication { static final String topicExchangeName = "spring-boot-exchange"; static final String queueName = "spring-boot"; @Bean Queue queue() { return new Queue(queueName, false); } @Bean TopicExchange exchange() { return new TopicExchange(topicExchangeName); } @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#"); } @Bean SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); container.setMessageListener(listenerAdapter); return container; } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } public static void main(String[] args) throws InterruptedException { SpringApplication.run(MessagingRabbitmqApplication.class, args).close(); } } @SpringBootApplication是一個(gè)方便的注釋,它添加了以下所有內(nèi)容:
該main()方法使用 Spring Boot 的SpringApplication.run()方法來啟動(dòng)應(yīng)用程序。您是否注意到?jīng)]有一行 XML?也沒有web.xml文件。這個(gè) Web 應(yīng)用程序是 100% 純 Java,您不必處理任何管道或基礎(chǔ)設(shè)施的配置。 方法中定義的 beanlistenerAdapter()被注冊(cè)為容器中的消息監(jiān)聽器(定義在 中container())。它偵聽spring-boot隊(duì)列中的消息。因?yàn)?span style="box-sizing: border-box;margin: 0px;padding: 0px;border: 0px;color: rgb(0, 0, 153);--tt-darkmode-color: #393FFF;">Receiver該類是 POJO,所以需要將其包裝在 中MessageListenerAdapter,您可以在其中指定它調(diào)用receiveMessage. JMS 隊(duì)列和 AMQP 隊(duì)列具有不同的語義。例如,JMS 僅將排隊(duì)的消息發(fā)送給一個(gè)消費(fèi)者。雖然 AMQP 隊(duì)列做同樣的事情,但 AMQP 生產(chǎn)者并不直接將消息發(fā)送到隊(duì)列。相反,一條消息被發(fā)送到一個(gè)交換器,該交換器可以發(fā)送到單個(gè)隊(duì)列或扇出到多個(gè)隊(duì)列,模擬 JMS 主題的概念。 消息偵聽器容器和接收器 bean 是您偵聽消息所需的全部內(nèi)容。要發(fā)送消息,您還需要一個(gè) Rabbit 模板。 該queue()方法創(chuàng)建一個(gè) AMQP 隊(duì)列。該exchange()方法創(chuàng)建主題交換。該方法將這兩者綁定在一起,定義發(fā)布到交換binding()時(shí)發(fā)生的行為。RabbitTemplate Spring AMQP 要求將Queue、TopicExchange和Binding聲明為頂級(jí) Spring bean 以便正確設(shè)置。 在這種情況下,我們使用主題交換,并且隊(duì)列與路由鍵綁定foo.bar.#,這意味著以 開頭的路由鍵發(fā)送的任何消息都會(huì)foo.bar.被路由到隊(duì)列。 發(fā)送測(cè)試消息在此示例中,測(cè)試消息由 a 發(fā)送CommandLineRunner,它還等待接收器中的閂鎖并關(guān)閉應(yīng)用程序上下文。以下清單(來自 package com.example.messagingrabbitmq;import java.util.concurrent.TimeUnit;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;@Componentpublic class Runner implements CommandLineRunner { private final RabbitTemplate rabbitTemplate; private final Receiver receiver; public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) { this.receiver = receiver; this.rabbitTemplate = rabbitTemplate; } @Override public void run(String... args) throws Exception { System.out.println("Sending message..."); rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!"); receiver.getLatch().await(10000, TimeUnit.MILLISECONDS); } } foo.bar.baz請(qǐng)注意,模板使用與綁定匹配的路由鍵將消息路由到交換器。 在測(cè)試中,您可以模擬運(yùn)行器,以便可以單獨(dú)測(cè)試接收器。 運(yùn)行應(yīng)用程序該main()方法通過創(chuàng)建 Spring 應(yīng)用程序上下文來啟動(dòng)該過程。這將啟動(dòng)消息偵聽器容器,該容器開始偵聽消息。有一個(gè)Runnerbean,然后會(huì)自動(dòng)運(yùn)行。它從應(yīng)用程序上下文中檢索并在隊(duì)列RabbitTemplate中發(fā)送Hello from RabbitMQ!消息。spring-boot最后,它關(guān)閉 Spring 應(yīng)用程序上下文,應(yīng)用程序結(jié)束。 構(gòu)建一個(gè)可執(zhí)行的 JAR您可以使用 Gradle 或 Maven 從命令行運(yùn)行應(yīng)用程序。您還可以構(gòu)建一個(gè)包含所有必要依賴項(xiàng)、類和資源的單個(gè)可執(zhí)行 JAR 文件并運(yùn)行它。構(gòu)建可執(zhí)行 jar 可以在整個(gè)開發(fā)生命周期、跨不同環(huán)境等中輕松地將服務(wù)作為應(yīng)用程序交付、版本化和部署。 如果您使用 Gradle,則可以使用./gradlew bootRun. 或者,您可以使用構(gòu)建 JAR 文件./gradlew build,然后運(yùn)行 JAR 文件,如下所示: java -jar build/libs/gs-messaging-rabbitmq-0.1.0.jar 如果您使用 Maven,則可以使用./mvnw spring-boot:run. 或者,您可以使用構(gòu)建 JAR 文件,./mvnw clean package然后運(yùn)行該 JAR 文件,如下所示: java -jar 目標(biāo)/gs-messaging-rabbitmq-0.1.0.jar 此處描述的步驟創(chuàng)建了一個(gè)可運(yùn)行的 JAR。您還可以構(gòu)建經(jīng)典的 WAR 文件。 您應(yīng)該看到以下輸出: Sending message... Received <Hello from RabbitMQ!>復(fù)制 總結(jié)恭喜!您剛剛使用 Spring 和 RabbitMQ 開發(fā)了一個(gè)簡(jiǎn)單的發(fā)布和訂閱應(yīng)用程序。您可以使用Spring 和 RabbitMQ做比這里更多的事情,但本指南應(yīng)該提供一個(gè)良好的開端。 |
|