RabbitMQ 高級應用_網頁設計公司

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

節能減碳愛地球是景泰電動車的理念,是創立景泰電動車行的初衷,滿意態度更是服務客戶的最高品質,我們的成長來自於你的推薦。

 

 

本文是作者原創,版權歸作者所有.若要轉載,請註明出處.

本文RabbitMQ版本為rabbitmq-server-3.7.17,erlang為erlang-22.0.7.請各位去官網查看版本匹配和下載,也可以留言,我發安裝包

 

過期時間TTL(Time To Live)

過期時間TTL表示可以對消息設置預期的時間,在這個時間內都可以被消費者接收穫取;過了之後消息將自動被刪除。RabbitMQ可以對消息和隊列設置TTL。目前有兩種方法可以設置。

  • 第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
  • 第二種方法是對消息進行單獨設置,每條消息TTL可以不同。

如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就稱為dead message被投遞到死信隊列, 消費者將無法再收到該消息。

1. 設置隊列TTL

配置resources\spring\spring-rabbitmq.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--定義過期隊列及其屬性,不存在則自動創建-->
    <rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投遞到該隊列的消息如果沒有消費都將在6秒之後被刪除-->
            <entry key="x-message-ttl" value-type="long" value="6000"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
<rabbit:connection-factory id="connectionFactory" host="192.168.75.163"
port="5672"
username="test01"
password="test01"
virtual-host="/hello"

/>
</beans>

參數 x-message-ttl 的值 必須是非負 32 位整數 (0 <= n <= 2^32-1) ,以毫秒為單位表示 TTL 的值。這樣,值 6000 表示存在於 隊列 中的當前 消息 將最多只存活 6 秒鐘

如果不設置TTL,則表示此消息不會過期。如果將TTL設置為0,則表示除非此時可以直接將消息投遞到消費者,否則該消息會被立即丟棄。

 

然後在測試類中編寫如下方法發送消息到上述定義的隊列:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 過期隊列消息
     * 投遞到該隊列的消息如果沒有消費都將在6秒之後被刪除
     */
    @Test
    public void ttlQueueTest(){
        //路由鍵與隊列同名
        rabbitTemplate.convertAndSend("my_ttl_queue", "發送到過期隊列my_ttl_queue,6秒內不消費則不能再被消費。");
    }

}

啟動類中導入配置文件

//導入配置文件
@ImportResource("classpath:/spring/spring-rabbitmq.xml")
@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

執行單元測試,看結果

 

 

 6秒后,再看結果

 

 

 

2. 設置消息TTL

消息的過期時間;只需要在發送消息(可以發送到任何隊列,不管該隊列是否屬於某個交換機)的時候設置過期時間即可。

在測試類中編寫如下方法發送消息並設置過期時間到隊列:

/**
     * 過期消息
     * 該消息投遞任何交換機或隊列中的時候;如果到了過期時間則將從該隊列中刪除
     */
    @Test
    public void ttlMessageTest(){
        MessageProperties messageProperties = new MessageProperties();
        //設置消息的過期時間,3秒
        messageProperties.setExpiration("3000");
        Message message = new Message("測試過期消息,3秒鐘過期".getBytes(), messageProperties);
        //路由鍵與隊列同名
        rabbitTemplate.convertAndSend("my_ttl_queue", message);
    }

expiration 字段以微秒為單位表示 TTL 值。且與 x-message-ttl 具有相同的約束條件。因為 expiration 字段必須為字符串類型,broker 將只會接受以字符串形式表達的数字。

當同時指定了 queue 和 message 的 TTL 值,則兩者中較小的那個才會起作用。

 

死信隊列

DLX,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機,也有人稱之為死信郵箱。

當消息在一個隊列中變成死信(dead message)之後,它能被重新發送到另一個交換機中,這個交換機就是DLX ,綁定DLX的隊列就稱之為死信隊列。

消息變成死信,可能是由於以下的原因:

  • 消息被拒絕
  • 消息過期
  • 隊列達到最大長度

DLX也是一個正常的交換機,和一般的交換機沒有區別,它能在任何的隊列上被指定,實際上就是設置某一個隊列的屬性。

當這個隊列中存在死信時,Rabbitmq就會自動地將這個消息重新發布到設置的DLX上去,進而被路由到另一個隊列,即死信隊列。

要想使用死信隊列,只需要在定義隊列的時候設置隊列參數 x-dead-letter-exchange 指定交換機即可。

 

 

具體步驟如下

1. 定義死信交換機

<!--定義定向交換機中的持久化死信隊列,不存在則自動創建-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>

<!--定義廣播類型交換機;並綁定上述隊列-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
<rabbit:bindings>
<!--綁定路由鍵my_ttl_dlx、my_max_dlx,可以將過期的消息轉移到my_dlx_queue隊列-->
<rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>

2. 隊列設置死信交換機

<!--定義過期隊列及其屬性,不存在則自動創建-->
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投遞到該隊列的消息如果沒有消費都將在6秒之後被投遞到死信交換機-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
<!--設置當消息過期后投遞到對應的死信交換機-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>

<!--定義限制長度的隊列及其屬性,不存在則自動創建-->
<rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投遞到該隊列的消息最多2個消息,如果超過則最早的消息被刪除投遞到死信交換機-->
<entry key="x-max-length" value-type="long" value="2"/>
<!--設置當消息過期后投遞到對應的死信交換機-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>

<!--定義定向交換機 根據不同的路由key投遞消息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>

3. 消息過期的死信隊列測試

/**
     * 過期消息投遞到死信隊列
     * 投遞到一個正常的隊列,但是該隊列有設置過期時間,到過期時間之後消息會被投遞到死信交換機(隊列)
     */
    @Test
    public void dlxTTLMessageTest(){
        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_ttl_dlx",
                "測試過期消息;6秒過期後會被投遞到死信交換機2222");
    }

運行看結果

 

 6秒后

 

 

4. 消息溢出的死信隊列測試

/**
     * 消息長度超過2,會投遞到死信隊列中
     */
    @Test
    public void dlxMaxMessageTest(){
        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "發送消息4:消息長度超過2,會被投遞到死信隊列中!");

        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "發送消息5:消息長度超過2,會被投遞到死信隊列中!");

        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "發送消息6:消息長度超過2,會被投遞到死信隊列中!");
        
    }

運行,看結果

 

 

 

 上面發送的3條消息中的第1條消息會被投遞到死信隊列中

 

延遲隊列

延遲隊列存儲的對象是對應的延遲消息;所謂“延遲消息” 是指當消息被發送以後,並不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費。

在RabbitMQ中延遲隊列可以通過 過期時間 + 死信隊列 來實現

 

 

延遲隊列的應用場景;如:

  • 在電商項目中的支付場景;如果在用戶下單之後的幾十分鐘內沒有支付成功;那麼這個支付的訂單算是支付失敗,要進行支付失敗的異常處理(將庫存加回去),這時候可以通過使用延遲隊列來處理
  • 在系統中如有需要在指定的某個時間之後執行的任務都可以通過延遲隊列處理

具體代碼不演示了

 

消息確認機制

確認並且保證消息被送達,提供了兩種方式:發布確認和事務。(兩者不可同時使用)在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。

1 發布確認

消息發送成功確認

connectionFactory 中啟用消息確認:

<rabbit:connection-factory id="connectionFactory" host="192.168.75.163"
                               port="5672"
                               username="test01"
                               password="test01"
                               virtual-host="/hello"
                               publisher-confirms="true"
    />

配置消息確認回調方法如下:

<!-- 消息回調處理類 -->
<bean id="confirmCallback" class="com.itheima.rabbitmq.MsgSendConfirmCallBack"/>
<!--定義rabbitTemplate對象操作可以在代碼中方便發送消息-->
<!-- confirm-callback="confirmCallback" 表示:消息失敗回調 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" 
        confirm-callback="confirmCallback"/>

消息確認回調方法如下:

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息確認成功....");
        } else {
            //處理丟失的消息
            System.out.println("消息確認失敗," + cause);
        }
    }
}

我們手動建一個spring_queue隊列.並測試如下:

 @Test
    public void queueTest(){
        //路由鍵與隊列同名
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息。");
    }

查看結果

 

 

 

 消息發送失敗回調

connectionFactory 中啟用回調:

!-- publisher-returns="true" 表示:啟用了失敗回調 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
        port="${rabbitmq.port}"
        username="${rabbitmq.username}"
        password="${rabbitmq.password}"
        virtual-host="${rabbitmq.virtual-host}"
        publisher-returns="true" />

配置消息失敗回調方法如下:

<!-- 消息失敗回調類 -->
<bean id="sendReturnCallback" class="com.itheima.rabbitmq.MsgSendReturnCallback"/>
<!-- return-callback="sendReturnCallback" 表示:消息失敗回調 ,同時需配置mandatory="true",否則消息則丟失-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
        confirm-callback="confirmCallback" return-callback="sendReturnCallback" 
        mandatory="true"/>

注意:同時需配置mandatory=”true”,否則消息則丟失

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

透過選單樣式的調整、圖片的縮放比例、文字的放大及段落的排版對應來給使用者最佳的瀏覽體驗,所以不用擔心有手機版網站兩個後台的問題,而視覺效果也是透過我們前端設計師優秀的空間比例設計,不會因為畫面變大變小而影響到整體視覺的美感。

 

消息失敗回調方法如下:

public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        String msgJson  = new String(message.getBody());
        System.out.println("Returned Message:"+msgJson);
    }
}

模擬消息發送失敗,功能測試如下:

@Test
public void testFailQueueTest() throws InterruptedException {
    //exchange 正確,queue 錯誤 ,confirm被回調, ack=true; return被回調 replyText:NO_ROUTE
    amqpTemplate.convertAndSend("test_fail_exchange", "", "測試消息發送失敗進行確認應答。");
}

失敗回調結果如下:

 

 

事務支持

場景:業務處理伴隨消息的發送,業務處理失敗(事務回滾)后要求消息不發送。rabbitmq 使用調用者的外部事務,通常是首選,因為它是非侵入性的(低耦合)。

外部事務的配置

<rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"

                     mandatory="true"
                     channel-transacted="true"
    />

    <!--平台事務管理器-->
    <bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

測試類或者測試方法上加入@Transactional註解

   @Test
    @Transactional //開啟事務
    //@Rollback(false)//在測試的時候,需要手動的方式制定回滾的策略
    public void queueTest2(){
        //路由鍵與隊列同名
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02222222222222222222。");
        System.out.println("----------------dosoming:可以是數據庫的操作,也可以是其他業務類型的操作---------------");
        //模擬業務處理失敗
        //System.out.println(1/0);
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02。");
    }

運行看結果

 

 

 

 

 

 因為是測試類,所以spring自動回滾了,需要我們手動禁止回滾

   @Test
    @Transactional //開啟事務
    @Rollback(false)//在測試的時候,需要手動的方式制定回滾的策略
    public void queueTest2(){
        //路由鍵與隊列同名
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02222222222222222222。");
        System.out.println("----------------dosoming:可以是數據庫的操作,也可以是其他業務類型的操作---------------");
        //模擬業務處理失敗
        //System.out.println(1/0);
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02。");
    }

再執行一次測試類,查看結果

 

 

 我們手動弄個異常,再試一次

  @Test
    @Transactional //開啟事務
    @Rollback(false)//在測試的時候,需要手動的方式制定回滾的策略
    public void queueTest2(){
        //路由鍵與隊列同名
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02222222222222222222。");
        System.out.println("----------------dosoming:可以是數據庫的操作,也可以是其他業務類型的操作---------------");
        //模擬業務處理失敗
        System.out.println(1/0);
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02。");
    }

看結果

 

 

 這裏4條是因為我手動禁止了回滾

結果應該是沒問題的,就不測試了

 

消息追蹤

1.消息追蹤啟用與查看

消息中心的消息追蹤需要使用Trace實現,Trace是Rabbitmq用於記錄每一次發送的消息,方便使用Rabbitmq的開發者調試、排錯。可通過插件形式提供可視化界面。

Trace啟動後會自動創建系統Exchange:amq.rabbitmq.trace ,每個隊列會自動綁定該Exchange,綁定后發送到隊列的消息都會記錄到Trace日誌。

 

查看插件列表

rabbitmq-plugins list

 

 

 

 

rabbitmq啟用trace插件

rabbitmq-plugins enable rabbitmq_tracing

命令集 描述
rabbitmq-plugins list 查看插件列表
rabbitmq-plugins enable rabbitmq_tracing rabbitmq啟用trace插件
rabbitmqctl trace_on 打開trace的開關
rabbitmqctl trace_on -p test01 打開trace的開關(itcast為需要日誌追蹤的vhost)
rabbitmqctl trace_off 關閉trace的開關
rabbitmq-plugins disable rabbitmq_tracing rabbitmq關閉Trace插件
rabbitmqctl set_user_tags test01 administrator 只有administrator的角色才能查看日誌界面

 

 

 

 

 

 

 

 

 

 

 安裝插件並開啟 trace_on 之後,會發現多個 exchange:amq.rabbitmq.trace ,類型為:topic。

 

 

2 日誌追蹤

首先重新登錄管理控制台

 

 

第一步:發送消息

@Test
    public void queueTest3() throws InterruptedException {
        rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--01。");
    }

 

 第二步:查看trace

第三步:點擊Tracing查看Trace log files

 

 

第四步:點擊itest-log.log確認消息軌跡正確性

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

搬家費用:依消費者運送距離、搬運樓層、有無電梯、步行距離、特殊地形、超重物品等計價因素後,評估每車次單

您可能也會喜歡…