消息容器介紹
1. 前言
Hello,大家好。本小節(jié)會(huì)為同學(xué)們介紹 RabbitMQ 在 Spring 生態(tài)中的消息容器,消息容器是 RabbitMQ 在 Spring 生態(tài)中的第三個(gè)核心元素,消息容器指的不是一種特定的容納消息的容器,而是一系列提供容納消息、監(jiān)聽消息等其他對(duì)消息指標(biāo)進(jìn)行監(jiān)控的工具,接下來就讓我們來看看到底什么是消息容器吧。
本節(jié)主要內(nèi)容:
-
消息容器基礎(chǔ)概念概述;
-
常用消息容器基礎(chǔ)配置概述。
2. 消息容器基礎(chǔ)概念概述
基礎(chǔ)概念:
消息容器,即容納消息的容器。通過對(duì)上述部分小節(jié)的學(xué)習(xí),我們已經(jīng)知道,在 原生 RabbitMQ 知識(shí)體系中,并沒有單獨(dú)地一個(gè)消息容器的概念,只有消息隊(duì)列的概念。
但是,在 Spring-AMQP 中,消息容器的概念和原生 RabbitMQ 知識(shí)體系中的消息隊(duì)列的概念完全不一樣。Spring-AMQP 中的消息容器指的是:可以提供對(duì)消息隊(duì)列、消息、消費(fèi)者、消息簽收模式進(jìn)行控制,以及消息的全方位監(jiān)聽的一系列工具的統(tǒng)稱。
我們知道,在 RabbitMQ 中,充當(dāng)核心角色的就是我們應(yīng)用程序中的數(shù)據(jù),也就是消息,那么,對(duì)消息以及消息隊(duì)列進(jìn)行全方位監(jiān)控的工具,在 Spring-AMQP 中就被稱為消息容器。
消息容器的主要作用就是對(duì)經(jīng)過 RabbitTemplate 消息模板發(fā)送到 RabbitMQ Server 中的消息進(jìn)行監(jiān)聽,當(dāng)然,要確保 RabbitTemplate 和消息容器所使用的是同一個(gè) RabbitAdmin 構(gòu)造的連接,這樣才能對(duì)消息進(jìn)行監(jiān)控。
在介紹完消息容器的基礎(chǔ)概念之后,下面讓我們來看一下如何對(duì)消息容器進(jìn)行簡單的配置吧。
3. 常用消息容器基礎(chǔ)配置概述
還是像上節(jié)小節(jié)一樣,要想在 Spring 中使用消息容器,需要將 Spring-AMQP 和 AMQP-Stater 的依賴先引入進(jìn)來,方便起見,同學(xué)們可以直接拷貝下放代碼:
3.1 引入消息容器
以 Maven 引入方式為例,引入代碼如下所示:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
在將這兩個(gè)依賴進(jìn)行引入之后,我們就可以對(duì)消息容器進(jìn)行配置了。
3.2 常用消息容器基礎(chǔ)配置
在 Spring-AMQP 中,消息容器共有這五種:AbstractMessageListenerContainer、DirectMessageListenerContainer、DirectReplyToMessageListenerContainer、SimpleMessageListenerContainer,以及 MessageListenerContainer 接口。
在這五種消息容器中,常用的消息容器只有一種,就是 SimpleMessageListenerContainer ,由于 SimpleMessageListenerContainer 消息容器配置起來簡單方便,所支持的配置的屬性比較全面,所以該消息容器是所有消息容器中使用頻率最高的,也是實(shí)際工作中使用最多的一種消息容器。
本節(jié)以 SimpleMessageListenerContainer 消息容器為例,來介紹一下 SimpleMessageListenerContainer 消息容器該如何使用吧。
Tips: SimpleMessageListenerContainer 消息容器是眾多消息容器中最基本的消息容器,我們把 SimpleMessageListenerContainer 消息容器的基本使用了解之后,其他的消息容器就無師自通了,只不過其他的消息容器所提供的配置屬性或多或少罷了。
初始化 SimpleMessageListenerContainer 消息容器
像 RabbitAdmin 和 RabbitTemplate 一樣,要想使用消息容器,需要先對(duì)消息容器進(jìn)行初始化,這個(gè)初始化過程非常簡單,初始化 SimpleMessageListenerContainer 消息容器的代碼如下所示:
代碼實(shí)現(xiàn):
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
return simpleMessageListenerContainer;
}
代碼解釋:
第 1 行,我們使用 Spring 的 Bean 注解將我們聲明的 simpleMessageListenerContainer 方法注入到 Spring 容器中,這樣 Spring 容器就可以監(jiān)聽到我們注入的配置。
第 2 行,我們使用 Spring-AMQP 中的 SimpleMessageListenerContainer 類,來聲明了一個(gè)名為 simpleMessageListenerContainer 的方法,用來對(duì) simpleMessageListenerContainer 消息容器進(jìn)行初始化。
第 3 行,我們實(shí)例化了一個(gè) simpleMessageListenerContainer 實(shí)例,該實(shí)例是 Spring-AMQP 中對(duì) simpleMessageListenerContainer 消息容器進(jìn)行初始化的實(shí)例,要想使用 SimpleMessageListenerContainer ,就必須要初始化該實(shí)例。
第 4 行,我們將初始化好的 simpleMessageListenerContainer 實(shí)例進(jìn)行返回。
SimpleMessageListenerContainer 基本使用
配置 SimpleMessageListenerContainer 消息容器,和之前的配置 RabbitAdmin 以及 RabbitTemplate 不同,我們只需要直接在上述初始化方法中去填充我們需要的配置屬性即可,這里以基本常用屬性為例,代碼如下所示:
simpleMessageListenerContainer.setQueues(new Queue("test_001"), new Queue("test_002"), new Queue("test_003"));
simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setMaxConcurrentConsumers(3);
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + "created consumer tag";
}
});
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
// do someting...
}
simpleMessageListenerContainer.setAutoStartup(true)
});
代碼解釋:
第 1-2 行,我們使用 simpleMessageListenerContainer 的 setQueues 方法,來設(shè)置我們需要進(jìn)行監(jiān)聽的隊(duì)列,這里新建了三個(gè)隊(duì)列,名稱分別為:test_001 ,test_002 ,test_003。
第 3-4 行,我們分別使用了 simpleMessageListenerContainer 的 setConcurrentConsumers 方法和 setMaxConcurrentConsumers 方法,來分別設(shè)置當(dāng)前用于消費(fèi)消息的消費(fèi)者個(gè)數(shù),以及最大允許用于消費(fèi)消息的消費(fèi)者個(gè)數(shù),這里分別被設(shè)置成了 1 和 3 ,表示:當(dāng)前用于消費(fèi)消息的消費(fèi)者個(gè)數(shù)為 1 個(gè),最大允許進(jìn)行消費(fèi)消息的消費(fèi)者個(gè)數(shù)為 3 個(gè)。
第 5 行,我們使用 simpleMessageListenerContainer 的 setDefaultRequeueRejected 方法,來設(shè)置是否開啟重回隊(duì)列, 關(guān)于什么是重回隊(duì)列,在前面小節(jié)中都有詳細(xì)介紹,這里不再贅述。當(dāng) setDefaultRequeueRejected 方法的值為 true 時(shí),表示開啟重回隊(duì)列,當(dāng)為 false 時(shí),表示關(guān)閉重回隊(duì)列,這里表示不適使用重回隊(duì)列,即該方法的值為 false。
第 6 行,我們使用 simpleMessageListenerContainer 的 setAcknowledgeMode 方法,來設(shè)置當(dāng)前消息的接收模式, 即對(duì)應(yīng)原生 RabbitMQ 中的消息簽收模式,是自動(dòng)簽收還是手動(dòng)簽收,這里,AcknowledgeMode.AUTO 表示自動(dòng)簽收消息。
第 7-10 行,我們使用 simpleMessageListenerContainer 的 setConsumerTagStrategy 方法,來為消費(fèi)者設(shè)置一個(gè)標(biāo)簽,設(shè)置標(biāo)簽需要通過 new ConsumerTagStrategy 匿名內(nèi)部類的方式來實(shí)現(xiàn),通過重寫其中的 createConsumerTag 方法來設(shè)置消費(fèi)者的 Tag 標(biāo)簽。
第 13-17 行,我們使用 simpleMessageListenerContainer 的 setMessageListener 方法,來添加一種監(jiān)聽器, 添加監(jiān)聽器的方式也是通過匿名內(nèi)部類實(shí)現(xiàn);new ChannelAwareMessageListener 監(jiān)聽器監(jiān)聽的是,當(dāng)消息經(jīng)過 channel 時(shí)的情況,實(shí)現(xiàn)該監(jiān)聽器需要重寫其中的 onMessage 方法,我們可以將業(yè)務(wù)邏輯填充在 onMessage 方法中。
setMessageListener 方法可以添加任意一種 Spring-AMQP 中支持的監(jiān)聽器,不單單只有 ChannelAwareMessageListener 監(jiān)聽器這一種,其他的監(jiān)聽器就交給同學(xué)們自行探索吧。
第 19 行,我們使用 simpleMessageListenerContainer 的 setAutoStartup 方法,來設(shè)置 simpleMessageListenerContainer 消息容器的啟動(dòng)模式,這里被設(shè)置為了 true ,表示,當(dāng) Spring 容器啟動(dòng)時(shí),simpleMessageListenerContainer 消息容器也隨之啟動(dòng)。
Tips: 1. setQueues 方法中的參數(shù)只有一個(gè),就是消息隊(duì)列,其參數(shù)類型為 Queue… ,該參數(shù)理論上可以支持無限個(gè)隊(duì)列被添加進(jìn)去,從而對(duì)這些隊(duì)列進(jìn)行監(jiān)聽;
2. AcknowledgeMode 的消息簽收類型,不只有 AUTO 這一種,還有 NONE 和 MANUAL ,分別表示不設(shè)置和手動(dòng)監(jiān)聽。
3. simpleMessageListenerContainer 消息容器是支持熱加載的一款消息容器,即當(dāng)我們的應(yīng)用程序處于運(yùn)行狀態(tài)時(shí),我們手動(dòng)改變了某一配置屬性的值,這種改動(dòng)是立即生效的,不需要我們重啟我們的應(yīng)用程序,這點(diǎn)同學(xué)們注意。
4. 小結(jié)

本小節(jié)詳細(xì)為同學(xué)們介紹了 Spring 生態(tài)中的消息容器這一概念,并通過結(jié)合常用的 SimpleMessageListenerContainer 消息容器,來介紹了消息容器的基本配置方法和常用配置屬性,希望同學(xué)們可以對(duì) Spring-AMQP 中的常用消息容器 SimpleMessageListenerContainer 有一個(gè)基礎(chǔ)的認(rèn)知,這樣,我們以后在使用其他消息容器時(shí),才會(huì)做到融會(huì)貫通。