最近中文字幕高清中文字幕无,亚洲欧美高清一区二区三区,一本色道无码道dvd在线观看 ,一个人看的www免费高清中文字幕

全部開發(fā)者教程

RabbitMQ 入門教程

RabbitMQ 簡(jiǎn)介
RabbitMQ 簡(jiǎn)介
首頁(yè) 慕課教程 RabbitMQ 入門教程 RabbitMQ 入門教程 RabbitMQ中消費(fèi)者ACK與重回隊(duì)列機(jī)制

RabbitMQ 中消費(fèi)者ACK與重回隊(duì)列機(jī)制

1. 前言

Hello,大家好。本小節(jié)會(huì)為大家介紹 RabbitMQ 中的消費(fèi)者 ACK 與消息的重回隊(duì)列機(jī)制。消費(fèi)者 ACK 與我們上節(jié)介紹的消息確認(rèn)機(jī)制類似,都是針對(duì)消息而言的,而消息的重回隊(duì)列機(jī)制確是針對(duì)特殊的消息類型而來。

消費(fèi)者 ACK 與消息重回隊(duì)列機(jī)制和上節(jié)中介紹的消息確認(rèn)機(jī)制與消息返回機(jī)制意義相同,都屬于 RabbitMQ 自帶的補(bǔ)償機(jī)制,只不過他們是針對(duì)于不同的消息來說的,下面就讓我們來看一下究竟什么是 RabbitMQ 中的消費(fèi)者 ACK 與重回隊(duì)列機(jī)制。

本節(jié)主要內(nèi)容:

  • 什么是消費(fèi)者 ACK;

  • 消息重回隊(duì)列機(jī)制概述;

2. 什么是消費(fèi)者 ACK

基礎(chǔ)概念:

消費(fèi)者 ACK ,是描述消息與消費(fèi)者之間的一種確認(rèn)關(guān)系,其主要內(nèi)容就是用來監(jiān)聽,消息是否已經(jīng)被消費(fèi)者成功消費(fèi)了。

我們都知道,當(dāng)消息成功被發(fā)送到 RabbitMQ Server 中,并且經(jīng)交換機(jī)和頻道,被路由到了相應(yīng)的消息隊(duì)里之后,這些消息就需要等待合適的消費(fèi)者來接收這些消息,并最終將這些消息進(jìn)行消費(fèi)。消費(fèi)者 ACK 就是在這個(gè)過程中間充當(dāng)了一種監(jiān)聽器的作用,主要就是用來監(jiān)聽這些消息被消費(fèi)者進(jìn)行消費(fèi)的結(jié)果,并將消息消費(fèi)的結(jié)果返回給消費(fèi)者。

從上述基礎(chǔ)概念中,可以很清晰的得出消費(fèi)者 ACK 的概念圖,如下圖所示:

根據(jù)上圖,消息在被成功發(fā)送到 RabbitMQ Server 中之后,消費(fèi)端從相應(yīng)的消息隊(duì)列中獲取到了消息,并在消息被消費(fèi)之后,給消費(fèi)端返回了 ACK 信號(hào),這個(gè) ACK 信號(hào)的返回一共分為兩種結(jié)果。

第一種 ACK 信號(hào)返回結(jié)果就是消息已經(jīng)被成功消費(fèi)了,這個(gè)時(shí)候返回給消費(fèi)端的是 ack 信號(hào),即消息消費(fèi)成功的確認(rèn)信號(hào);另一種 ACK 信號(hào)返回結(jié)果是消息沒有被消費(fèi)成功,這個(gè)時(shí)候返回給消費(fèi)端的是 nack 信號(hào),即消息沒有被消費(fèi)者消費(fèi)成功。

對(duì)于這兩種返回信號(hào)來說,第一種 ACK 返回信息是 RabbitMQ Server 中的消息一旦被成功消費(fèi)后就會(huì)返回給消費(fèi)端,這個(gè)過程是自動(dòng)的;而另一種 ACK 信號(hào),由于消息在沒有被成功消息后,RabbitMQ 會(huì)有自帶的解決措施,所以此種 ACK 信號(hào)需要我們手動(dòng)來選擇,到底是使用 RabbitMQ 自帶的解決措施,還是使用消費(fèi)者 ACK 機(jī)制。

而無論 RabbitMQ Server 返回給了消費(fèi)端哪種信號(hào),都需要我們手動(dòng)來獲取消息隊(duì)列中的消息,并且手動(dòng)對(duì)這些消息進(jìn)行消費(fèi)。

我們?cè)诹私饬讼M(fèi)者 ACK 機(jī)制的基礎(chǔ)概念和作用之后,我們還需要了解在 RabbitMQ 中,如何通過代碼來實(shí)現(xiàn) RabbitMQ 的消費(fèi)者 ACK 機(jī)制。

代碼實(shí)現(xiàn):

實(shí)現(xiàn)消費(fèi)者 ACK 機(jī)制,只需要在消費(fèi)端進(jìn)行配置即可,代碼如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
DefaultConsumer defaultConsumer = new DefaultCnsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};

代碼解釋:

第 1-5 行,我們使用 ConnectionFactory 創(chuàng)建了一個(gè)客戶端連接 RabbitMQ Server 的連接。

第 6 行,我們使用建立好的連接,來創(chuàng)建了一個(gè)頻道 channel 。

第 7-10 行,我們使用手動(dòng)創(chuàng)建 DefaultConsumer 的方式來手動(dòng)從消息隊(duì)列中獲取消息并進(jìn)行消費(fèi),其中,此種手動(dòng)創(chuàng)建 Consumer 的方式需要通過匿名內(nèi)部類的方式來實(shí)現(xiàn),在類的內(nèi)部,還需要重寫 handleDelivery 方法,該方法的第一個(gè)參數(shù),是消費(fèi)者的標(biāo)簽,第二個(gè)參數(shù)是當(dāng)前消費(fèi)者所處的環(huán)境信息,第三個(gè)參數(shù)和第四個(gè)參數(shù)分別為消息的 properties 參數(shù)和消息體。

第 12 行,我們使用了 channel 的 basicAck 方法,來對(duì)消息進(jìn)行消費(fèi) ACK ,即一旦消息被成功消費(fèi),就會(huì)通過 basicAck 方法將第一種 ACK 信號(hào)返回給消費(fèi)端。

Tips: 1.handleDeliery 方法是進(jìn)行消息有沒有被消費(fèi)的方法,我們只有重寫了該方法,才能手動(dòng)監(jiān)聽到消息有沒有被消費(fèi);
2. channel 的 basicAck 方法的第一個(gè)參數(shù)就是當(dāng)前消費(fèi)者的標(biāo)簽,第二個(gè)參數(shù)表示是否啟用 RabbitMQ 的消費(fèi)者自動(dòng) ACK ,如果要想手動(dòng)對(duì)消費(fèi)端進(jìn)行 ACK ,那么這個(gè)屬性一定不要開啟,即將該屬性的值設(shè)置為 false ,關(guān)閉消費(fèi)端的 autoAck 才行。

3. 消息重回隊(duì)列機(jī)制概述

基礎(chǔ)概念:

消息重回隊(duì)列機(jī)制,是描述那些沒有被成功消費(fèi)的消息與消費(fèi)端之間的一種保障策略, 其主要目的就是為了存儲(chǔ)那些沒有被消費(fèi)者成功消費(fèi)掉的消息,即在 RabbitMQ 諸多的消息隊(duì)列中,專門用來存儲(chǔ)那些沒有被消費(fèi)者成功消費(fèi)掉的消息的隊(duì)列,這種隊(duì)列就被稱為重回隊(duì)列。

那么為什么被稱為重回隊(duì)列呢?因?yàn)?,?RabbitMQ 中,如果一條消息沒有被消費(fèi)者成功消費(fèi)掉,那么這條消息除了會(huì)被放到重回隊(duì)列中之外,RabbitMQ Server 還會(huì)從該重回隊(duì)列中獲取到這些消息,并且將這些沒有被消費(fèi)成功的消息重新發(fā)送到 RabbitMQ Server 中,再次由消費(fèi)者進(jìn)行消費(fèi),直到這些消息成功地被消費(fèi)者消費(fèi)。

從上述消息重回隊(duì)列的基礎(chǔ)概念可以得出,消息重回隊(duì)列的原理圖,如下圖所示:

我們只看上圖的消費(fèi)端部分,當(dāng)消息隊(duì)列中的消息沒有被消費(fèi)者成功消費(fèi)后,首先 RabbitMQ Server 會(huì)返回給消費(fèi)端一個(gè) nack 的信號(hào);其次,該條沒有被消費(fèi)的消息會(huì)被放入重回隊(duì)列中;最后,RabbitMQ Server 會(huì)從這個(gè)重回隊(duì)列中獲取該條消息,并重新發(fā)送到 RabbitMQ Server 中等待消費(fèi)。

上述過程就是我們的重回隊(duì)列機(jī)制處理未被消費(fèi)的消息的全過程,同學(xué)們需要對(duì)此有所了解才行。

接下來我們來看一下,消息重回隊(duì)列如何來進(jìn)行代碼配置。

代碼實(shí)現(xiàn):

實(shí)現(xiàn)消息重回隊(duì)列機(jī)制,也是只需要在消費(fèi)端進(jìn)行配置即可,代碼如下:

// 省略客戶端連接 RabbitMQ Server 的過程
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
DefaultConsumer defaultConsumer = new DefaultCnsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (properties.getHeaders().get("type") != "utf-8") {
            channel.basicNack(envelope.getDeliveryTag(), false, true)
        }
    }
};

代碼解釋:

第 1-2 行,我們創(chuàng)建了客戶端連接 RabbitMQ Server 的連接,并且創(chuàng)建了一個(gè) channel 。

第 8-11 行,我們?cè)?handleDelivery 方法的內(nèi)部,添加了一個(gè)判斷條件,當(dāng)滿足該條件時(shí),意味著該條消息沒有被成功消費(fèi),返回給消費(fèi)端 nack 信號(hào),這個(gè)過程是通過 channel 的 basicNack 方法實(shí)現(xiàn)。

Tips: 1. 在 channel 的 basicNack 方法中,方法的第二個(gè)參數(shù)表示是否批量確認(rèn),這里設(shè)置為了 false ,表示不進(jìn)行批量確認(rèn),第三個(gè)參數(shù)表示是否啟用消息重回隊(duì)列,這里設(shè)置為了 true ,表示啟用重回隊(duì)列,默認(rèn)情況下,該屬性是 false ,即不啟用重回隊(duì)列;
2. 在實(shí)際工作中,我們一般也會(huì)關(guān)閉消息重回隊(duì)列,使用 RabbitMQ 自帶的措施和打印日志的方式來進(jìn)行補(bǔ)償,RabbitMQ 默認(rèn)會(huì)將那些沒有被成功消費(fèi)的消息刪除,不會(huì)放入重回隊(duì)列中。

4. 小結(jié)

本小節(jié)為同學(xué)們介紹了 RabbitMQ 中的消費(fèi)者 ACK ,以及消息重回隊(duì)里機(jī)制。從消費(fèi)者 ACK 與消息重回隊(duì)列機(jī)制的基礎(chǔ)概念開始,到不同機(jī)制的代碼實(shí)現(xiàn)結(jié)束,詳細(xì)介紹了什么是消費(fèi)者 ACK、什么是重回隊(duì)列,以及消息重回隊(duì)列機(jī)制,且通過不同機(jī)制的代碼實(shí)現(xiàn),分別闡述了如何通過代碼來對(duì)兩種機(jī)制進(jìn)行配置。希望同學(xué)們可以完全理解兩種機(jī)制的基礎(chǔ)概念和實(shí)現(xiàn)方式。