- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) > 編程語(yǔ)言 >
- SpringBoot整合RabbitMQ消息隊列的完整步驟
SpringBoot整合RabbitMQ
主要實(shí)現RabbitMQ以下三種消息隊列:
1. 引入pom依賴(lài)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 配置yml文件
基于上篇《RabbitMQ安裝與配置》實(shí)現的情況下,進(jìn)行基礎配置。
spring: rabbitmq: host: 121.5.168.31 port: 5672 # 默認可省略 virtual-host: /*** # 虛擬主機 username: *** # 用戶(hù)名 password: *** # 用戶(hù)密碼 # 開(kāi)啟投遞成功回調 P -> Exchange publisher-confirm-type: correlated # 開(kāi)啟投遞消息到隊列失敗回調 Exchange -> Queue publisher-returns: true # 開(kāi)啟手動(dòng)ACK確認模式 Queue -> C listener: simple: acknowledge-mode: manual # 代表手動(dòng)ACK確認 # 一些基本參數的設置 concurrency: 3 prefetch: 15 retry: enabled: true max-attempts: 5 max-concurrency: 10
3. 公共Constants類(lèi)
/** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/23 15:28 */ public class Constants { /** * 第一個(gè)配置Queue,Exchange,Key(非注解方式) */ public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE"; public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE"; public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY"; /** * 第二個(gè)配置Queue,Exchange,Key(注解方式) */ public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE"; public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE"; public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY"; //************************************延時(shí)消息隊列配置信息************************** /** * 延時(shí)隊列信息配置 */ public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE"; public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE"; public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY"; /** * 死信隊列 */ public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE"; public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE"; public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY"; //**************************************延時(shí)消息隊列配置信息(插件版)****************************** /** * 新延時(shí)隊列信息配置 */ public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE"; public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE"; public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY"; }
簡(jiǎn)單消息隊列(direct模式)
4. RabbitTemplate模板配置
主要定義消息投遞Exchange成功回調函數和消息從Exchange投遞到消息隊列失敗的回調函數。
package com.topsun.rabbit; import com.sun.org.apache.xpath.internal.operations.Bool; import com.topsun.constants.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/23 14:17 */ @Configuration public class RabbitConfig { private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class); @Autowired private CachingConnectionFactory connectionFactory; /** * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 觸發(fā)setReturnCallback回調必須設置mandatory=true,否則Exchange沒(méi)有找到Queue就會(huì )丟棄掉消息, 而不會(huì )觸發(fā)回調 rabbitTemplate.setMandatory(Boolean.TRUE); // 設置序列化機制 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // 消息由投遞到Exchange中時(shí)觸發(fā)的回調 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> logger.info("消息發(fā)送到Exchange情況反饋:唯一標識:correlationData={},消息確認:ack={},原因:cause={}", correlationData, ack, cause) ); // 消息由Exchange發(fā)送到Queue時(shí)失敗觸發(fā)的回調 rabbitTemplate.setReturnsCallback((returnedMessage) -> { // 如果是插件形式實(shí)現的延時(shí)隊列,則直接返回 // 原因: 因為發(fā)送方確實(shí)沒(méi)有投遞到隊列上,只是在交換器上暫存,等過(guò)期時(shí)間到了 才會(huì )發(fā)往隊列,從而實(shí)現延時(shí)隊列的操作 if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) { return; } logger.warn("消息由Exchange發(fā)送到Queue時(shí)失敗:message={},replyCode={},replyText={},exchange={},rountingKey={}", returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); }); return rabbitTemplate; } //*******************************************直接配置綁定關(guān)系***************************************** /** * 聲明隊列 * * @return */ @Bean public Queue horseQueue() { return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE); } /** * 聲明指定模式交換機 * * @return */ @Bean public DirectExchange horseExchange() { return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE); } /** * 綁定交換機,隊列,路由Key * * @return */ @Bean public Binding horseBinding() { return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY); } }
5. 定義消息監聽(tīng)器
基于 @RabbitListenerzi注解,實(shí)現自定義消息監聽(tīng)器。主要有兩種實(shí)現方式:
package com.topsun.rabbit; import com.rabbitmq.client.Channel; import com.topsun.constants.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/23 14:58 */ @Component public class MsgListener { private static Logger logger = LoggerFactory.getLogger(MsgListener.class); /** * 配置類(lèi)中已經(jīng)完成綁定,這里直接根據隊列值接收 * * @param message * @param channel * @param msg */ @RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE) public void customListener(Message message, Channel channel, String msg) { // 獲取每條消息唯一標識(用于手動(dòng)ACK確認) long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> customListener接收" + msg); // 手動(dòng)ACK確認 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失敗: {}", tag); } } /** * 根據注解的形式進(jìn)行綁定接收 * * @param message * @param channel * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"), exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"), key = {Constants.HORSE_ANNOTATION_KEY} )) public void annotationListener(Message message, Channel channel, String msg) { // 獲取每條消息唯一標識(用于手動(dòng)ACK確認) long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> annotationListener接收" + msg); // 手動(dòng)ACK確認 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失敗: {}", tag); } } }
6. 測試接口
這里發(fā)送100條消息:
@GetMapping("/rabbit") public void sendMsg() { for (int i = 1; i <= 100; i++) { String msg = "第" + i + "條消息"; logger.info("==> 發(fā)送" + msg); if (i % 2 == 1) { rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i))); } else { rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i))); } } }
結果:自行測試過(guò),非常成功:smile::smile::smile:
原理:生產(chǎn)者生產(chǎn)一條延時(shí)消息,根據需要延時(shí)時(shí)間的不同,利用不同的routingkey將消息路由到不同的延時(shí)隊列,每個(gè)隊列都設置了不同的TTL屬性,并綁定在同一個(gè)死信交換機中,消息過(guò)期后,根據routingkey的不同,又會(huì )被路由到不同的死信隊列中,消費者只需要監聽(tīng)對應的死信隊列進(jìn)行處理即可。
7. 配置綁定相關(guān)信息
/** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/24 14:22 */ @Configuration public class DelayRabbitConfig { private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class); /** * 聲明延時(shí)隊列交換機 * * @return */ @Bean public DirectExchange delayExchange() { return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE); } /** * 聲明死信隊列交換機 * * @return */ @Bean public DirectExchange deadExchange() { return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE); } /** * 聲明延時(shí)隊列 延時(shí)10s(單位:ms),并將延時(shí)隊列綁定到對應的死信交換機和路由Key * * @return */ @Bean public Queue delayQueue() { Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機 args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE); // x-dead-letter-routing-key 這里聲明當前隊列的死信路由key args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY); // x-message-ttl 聲明隊列的TTL(過(guò)期時(shí)間) // 可以在這里直接寫(xiě)死,也可以進(jìn)行動(dòng)態(tài)的設置(推薦動(dòng)態(tài)設置) // args.put("x-message-ttl", 10000); return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build(); } /** * 聲明死信隊列 * * @return */ @Bean public Queue deadQueue() { return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE); } /** * 延時(shí)隊列綁定管理 * * @return */ @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY); } /** * 死信隊列綁定管理 * * @return */ @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY); } //**********************************延時(shí)消息隊列配置信息(插件版)************************************ @Bean public Queue pluginQueue() { return new Queue(Constants.HORSE_PLUGIN_QUEUE); } /** * 設置延時(shí)隊列的交換機,必須是 CustomExchange 類(lèi)型交換機 * 參數必須,不能改變 * @return */ @Bean public CustomExchange customPluginExchange() { Map<String, Object> args = new HashMap<>(2); args.put("x-delayed-type", "direct"); return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args); } @Bean public Binding pluginBinding() { return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs(); } }
8. 定義延時(shí)監聽(tīng)器
/** * @author Mr.Horse * @version 1.0 * @description: {description} * @date 2021/4/24 14:51 */ @Component public class DelayMsgListener { private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class); /** * 監聽(tīng)死信隊列 * * @param message * @param channel * @param msg */ @RabbitListener(queues = Constants.HORSE_DEAD_QUEUE) public void consumeDeadListener(Message message, Channel channel, String msg) { long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> consumeDeadListener接收" + msg); // 手動(dòng)ACK確認 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失敗: {}", tag); } } /** * 監聽(tīng)延時(shí)隊列(插件版) * * @param message * @param channel * @param msg */ @RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE) public void consumePluginListener(Message message, Channel channel, String msg) { long tag = message.getMessageProperties().getDeliveryTag(); try { logger.info(" ==> consumePluginListener" + msg); // 手動(dòng)ACK確認 channel.basicAck(tag, false); } catch (IOException e) { logger.error(" ==> 消息接收失敗: {}", tag); } } }
9. 測試接口
// 基于特性的延時(shí)隊列 @GetMapping("/delay/rabbit") public void delayMsg(@RequestParam("expire") Long expire) { for (int i = 1; i <= 10; i++) { String msg = "第" + i + "條消息"; logger.info("==> 發(fā)送" + msg); // 這里可以動(dòng)態(tài)的設置過(guò)期時(shí)間 rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg, message -> { message.getMessageProperties().setExpiration(String.valueOf(expire)); return message; }, new CorrelationData(String.valueOf(i))); } } // 基于插件的延時(shí)隊列 @GetMapping("/delay/plugin") public void delayPluginMsg(@RequestParam("expire") Integer expire) { for (int i = 1; i <= 10; i++) { String msg = "第" + i + "條消息"; logger.info("==> 發(fā)送" + msg); // 動(dòng)態(tài)設置過(guò)期時(shí)間 rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(expire); return message; }, new CorrelationData(String.valueOf(i))); } }
結果:你懂的:scream_cat::scream_cat::scream_cat:
RabbitMQ的基礎使用演示到此結束。
到此這篇關(guān)于SpringBoot整合RabbitMQ消息隊列的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ消息隊列內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站