国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

rabbitmq五種模式詳解(含實(shí)現代碼)

發(fā)布時(shí)間:2021-07-06 11:12 來(lái)源:腳本之家 閱讀:0 作者:大帥小站 欄目: 開(kāi)發(fā)技術(shù)

目錄

一、五種模式詳解

1.簡(jiǎn)單模式(Queue模式)

當生產(chǎn)端發(fā)送消息到交換機,交換機根據消息屬性發(fā)送到隊列,消費者監聽(tīng)綁定隊列實(shí)現消息的接收和消費邏輯編寫(xiě).簡(jiǎn)單模式下,強調的一個(gè)隊列queue只被一個(gè)消費者監聽(tīng)消費.

1.1 結構

生產(chǎn)者:生成消息,發(fā)送到交換機交換機:根據消息屬性,將消息發(fā)送給隊列消費者:監聽(tīng)這個(gè)隊列,發(fā)現消息后,獲取消息執行消費邏輯

1.2應用場(chǎng)景

常見(jiàn)的應用場(chǎng)景就是一發(fā),一接的結構
例如:

手機短信郵件單發(fā)

2.爭搶模式(Work模式)

強調的也是后端隊列與消費者綁定的結構

2.1結構

生產(chǎn)者:發(fā)送消息到交換機交換機:根據消息屬性將消息發(fā)送給隊列消費者:多個(gè)消費者,同時(shí)綁定監聽(tīng)一個(gè)隊列,之間形成了爭搶消息的效果

2.2應用場(chǎng)景

  1. 搶紅包
  2. 資源分配系統

3.路由模式(Route模式 Direct定向)

從路由模式開(kāi)始,關(guān)心的就是消息如何到達的隊列,路由模式需要使用的交換機類(lèi)型就是路由交換機(direct)

3.1 結構

  • 生產(chǎn)端:發(fā)送消息,在消息中處理消息內容,攜帶一個(gè)routingkey
  • 交換機:接收消息,根據消息的routingkey去計算匹配后端隊列的routingkey
  • 隊列:存儲交換機發(fā)送的消息
  • 消費端:簡(jiǎn)單模式 工作爭搶

3.2應用場(chǎng)景

  • 短信
  • 聊天工具
  • 郵箱。。

手機號/郵箱地址,都可以是路由key

4.發(fā)布訂閱模式(Pulish/Subscribe模式 Fanout廣播)

不計算路由的一種特殊交換機

4.1結構

4.2應用場(chǎng)景

  • 消息推送
  • 廣告

5.主題模式(Topics模式 Tpoic通配符)

路由key值是一種多級路徑。中國.四川.成都.武侯區

5.1結構

生產(chǎn)端:攜帶路由key,發(fā)送消息到交換機

隊列:綁定交換機和路由不一樣,不是一個(gè)具體的路由key,而可以使用*和#代替一個(gè)范圍
| * | 字符串,只能表示一級 |
| --- | --- |
| # | 多級字符串 |

交換機:根據匹配規則,將路由key對應發(fā)送到隊列

消息路由key:

  • 北京市.朝陽(yáng)區.酒仙橋
  • 北京市.#: 匹配true
  • 上海市.浦東區.*: 沒(méi)匹配false
  • 新疆.烏魯木齊.#

5.2 應用場(chǎng)景

做物流分揀的多級傳遞.

6.完整結構

二、代碼實(shí)現

1.創(chuàng )建SpringBoot工程

1.1 工程基本信息

1.2 依賴(lài)信息

1.3 配置文件applicasion.properties

# 應用名稱(chēng)
spring.application.name=springboot-demo
# Actuator Web 訪(fǎng)問(wèn)端口
management.server.port=8801
management.endpoints.jmx.exposure.include=*
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
# 應用服務(wù) WEB 訪(fǎng)問(wèn)端口
server.port=8801

######################### RabbitMQ配置 ########################
# RabbitMQ主機
spring.rabbitmq.host=127.0.0.1
# RabbitMQ虛擬主機
spring.rabbitmq.virtual-host=demo
# RabbitMQ服務(wù)端口
spring.rabbitmq.port=5672
# RabbitMQ服務(wù)用戶(hù)名
spring.rabbitmq.username=admin
# RabbitMQ服務(wù)密碼
spring.rabbitmq.password=admin
# RabbitMQ服務(wù)發(fā)布確認屬性配置
## NONE值是禁用發(fā)布確認模式,是默認值
## CORRELATED值是發(fā)布消息成功到交換器后會(huì )觸發(fā)回調方法
## SIMPLE值經(jīng)測試有兩種效果,其一效果和CORRELATED值一樣會(huì )觸發(fā)回調方法,其二在發(fā)布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點(diǎn)返回發(fā)送結果,根據返回結果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì )關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker;
spring.rabbitmq.publisher-confirm-type=simple
# RabbitMQ服務(wù)開(kāi)啟消息發(fā)送確認
spring.rabbitmq.publisher-returns=true
######################### simple模式配置 ########################
# RabbitMQ服務(wù) 消息接收確認模式
## NONE:不確認
## AUTO:自動(dòng)確認
## MANUAL:手動(dòng)確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 指定最小的消費者數量
spring.rabbitmq.listener.simple.concurrency=1
# 指定最大的消費者數量
spring.rabbitmq.listener.simple.max-concurrency=1
# 開(kāi)啟支持重試
spring.rabbitmq.listener.simple.retry.enabled=true

2.簡(jiǎn)單模式

2.1 創(chuàng )建SimpleQueueConfig 簡(jiǎn)單隊列配置類(lèi)

package com.gmtgo.demo.simple;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 大帥
 */
@Configuration
public class SimpleQueueConfig {
    /**
     * 定義簡(jiǎn)單隊列名.
     */
    private final String simpleQueue = "queue_simple";
    @Bean
    public Queue simpleQueue() {
        return new Queue(simpleQueue);
    }
}

2.2 編寫(xiě)生產(chǎn)者

package com.gmtgo.demo.simple;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class SimpleProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        for (int i = 0; i < 5; i++) {
            String message = "簡(jiǎn)單消息" + i;
            log.info("我是生產(chǎn)信息:{}", message);
            rabbitTemplate.convertAndSend( "queue_simple", message);
        }
    }
}

2.3 編寫(xiě)消費者

package com.gmtgo.demo.simple;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class SimpleConsumers {

    @RabbitListener(queues = "queue_simple")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息:{}", new String(message.getBody()));
    }
}

2.4 編寫(xiě)訪(fǎng)問(wèn)類(lèi)

package com.gmtgo.demo.simple;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 大帥
 */
@RestController
@RequestMapping(value = "/rabbitMq")
public class SimpleRabbitMqController {

    @Autowired
    private SimpleProducer simpleProducer;

    @RequestMapping(value = "/simpleQueueTest")
    public String simpleQueueTest() {
        simpleProducer.sendMessage();
        return "success";
    }
}

2.5 測試啟動(dòng)項目訪(fǎng)問(wèn) simpleQueueTest

訪(fǎng)問(wèn)地址:

結果:

3.Work隊列

3.1 編寫(xiě)工作配置

package com.gmtgo.demo.work;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 大帥
 */
@Configuration
public class WorkQueueConfig {

    /**
     * 隊列名.
     */
    private final String work = "work_queue";

    @Bean
    public Queue workQueue() {
        return new Queue(work);
    }
}

3.2 編寫(xiě)生產(chǎn)者

package com.gmtgo.demo.work;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class WorkProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        for (int i = 0; i < 10; i++) {
            String message = "工作消息" + i;
            log.info("我是生產(chǎn)信息:{}", message);
            rabbitTemplate.convertAndSend("work_queue", message);
        }
    }
}

3.3 編寫(xiě)消費者1

package com.gmtgo.demo.work;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class WorkConsumers1 {

    @RabbitListener(queues = "work_queue")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息1:{}", new String(message.getBody()));
    }
}

3.4 編寫(xiě)消費者2

package com.gmtgo.demo.work;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class WorkConsumers2 {

    @RabbitListener(queues = "work_queue")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息2:{}", new String(message.getBody()));
    }
}

3.5 編寫(xiě)測試方法

package com.gmtgo.demo.work;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 大帥
 */
@RestController
@RequestMapping(value = "rabbitMq")
public class WorkRabbitMqController {
    @Autowired
    private WorkProducer workProducer;

    @RequestMapping(value = "workQueueTest")
    public String workQueueTest() {
        workProducer.sendMessage();
        return "success";
    }
}

3.6 測試啟動(dòng)項目訪(fǎng)問(wèn) workQueueTest

訪(fǎng)問(wèn)地址

結果:

控制臺打印,發(fā)現10條消息 偶數條消費者1獲取,奇數條消費者2獲取,并且平均分配。
當然通過(guò)代碼實(shí)現按需分配,即誰(shuí)的性能強,誰(shuí)優(yōu)先原則,實(shí)現負載均衡。
配置可控分配數

4. 發(fā)布訂閱模式(Publish/Subscibe模式)

訂閱模式–多個(gè)消費者監聽(tīng)不同的隊列,但隊列都綁定同一個(gè)交換機

4.1 編寫(xiě)訂閱配置類(lèi)

package com.gmtgo.demo.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 大帥
 */
@Configuration
public class FanoutQueueConfig {

    /**
     * 聲明隊列名.
     */
    private final String fanout1 = "fanout_queue_1";

    private final String fanout2 = "fanout_queue_2";

    /**
     * 聲明交換機的名字.
     */
    private final String fanoutExchange = "fanoutExchange";

    /**
     * 聲明隊列.
     *
     * @return
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(fanout1);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue(fanout2);
    }

    /**
     * 聲明交換機.
     */
    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange(fanoutExchange);
    }

    /**
     * 隊列綁定交換機,也可在可視化工具中進(jìn)行綁定.
     *
     * @return
     */
    @Bean
    public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange exchange) {
        return BindingBuilder.bind(fanoutQueue1).to(exchange);
    }

    @Bean
    public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange exchange) {
        return BindingBuilder.bind(fanoutQueue2).to(exchange);
    }
}

4.2 編寫(xiě)訂閱生產(chǎn)者

package com.gmtgo.demo.fanout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class FanoutProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        for (int i = 0; i < 5; i++) {
            String message = "訂閱模式消息" + i;
            log.info("我是生產(chǎn)信息:{}", message);
            rabbitTemplate.convertAndSend("fanoutExchange", "", message);
        }
    }
}

4.3 編寫(xiě)訂閱消費者1

package com.gmtgo.demo.fanout;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


/**
 * @author 大帥
 */
@Slf4j
@Component
public class FanoutConsumers1 {

    @RabbitListener(queues = "fanout_queue_1")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息1:{}", new String(message.getBody()));
    }
}

4.4 編寫(xiě)訂閱消費者2

package com.gmtgo.demo.fanout;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class FanoutConsumers2 {

    @RabbitListener(queues = "fanout_queue_2")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息2:{}", new String(message.getBody()));
    }
}

4.5 編寫(xiě)測試方法

package com.gmtgo.demo.fanout;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 大帥
 */
@RestController
@RequestMapping(value = "rabbitMq")
public class FanoutRabbitMqController {
    @Autowired
    private FanoutProducer fanoutProducer;

    @RequestMapping(value = "fanoutQueueTest")
    public String fanoutQueueTest() {
        fanoutProducer.sendMessage();
        return "success";
    }
}

3.6 測試啟動(dòng)項目訪(fǎng)問(wèn) fanoutQueueTest

  • 訪(fǎng)問(wèn)地址
  • 結果:

控制臺打印 ,發(fā)現兩個(gè)綁定了不同隊列的消費者都接受到了同一條消息查看RabbitMq 服務(wù)器


5. 路由模式(Route模式 Direct定向)

5.1 編寫(xiě)路由配置類(lèi)

package com.gmtgo.demo.direct;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 大帥
 */
@Configuration
public class DirectQueueConfig {

    /**
     * 聲明隊列名.
     */
    private final String direct1 = "direct_queue_1";

    private final String direct2 = "direct_queue_2";

    /**
     * 聲明交換機的名字.
     */
    private final String directExchange = "directExchange";

    /**
     * 聲明隊列.
     *
     * @return
     */
    @Bean
    public Queue directQueue1() {
        return new Queue(direct1);
    }

    @Bean
    public Queue directQueue2() {
        return new Queue(direct2);
    }

    /**
     * 聲明路由交換機.
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(directExchange);
    }

    /**
     * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定.
     *
     * @return
     */
    @Bean
    Binding bindingDirectExchange1(Queue directQueue1, DirectExchange exchange) {
        return BindingBuilder.bind(directQueue1).to(exchange).with("update");
    }

    /**
     * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定.
     *
     * @return
     */
    @Bean
    Binding bindingDirectExchange2(Queue directQueue2, DirectExchange exchange) {
        return BindingBuilder.bind(directQueue2).to(exchange).with("add");
    }

}

5.2 編寫(xiě)生產(chǎn)者

package com.gmtgo.demo.direct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class DirectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessageA() {
        for (int i = 0; i < 5; i++) {
            String message = "路由模式--routingKey=update消息" + i;
            log.info("我是生產(chǎn)信息:{}", message);
            rabbitTemplate.convertAndSend("directExchange", "update", message);
        }
    }

    public void sendMessageB() {
        for (int i = 0; i < 5; i++) {
            String message = "路由模式--routingKey=add消息" + i;
            log.info("我是生產(chǎn)信息:{}", message);
            rabbitTemplate.convertAndSend("directExchange", "add", message);
        }
    }
}

5.3 編寫(xiě)消費者1

package com.gmtgo.demo.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class DirectConsumers1 {

    @RabbitListener(queues = "direct_queue_1")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息1:{}", new String(message.getBody()));
    }
}

5.4 編寫(xiě)消費者2

package com.gmtgo.demo.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 大帥
 */
@Slf4j
@Component
public class DirectConsumers2 {

    @RabbitListener(queues = "direct_queue_2")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息2:{}", new String(message.getBody()));
    }
}

5.5 編寫(xiě)訪(fǎng)問(wèn)類(lèi)

package com.gmtgo.demo.direct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 大帥
 */
@RestController
@RequestMapping(value = "rabbitMq")
public class DirectRabbitMqController {
    @Autowired
    private DirectProducer directProducer;

    @RequestMapping(value = "directQueueTest1")
    public String directQueueTest1() {
        directProducer.sendMessageA();
        return "success";
    }

    @RequestMapping(value = "directQueueTest2")
    public String directQueueTest2() {
        directProducer.sendMessageB();
        return "success";
    }
}

5.6 測試啟動(dòng)項目訪(fǎng)問(wèn)directQueueTest1 , directQueueTest2

訪(fǎng)問(wèn)地址

訪(fǎng)問(wèn)地址

結果:directQueueTest1:

directQueueTest2:

6. 主題模式(Topics模式 Tpoic通配符)

6.1 編寫(xiě)路由配置類(lèi)

package com.gmtgo.demo.topic;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 大帥
 */
@Configuration
public class TopicQueueConfig {
    /**
     * 聲明隊列名.
     */
    private final String topic1 = "topic_queue_1";

    private final String topic2 = "topic_queue_2";

    /**
     * 聲明交換機的名字.
     */
    private final String topicExchange = "topicExchange";

    /**
     * 聲明隊列.
     *
     * @return
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue(topic1);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(topic2);
    }

    /**
     * 聲明路由交換機.
     *
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(topicExchange);
    }

    /**
     * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定.
     *
     * @return
     */
    @Bean
    Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA");
    }

    /**
     * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定.
     * 綁定的routing key 也可以使用通配符:
     * *:匹配不多不少一個(gè)詞
     * #:匹配一個(gè)或多個(gè)詞
     *
     * @return
     */
    @Bean
    Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#");
    }
}

6.2 編寫(xiě)生產(chǎn)者

package com.gmtgo.demo.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author 大帥
 */
@Slf4j
@Component
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessageA() {
        for (int i = 0; i < 5; i++) {
            String message = "通配符模式--routingKey=topic.keyA消息" + i;
            log.info("我是生產(chǎn)信息:{}", message);
            rabbitTemplate.convertAndSend("topicExchange", "topic.keyA", message);
        }
    }

    public void sendMessageB() {
        for (int i = 0; i < 5; i++) {
            String message = "通配符模式--routingKey=topic.#消息" + i;
            log.info("我是生產(chǎn)信息:{}", message);
            rabbitTemplate.convertAndSend("topicExchange", "topic.keyD.keyE", message);
        }
    }
}

6.3 編寫(xiě)消費者1

package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
/**
 * @author 大帥
 */
@Slf4j
@Component
public class TopicConsumers1 {

    @RabbitListener(queues = "topic_queue_1")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息1:{}",new String(message.getBody()));
    }
}

6.4 編寫(xiě)消費者2

package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
/**
 * @author 大帥
 */
@Slf4j
@Component
public class TopicConsumers2 {

    @RabbitListener(queues = "topic_queue_2")
    public void readMessage(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("我是消費信息2:{}",new String(message.getBody()));
    }
}

6.5 編寫(xiě)訪(fǎng)問(wèn)類(lèi)

package com.gmtgo.demo.topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 大帥
 */
@RestController
@RequestMapping(value = "rabbitMq")
public class TopicRabbitMqController {
    @Autowired
    private TopicProducer topicProducer;

    @RequestMapping(value = "topicQueueTest1")
    public String topicQueueTest1() {
        topicProducer.sendMessageA();
        return "success";
    }

    @RequestMapping(value = "topicQueueTest2")
    public String topicQueueTest2() {
        topicProducer.sendMessageB();
        return "success";
    }
}

6.6 測試啟動(dòng)項目訪(fǎng)問(wèn)topicQueueTest1 , topicQueueTest2

  • 訪(fǎng)問(wèn)地址
  • 訪(fǎng)問(wèn)地址
  • 結果:

topicQueueTest1,兩個(gè)消費者都能消費

topicQueueTest2,只有消費者2 可以消費

至此,五種隊列的實(shí)現已結束!

7. 實(shí)現生產(chǎn)者消息確認

7.1 配置文件

######################### RabbitMQ配置 ########################
# RabbitMQ主機
spring.rabbitmq.host=127.0.0.1
# RabbitMQ虛擬主機
spring.rabbitmq.virtual-host=demo
# RabbitMQ服務(wù)端口
spring.rabbitmq.port=5672
# RabbitMQ服務(wù)用戶(hù)名
spring.rabbitmq.username=admin
# RabbitMQ服務(wù)密碼
spring.rabbitmq.password=admin
# RabbitMQ服務(wù)發(fā)布確認屬性配置
## NONE值是禁用發(fā)布確認模式,是默認值
## CORRELATED值是發(fā)布消息成功到交換器后會(huì )觸發(fā)回調方法
## SIMPLE值經(jīng)測試有兩種效果,其一效果和CORRELATED值一樣會(huì )觸發(fā)回調方法,其二在發(fā)布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點(diǎn)返回發(fā)送結果,根據返回結果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì )關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker;
spring.rabbitmq.publisher-confirm-type=simple
# 連接超時(shí)時(shí)間
spring.rabbitmq.connection-timeout=20000
# RabbitMQ服務(wù)開(kāi)啟消息發(fā)送確認
spring.rabbitmq.publisher-returns=true
######################### simple模式配置 ########################
# RabbitMQ服務(wù) 消息接收確認模式
## NONE:不確認
## AUTO:自動(dòng)確認
## MANUAL:手動(dòng)確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 指定最小的消費者數量
spring.rabbitmq.listener.simple.concurrency=1
# 指定最大的消費者數量
spring.rabbitmq.listener.simple.max-concurrency=1
# 每次只消費一個(gè)消息
spring.rabbitmq.listener.simple.prefetch=1
# 開(kāi)啟支持重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 啟用強制信息,默認為false
spring.rabbitmq.template.mandatory=true

7.2 編寫(xiě)消息發(fā)送確認類(lèi) RabbitConfirmCallback

package com.gmtgo.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
 * @author 大帥
 */
@Slf4j
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("=======ConfirmCallback=========");
        log.info("correlationData {} " , correlationData);
        log.info("ack = {}" , ack);
        log.info("cause = {}" , cause);
        log.info("=======ConfirmCallback=========");
    }
}

7.3 編寫(xiě)消息發(fā)送交換機返回機制RabbitConfirmReturnCallBack

package com.gmtgo.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * @author 大帥
 */
@Slf4j
public class RabbitConfirmReturnCallBack implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("--------------ReturnCallback----------------");
        log.info("message = " + message);
        log.info("replyCode = {}", replyCode);
        log.info("replyText = {}", replyText);
        log.info("exchange = {}", exchange);
        log.info("routingKey = {}", routingKey);
        log.info("--------------ReturnCallback----------------");
    }
}

7.4 RabbitMQ配置

在我們的rabbit隊列配置類(lèi)里設置RabbitTemplate
舉例:

package com.gmtgo.demo.topic;

import com.gmtgo.demo.config.RabbitConfirmCallback;
import com.gmtgo.demo.config.RabbitConfirmReturnCallBack;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @author 大帥
 */
@Configuration
public class TopicQueueConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate() {
        // 設置生產(chǎn)者消息確認
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
        rabbitTemplate.setReturnCallback(new RabbitConfirmReturnCallBack());
    }

    /**
     * 聲明隊列名.
     */
    private final String topic1 = "topic_queue_1";

    private final String topic2 = "topic_queue_2";

    /**
     * 聲明交換機的名字.
     */
    private final String topicExchange = "topicExchange";

    /**
     * 聲明隊列.
     *
     * @return
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue(topic1);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(topic2);
    }

    /**
     * 聲明路由交換機.
     *
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(topicExchange);
    }

    /**
     * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定.
     *
     * @return
     */
    @Bean
    Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA");
    }

    /**
     * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定.
     * 綁定的routing key 也可以使用通配符:
     * *:匹配不多不少一個(gè)詞
     * #:匹配一個(gè)或多個(gè)詞
     *
     * @return
     */
    @Bean
    Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#");
    }
}

啟動(dòng)項目發(fā)送消息,消息被正常消費,confim回調返回ack=true如果我們將exchange修改,發(fā)送到一個(gè)不存在的exchange中,會(huì )怎么樣呢?

會(huì )發(fā)現confirm回調為false,打印出結果為不存在topicExchange1111的交換機

如果我們在消費端處理邏輯時(shí)出錯會(huì )怎么樣呢?修改消費端代碼我們在消費時(shí)讓它報錯

confirm回調為true,但是在rabbitmq的web界面會(huì )發(fā)現存在5條沒(méi)有消費的消息

如果我們把

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

中最后一個(gè)參數改為false呢,會(huì )發(fā)現在web管理界面沒(méi)有未被消費的消息,說(shuō)明這條消息已經(jīng)被摒棄。

實(shí)際開(kāi)發(fā)中,到底是打回到隊列呢還是摒棄,要看自己的需求,但是打回隊列應該有次數限制,不然會(huì )陷入死循環(huán)。
繼續測試,將routingKey修改為一個(gè)沒(méi)有的key,

7.5 結論

  • 如果消息沒(méi)有到exchange,則confirm回調,ack=false
  • 如果消息到達exchange,則confirm回調,ack=true
  • exchange到queue成功,則不回調return
  • exchange到queue失敗,則回調return

8. 項目示例代碼:

下載地址:

到此這篇關(guān)于rabbitmq五種模式詳解(含實(shí)現代碼)的文章就介紹到這了,更多相關(guān)rabbitmq五種模式內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自互聯(lián)網(wǎng)轉載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權請聯(lián)系QQ:712375056 進(jìn)行舉報,并提供相關(guān)證據,一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容。

久久久久国色AV免费看| 中文字幕一区二区三区日韩精品| 亚洲乱码无人区卡1卡2卡3| 亚洲综合AV一区二区三区不卡| 久久亚洲国产成人精品无码区| 日韩人妻无码一区二区三区综合部|