- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) >
- rabbitmq五種模式詳解(含實(shí)現代碼)
當生產(chǎn)端發(fā)送消息到交換機,交換機根據消息屬性發(fā)送到隊列,消費者監聽(tīng)綁定隊列實(shí)現消息的接收和消費邏輯編寫(xiě).簡(jiǎn)單模式下,強調的一個(gè)隊列queue只被一個(gè)消費者監聽(tīng)消費.
1.1 結構
生產(chǎn)者:生成消息,發(fā)送到交換機交換機:根據消息屬性,將消息發(fā)送給隊列消費者:監聽(tīng)這個(gè)隊列,發(fā)現消息后,獲取消息執行消費邏輯
1.2應用場(chǎng)景
常見(jiàn)的應用場(chǎng)景就是一發(fā),一接的結構
例如:
手機短信郵件單發(fā)
強調的也是后端隊列與消費者綁定的結構
2.1結構
生產(chǎn)者:發(fā)送消息到交換機交換機:根據消息屬性將消息發(fā)送給隊列消費者:多個(gè)消費者,同時(shí)綁定監聽(tīng)一個(gè)隊列,之間形成了爭搶消息的效果
2.2應用場(chǎng)景
從路由模式開(kāi)始,關(guān)心的就是消息如何到達的隊列,路由模式需要使用的交換機類(lèi)型就是路由交換機(direct)
3.1 結構
3.2應用場(chǎng)景
手機號/郵箱地址,都可以是路由key
不計算路由的一種特殊交換機
4.1結構
4.2應用場(chǎng)景
路由key值是一種多級路徑。中國.四川.成都.武侯區
5.1結構
生產(chǎn)端:攜帶路由key,發(fā)送消息到交換機
隊列:綁定交換機和路由不一樣,不是一個(gè)具體的路由key,而可以使用*和#代替一個(gè)范圍
| * | 字符串,只能表示一級 |
| --- | --- |
| # | 多級字符串 |
交換機:根據匹配規則,將路由key對應發(fā)送到隊列
消息路由key:
5.2 應用場(chǎng)景
做物流分揀的多級傳遞.
1.1 工程基本信息
1.2 依賴(lài)信息
1.3 配置文件applicasion.properties
# 應用名稱(chēng) spring.application.name=springboot-demo # Actuator Web 訪(fǎng)問(wèn)端口 management.server.port=8801 management.endpoints.jmx.exposure.include=* management.endpoints.web.exposure.include=* management.endpoint.health.show-details=always # 應用服務(wù) WEB 訪(fǎng)問(wèn)端口 server.port=8801 ######################### RabbitMQ配置 ######################## # RabbitMQ主機 spring.rabbitmq.host=127.0.0.1 # RabbitMQ虛擬主機 spring.rabbitmq.virtual-host=demo # RabbitMQ服務(wù)端口 spring.rabbitmq.port=5672 # RabbitMQ服務(wù)用戶(hù)名 spring.rabbitmq.username=admin # RabbitMQ服務(wù)密碼 spring.rabbitmq.password=admin # RabbitMQ服務(wù)發(fā)布確認屬性配置 ## NONE值是禁用發(fā)布確認模式,是默認值 ## CORRELATED值是發(fā)布消息成功到交換器后會(huì )觸發(fā)回調方法 ## SIMPLE值經(jīng)測試有兩種效果,其一效果和CORRELATED值一樣會(huì )觸發(fā)回調方法,其二在發(fā)布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點(diǎn)返回發(fā)送結果,根據返回結果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì )關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker; spring.rabbitmq.publisher-confirm-type=simple # RabbitMQ服務(wù)開(kāi)啟消息發(fā)送確認 spring.rabbitmq.publisher-returns=true ######################### simple模式配置 ######################## # RabbitMQ服務(wù) 消息接收確認模式 ## NONE:不確認 ## AUTO:自動(dòng)確認 ## MANUAL:手動(dòng)確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 指定最小的消費者數量 spring.rabbitmq.listener.simple.concurrency=1 # 指定最大的消費者數量 spring.rabbitmq.listener.simple.max-concurrency=1 # 開(kāi)啟支持重試 spring.rabbitmq.listener.simple.retry.enabled=true
2.1 創(chuàng )建SimpleQueueConfig 簡(jiǎn)單隊列配置類(lèi)
package com.gmtgo.demo.simple; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 大帥 */ @Configuration public class SimpleQueueConfig { /** * 定義簡(jiǎn)單隊列名. */ private final String simpleQueue = "queue_simple"; @Bean public Queue simpleQueue() { return new Queue(simpleQueue); } }
2.2 編寫(xiě)生產(chǎn)者
package com.gmtgo.demo.simple; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author 大帥 */ @Slf4j @Component public class SimpleProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage() { for (int i = 0; i < 5; i++) { String message = "簡(jiǎn)單消息" + i; log.info("我是生產(chǎn)信息:{}", message); rabbitTemplate.convertAndSend( "queue_simple", message); } } }
2.3 編寫(xiě)消費者
package com.gmtgo.demo.simple; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class SimpleConsumers { @RabbitListener(queues = "queue_simple") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息:{}", new String(message.getBody())); } }
2.4 編寫(xiě)訪(fǎng)問(wèn)類(lèi)
package com.gmtgo.demo.simple; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 大帥 */ @RestController @RequestMapping(value = "/rabbitMq") public class SimpleRabbitMqController { @Autowired private SimpleProducer simpleProducer; @RequestMapping(value = "/simpleQueueTest") public String simpleQueueTest() { simpleProducer.sendMessage(); return "success"; } }
2.5 測試啟動(dòng)項目訪(fǎng)問(wèn) simpleQueueTest
訪(fǎng)問(wèn)地址:
結果:
3.1 編寫(xiě)工作配置
package com.gmtgo.demo.work; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 大帥 */ @Configuration public class WorkQueueConfig { /** * 隊列名. */ private final String work = "work_queue"; @Bean public Queue workQueue() { return new Queue(work); } }
3.2 編寫(xiě)生產(chǎn)者
package com.gmtgo.demo.work; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author 大帥 */ @Slf4j @Component public class WorkProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage() { for (int i = 0; i < 10; i++) { String message = "工作消息" + i; log.info("我是生產(chǎn)信息:{}", message); rabbitTemplate.convertAndSend("work_queue", message); } } }
3.3 編寫(xiě)消費者1
package com.gmtgo.demo.work; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class WorkConsumers1 { @RabbitListener(queues = "work_queue") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息1:{}", new String(message.getBody())); } }
3.4 編寫(xiě)消費者2
package com.gmtgo.demo.work; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class WorkConsumers2 { @RabbitListener(queues = "work_queue") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息2:{}", new String(message.getBody())); } }
3.5 編寫(xiě)測試方法
package com.gmtgo.demo.work; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 大帥 */ @RestController @RequestMapping(value = "rabbitMq") public class WorkRabbitMqController { @Autowired private WorkProducer workProducer; @RequestMapping(value = "workQueueTest") public String workQueueTest() { workProducer.sendMessage(); return "success"; } }
3.6 測試啟動(dòng)項目訪(fǎng)問(wèn) workQueueTest
訪(fǎng)問(wèn)地址
結果:
控制臺打印,發(fā)現10條消息 偶數條消費者1獲取,奇數條消費者2獲取,并且平均分配。
當然通過(guò)代碼實(shí)現按需分配,即誰(shuí)的性能強,誰(shuí)優(yōu)先原則,實(shí)現負載均衡。
配置可控分配數
訂閱模式–多個(gè)消費者監聽(tīng)不同的隊列,但隊列都綁定同一個(gè)交換機
4.1 編寫(xiě)訂閱配置類(lèi)
package com.gmtgo.demo.fanout; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 大帥 */ @Configuration public class FanoutQueueConfig { /** * 聲明隊列名. */ private final String fanout1 = "fanout_queue_1"; private final String fanout2 = "fanout_queue_2"; /** * 聲明交換機的名字. */ private final String fanoutExchange = "fanoutExchange"; /** * 聲明隊列. * * @return */ @Bean public Queue fanoutQueue1() { return new Queue(fanout1); } @Bean public Queue fanoutQueue2() { return new Queue(fanout2); } /** * 聲明交換機. */ @Bean public FanoutExchange exchange() { return new FanoutExchange(fanoutExchange); } /** * 隊列綁定交換機,也可在可視化工具中進(jìn)行綁定. * * @return */ @Bean public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange exchange) { return BindingBuilder.bind(fanoutQueue1).to(exchange); } @Bean public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange exchange) { return BindingBuilder.bind(fanoutQueue2).to(exchange); } }
4.2 編寫(xiě)訂閱生產(chǎn)者
package com.gmtgo.demo.fanout; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author 大帥 */ @Slf4j @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage() { for (int i = 0; i < 5; i++) { String message = "訂閱模式消息" + i; log.info("我是生產(chǎn)信息:{}", message); rabbitTemplate.convertAndSend("fanoutExchange", "", message); } } }
4.3 編寫(xiě)訂閱消費者1
package com.gmtgo.demo.fanout; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class FanoutConsumers1 { @RabbitListener(queues = "fanout_queue_1") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息1:{}", new String(message.getBody())); } }
4.4 編寫(xiě)訂閱消費者2
package com.gmtgo.demo.fanout; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class FanoutConsumers2 { @RabbitListener(queues = "fanout_queue_2") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息2:{}", new String(message.getBody())); } }
4.5 編寫(xiě)測試方法
package com.gmtgo.demo.fanout; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 大帥 */ @RestController @RequestMapping(value = "rabbitMq") public class FanoutRabbitMqController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping(value = "fanoutQueueTest") public String fanoutQueueTest() { fanoutProducer.sendMessage(); return "success"; } }
3.6 測試啟動(dòng)項目訪(fǎng)問(wèn) fanoutQueueTest
控制臺打印 ,發(fā)現兩個(gè)綁定了不同隊列的消費者都接受到了同一條消息查看RabbitMq 服務(wù)器:
5.1 編寫(xiě)路由配置類(lèi)
package com.gmtgo.demo.direct; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 大帥 */ @Configuration public class DirectQueueConfig { /** * 聲明隊列名. */ private final String direct1 = "direct_queue_1"; private final String direct2 = "direct_queue_2"; /** * 聲明交換機的名字. */ private final String directExchange = "directExchange"; /** * 聲明隊列. * * @return */ @Bean public Queue directQueue1() { return new Queue(direct1); } @Bean public Queue directQueue2() { return new Queue(direct2); } /** * 聲明路由交換機. * * @return */ @Bean public DirectExchange directExchange() { return new DirectExchange(directExchange); } /** * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定. * * @return */ @Bean Binding bindingDirectExchange1(Queue directQueue1, DirectExchange exchange) { return BindingBuilder.bind(directQueue1).to(exchange).with("update"); } /** * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定. * * @return */ @Bean Binding bindingDirectExchange2(Queue directQueue2, DirectExchange exchange) { return BindingBuilder.bind(directQueue2).to(exchange).with("add"); } }
5.2 編寫(xiě)生產(chǎn)者
package com.gmtgo.demo.direct; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author 大帥 */ @Slf4j @Component public class DirectProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessageA() { for (int i = 0; i < 5; i++) { String message = "路由模式--routingKey=update消息" + i; log.info("我是生產(chǎn)信息:{}", message); rabbitTemplate.convertAndSend("directExchange", "update", message); } } public void sendMessageB() { for (int i = 0; i < 5; i++) { String message = "路由模式--routingKey=add消息" + i; log.info("我是生產(chǎn)信息:{}", message); rabbitTemplate.convertAndSend("directExchange", "add", message); } } }
5.3 編寫(xiě)消費者1
package com.gmtgo.demo.direct; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class DirectConsumers1 { @RabbitListener(queues = "direct_queue_1") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息1:{}", new String(message.getBody())); } }
5.4 編寫(xiě)消費者2
package com.gmtgo.demo.direct; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class DirectConsumers2 { @RabbitListener(queues = "direct_queue_2") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息2:{}", new String(message.getBody())); } }
5.5 編寫(xiě)訪(fǎng)問(wèn)類(lèi)
package com.gmtgo.demo.direct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 大帥 */ @RestController @RequestMapping(value = "rabbitMq") public class DirectRabbitMqController { @Autowired private DirectProducer directProducer; @RequestMapping(value = "directQueueTest1") public String directQueueTest1() { directProducer.sendMessageA(); return "success"; } @RequestMapping(value = "directQueueTest2") public String directQueueTest2() { directProducer.sendMessageB(); return "success"; } }
5.6 測試啟動(dòng)項目訪(fǎng)問(wèn)directQueueTest1 , directQueueTest2
訪(fǎng)問(wèn)地址
訪(fǎng)問(wèn)地址
結果:directQueueTest1:
directQueueTest2:
6.1 編寫(xiě)路由配置類(lèi)
package com.gmtgo.demo.topic; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 大帥 */ @Configuration public class TopicQueueConfig { /** * 聲明隊列名. */ private final String topic1 = "topic_queue_1"; private final String topic2 = "topic_queue_2"; /** * 聲明交換機的名字. */ private final String topicExchange = "topicExchange"; /** * 聲明隊列. * * @return */ @Bean public Queue topicQueue1() { return new Queue(topic1); } @Bean public Queue topicQueue2() { return new Queue(topic2); } /** * 聲明路由交換機. * * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange(topicExchange); } /** * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定. * * @return */ @Bean Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) { return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA"); } /** * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定. * 綁定的routing key 也可以使用通配符: * *:匹配不多不少一個(gè)詞 * #:匹配一個(gè)或多個(gè)詞 * * @return */ @Bean Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) { return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#"); } }
6.2 編寫(xiě)生產(chǎn)者
package com.gmtgo.demo.topic; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author 大帥 */ @Slf4j @Component public class TopicProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessageA() { for (int i = 0; i < 5; i++) { String message = "通配符模式--routingKey=topic.keyA消息" + i; log.info("我是生產(chǎn)信息:{}", message); rabbitTemplate.convertAndSend("topicExchange", "topic.keyA", message); } } public void sendMessageB() { for (int i = 0; i < 5; i++) { String message = "通配符模式--routingKey=topic.#消息" + i; log.info("我是生產(chǎn)信息:{}", message); rabbitTemplate.convertAndSend("topicExchange", "topic.keyD.keyE", message); } } }
6.3 編寫(xiě)消費者1
package com.gmtgo.demo.topic; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class TopicConsumers1 { @RabbitListener(queues = "topic_queue_1") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息1:{}",new String(message.getBody())); } }
6.4 編寫(xiě)消費者2
package com.gmtgo.demo.topic; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帥 */ @Slf4j @Component public class TopicConsumers2 { @RabbitListener(queues = "topic_queue_2") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消費信息2:{}",new String(message.getBody())); } }
6.5 編寫(xiě)訪(fǎng)問(wèn)類(lèi)
package com.gmtgo.demo.topic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 大帥 */ @RestController @RequestMapping(value = "rabbitMq") public class TopicRabbitMqController { @Autowired private TopicProducer topicProducer; @RequestMapping(value = "topicQueueTest1") public String topicQueueTest1() { topicProducer.sendMessageA(); return "success"; } @RequestMapping(value = "topicQueueTest2") public String topicQueueTest2() { topicProducer.sendMessageB(); return "success"; } }
6.6 測試啟動(dòng)項目訪(fǎng)問(wèn)topicQueueTest1 , topicQueueTest2
topicQueueTest1,兩個(gè)消費者都能消費
topicQueueTest2,只有消費者2 可以消費
至此,五種隊列的實(shí)現已結束!
7.1 配置文件
######################### RabbitMQ配置 ######################## # RabbitMQ主機 spring.rabbitmq.host=127.0.0.1 # RabbitMQ虛擬主機 spring.rabbitmq.virtual-host=demo # RabbitMQ服務(wù)端口 spring.rabbitmq.port=5672 # RabbitMQ服務(wù)用戶(hù)名 spring.rabbitmq.username=admin # RabbitMQ服務(wù)密碼 spring.rabbitmq.password=admin # RabbitMQ服務(wù)發(fā)布確認屬性配置 ## NONE值是禁用發(fā)布確認模式,是默認值 ## CORRELATED值是發(fā)布消息成功到交換器后會(huì )觸發(fā)回調方法 ## SIMPLE值經(jīng)測試有兩種效果,其一效果和CORRELATED值一樣會(huì )觸發(fā)回調方法,其二在發(fā)布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點(diǎn)返回發(fā)送結果,根據返回結果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì )關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker; spring.rabbitmq.publisher-confirm-type=simple # 連接超時(shí)時(shí)間 spring.rabbitmq.connection-timeout=20000 # RabbitMQ服務(wù)開(kāi)啟消息發(fā)送確認 spring.rabbitmq.publisher-returns=true ######################### simple模式配置 ######################## # RabbitMQ服務(wù) 消息接收確認模式 ## NONE:不確認 ## AUTO:自動(dòng)確認 ## MANUAL:手動(dòng)確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 指定最小的消費者數量 spring.rabbitmq.listener.simple.concurrency=1 # 指定最大的消費者數量 spring.rabbitmq.listener.simple.max-concurrency=1 # 每次只消費一個(gè)消息 spring.rabbitmq.listener.simple.prefetch=1 # 開(kāi)啟支持重試 spring.rabbitmq.listener.simple.retry.enabled=true # 啟用強制信息,默認為false spring.rabbitmq.template.mandatory=true
7.2 編寫(xiě)消息發(fā)送確認類(lèi) RabbitConfirmCallback
package com.gmtgo.demo.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * @author 大帥 */ @Slf4j public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("=======ConfirmCallback========="); log.info("correlationData {} " , correlationData); log.info("ack = {}" , ack); log.info("cause = {}" , cause); log.info("=======ConfirmCallback========="); } }
7.3 編寫(xiě)消息發(fā)送交換機返回機制RabbitConfirmReturnCallBack
package com.gmtgo.demo.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * @author 大帥 */ @Slf4j public class RabbitConfirmReturnCallBack implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("--------------ReturnCallback----------------"); log.info("message = " + message); log.info("replyCode = {}", replyCode); log.info("replyText = {}", replyText); log.info("exchange = {}", exchange); log.info("routingKey = {}", routingKey); log.info("--------------ReturnCallback----------------"); } }
7.4 RabbitMQ配置
在我們的rabbit隊列配置類(lèi)里設置RabbitTemplate
舉例:
package com.gmtgo.demo.topic; import com.gmtgo.demo.config.RabbitConfirmCallback; import com.gmtgo.demo.config.RabbitConfirmReturnCallBack; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * @author 大帥 */ @Configuration public class TopicQueueConfig { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate() { // 設置生產(chǎn)者消息確認 rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback()); rabbitTemplate.setReturnCallback(new RabbitConfirmReturnCallBack()); } /** * 聲明隊列名. */ private final String topic1 = "topic_queue_1"; private final String topic2 = "topic_queue_2"; /** * 聲明交換機的名字. */ private final String topicExchange = "topicExchange"; /** * 聲明隊列. * * @return */ @Bean public Queue topicQueue1() { return new Queue(topic1); } @Bean public Queue topicQueue2() { return new Queue(topic2); } /** * 聲明路由交換機. * * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange(topicExchange); } /** * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定. * * @return */ @Bean Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) { return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA"); } /** * 隊列綁定交換機,指定routingKey,也可在可視化工具中進(jìn)行綁定. * 綁定的routing key 也可以使用通配符: * *:匹配不多不少一個(gè)詞 * #:匹配一個(gè)或多個(gè)詞 * * @return */ @Bean Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) { return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#"); } }
啟動(dòng)項目發(fā)送消息,消息被正常消費,confim回調返回ack=true如果我們將exchange修改,發(fā)送到一個(gè)不存在的exchange中,會(huì )怎么樣呢?
會(huì )發(fā)現confirm回調為false,打印出結果為不存在topicExchange1111的交換機
如果我們在消費端處理邏輯時(shí)出錯會(huì )怎么樣呢?修改消費端代碼我們在消費時(shí)讓它報錯
confirm回調為true,但是在rabbitmq的web界面會(huì )發(fā)現存在5條沒(méi)有消費的消息
如果我們把
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
中最后一個(gè)參數改為false呢,會(huì )發(fā)現在web管理界面沒(méi)有未被消費的消息,說(shuō)明這條消息已經(jīng)被摒棄。
實(shí)際開(kāi)發(fā)中,到底是打回到隊列呢還是摒棄,要看自己的需求,但是打回隊列應該有次數限制,不然會(huì )陷入死循環(huán)。
繼續測試,將routingKey修改為一個(gè)沒(méi)有的key,
7.5 結論
8. 項目示例代碼:
下載地址:
到此這篇關(guān)于rabbitmq五種模式詳解(含實(shí)現代碼)的文章就介紹到這了,更多相關(guān)rabbitmq五種模式內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自互聯(lián)網(wǎng)轉載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權請聯(lián)系QQ:712375056 進(jìn)行舉報,并提供相關(guān)證據,一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容。
Copyright ? 2009-2021 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 珠海市特網(wǎng)科技有限公司 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站