设为首页 收藏本站
查看: 1237|回复: 0

[经验分享] Spring RabbitMQ 延迟队列

[复制链接]

尚未签到

发表于 2017-7-4 20:27:40 | 显示全部楼层 |阅读模式
  一、说明
  在实际业务场景中可能会用到延时消息发送,例如异步回调失败时的重发机制。 RabbitMQ本身不具有延时消息队列的功能,但是可以通过rabbitmq-delayed-message-exchange来实现(也可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现,我们主要讲解通过延迟插件来实现的方法)。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。
  二、安装插件
  RabbitMQ的安装请参考我的文章“RabbitMQ安装与使用”,这里我们重点讲插件的安装。
  首先到http://www.rabbitmq.com/community-plugins.html网页下载适合的“rabbitmq_delayed_message_exchange插件”。下载完成后将它放到RabbitMQ插件安装目录({rabbitmq-server}/plugins/),然后执行命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange启用插件,执行命令rabbitmq-plugins disable rabbitmq_delayed_message_exchange也可以关闭插件。具体过程可以查看参考文档2。
  三、spring集成RabbitMQ
  1、maven配置


  • <dependency>  
  •     <groupId>org.springframework.amqp</groupId>  
  •     <artifactId>spring-amqp</artifactId>  
  •     <version>1.6.6.RELEASE</version>  
  •     <exclusions>  
  •         <exclusion>  
  •             <groupId>org.springframework</groupId>  
  •             <artifactId>spring-core</artifactId>  
  •             <version>4.1.6.RELEASE</version>  
  •         </exclusion>  
  •     </exclusions>  
  • </dependency>  
  • <dependency>  
  •     <groupId>org.springframework.amqp</groupId>  
  •     <artifactId>spring-rabbit</artifactId>  
  •     <version>1.6.6.RELEASE</version>  
  •     <exclusions>  
  •         <exclusion>  
  •             <groupId>org.springframework</groupId>  
  •             <artifactId>spring-core</artifactId>  
  •             <version>4.1.6.RELEASE</version>  
  •         </exclusion>  
  •         <exclusion>  
  •             <groupId>org.springframework</groupId>  
  •             <artifactId>spring-messaging</artifactId>  
  •             <version>4.1.6.RELEASE</version>  
  •         </exclusion>  
  •         <exclusion>  
  •             <groupId>org.springframework</groupId>  
  •             <artifactId>spring-tx</artifactId>  
  •             <version>4.1.6.RELEASE</version>  
  •         </exclusion>  
  •         <exclusion>  
  •             <groupId>org.springframework</groupId>  
  •             <artifactId>spring-context</artifactId>  
  •             <version>4.1.6.RELEASE</version>  
  •         </exclusion>  
  •     </exclusions>  
  • </dependency>  
  说明:实现延迟队列需要Spring在4.1以上,spring-amqp在1.6以上。
  2、xml配置





  • <?xml version="1.0" encoding="UTF-8"?>  
  • <beans xmlns="http://www.springframework.org/schema/beans"  
  •     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"  
  •     xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"  
  •     xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
  •     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
  •                             http://www.springframework.org/schema/context
  •                             http://www.springframework.org/schema/context/spring-context-3.1.xsd
  •                             http://www.springframework.org/schema/tx
  •                             http://www.springframework.org/schema/tx/spring-tx.xsd
  •                             http://www.springframework.org/schema/aop
  •                             http://www.springframework.org/schema/aop/spring-aop.xsd
  •                             http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
  •                             http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">  
  •     <context:property-placeholder location="classpath:rmq-config.properties" ignore-unresolvable="true"/>  

  •     <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
  •         <property name="host" value="${rabbitmq.host}" />  
  •         <property name="port" value="${rabbitmq.port}" />  
  •         <property name="username" value="${rabbitmq.username}" />  
  •         <property name="password" value="${rabbitmq.password}" />  
  •         <property name="channelCacheSize" value="${rabbitmq.channel.cacheSize}" />  
  •     </bean>  

  •     <bean id="orderConsumer" class="com.xxx.rmq.OrderConsumer"></bean>  
  •     <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />  
  •     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />  

  •     <rabbit:admin connection-factory="connectionFactory" />  

  •     <!-- 延迟消息start -->  
  •     <rabbit:topic-exchange name="delay_exchange" delayed="true">  
  •         <rabbit:bindings>  
  •             <rabbit:binding queue="delay_queue" pattern="order.delay.notify" />  
  •         </rabbit:bindings>  
  •     </rabbit:topic-exchange>  

  •     <rabbit:queue name="delay_queue" durable="true" auto-declare="true" auto-delete="false" />  

  •     <rabbit:template id="delayMsgTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" exchange="delay_exchange" />  

  •     <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" acknowledge="auto" message-converter="jsonMessageConverter">  
  •         <rabbit:listener queues="delay_queue" ref="orderConsumer" method="delayMsg" />  
  •     </rabbit:listener-container>  
  •     <!-- 延迟消息end -->  

  • </beans>  
  说明:spring-rabbit-1.6.xsd必须是1.6及以上版本,否则会报“元素 'rabbit:topic-exchange' 中不允许出现属性 'delayed'”错误。具体请查看参考文档3。
  四、延迟队列的使用
  1、发送消息Producer





  • import net.sf.json.JSONObject;  

  • import org.apache.commons.lang.StringUtils;  
  • import org.springframework.amqp.AmqpException;  
  • import org.springframework.amqp.core.AmqpTemplate;  
  • import org.springframework.amqp.core.Message;  
  • import org.springframework.amqp.core.MessagePostProcessor;  
  • import org.springframework.beans.factory.annotation.Autowired;  
  • import org.springframework.stereotype.Service;  
  • /**  
  • *  
  • * @author Horace  
  • * @version 创建时间:2016年10月26日 下午6:34:31  
  • */  
  • @Service  
  • public class MessageProducerServiceImpl implements MessageProducerService{  
  •     @Autowired  
  •     private AmqpTemplate delayMsgTemplate;  
  •     @Override  
  •     public void delayMsg(JSONObject msg,int delay) {  
  •         // TODO Auto-generated method stub   
  •         final int xdelay= delay*1000;   
  •         delayMsgTemplate.convertAndSend("order.delay.notify", (Object) msg,  
  •                 new MessagePostProcessor() {  

  •                     @Override  
  •                     public Message postProcessMessage(Message message)  
  •                             throws AmqpException {  
  •                         // TODO Auto-generated method stub  
  •                         message.getMessageProperties().setDelay(xdelay);
  •                         return message;  
  •                     }
  •                 });
  •     }
  • }
  
2、异步接收消息Consumer





  • import net.sf.json.JSONObject;  
  • import org.apache.commons.lang.StringUtils;  
  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  
  • import org.springframework.beans.factory.annotation.Autowired;  

  • /**  
  • *
  • * @author Horace  
  • * @version 创建时间:2016年10月26日 下午2:48:14  
  • */  
  • public class OrderConsumer {  

  •     private static Logger logger = LoggerFactory.getLogger(OrderConsumer.class);  

  •     @Autowired  
  •     private MessageProducerService messageProducerService;  


  •     public void delayMsg(Object obj) {  
  •         logger.info("[延时消息]" + obj);  
  •         if (obj != null) {  
  •             JSONObject notifyJson = JSONObject.fromObject(obj);
  •             String notifyUrl = notifyJson.getString("notifyUrl");  
  •             String notifyContent = notifyJson.getString("notifyContent");  
  •             String result = HttpUtil.postMessage(notifyUrl, notifyContent);
  •             if (StringUtils.isBlank(result)) { // 通知失败 进入重发机制  
  •                 int newNotifyCount = notifyJson.getInt("notifyCount") + 1; //已经通知的次数  
  •                 if (newNotifyCount < 5) {  
  •                     notifyJson.put("notifyCount", newNotifyCount);  
  •                     int spacingInterval = getSpacingInterval(newNotifyCount);  
  •                     messageProducerService
  •                             .delayMsg(notifyJson, spacingInterval);
  •                 } else {  
  •                     logger.info("通知5次都失败,等待后台手工处理!");  
  •                 }
  •             }
  •         }
  •     }

  •     /**
  •      * 重复通知间隔时间(单位为秒)
  •      * @param notifyCount 已经通知的次数
  •      * @return
  •      */  
  •     private int getSpacingInterval(int notifyCount) {  
  •         // TODO Auto-generated method stub  
  •         int spacingInterval = 0;  
  •         switch (notifyCount) {  
  •         case 1:  
  •             spacingInterval = 10;  
  •             break;  
  •         case 2:  
  •             spacingInterval = 20;  
  •             break;  
  •         case 3:  
  •             spacingInterval = 30;  
  •             break;  
  •         case 4:  
  •             spacingInterval = 60;  
  •             break;  
  •         case 5:  
  •             spacingInterval = 90;  
  •             break;  
  •         default:  
  •             break;  
  •         }
  •         return spacingInterval;  
  •     }

  • }


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390776-1-1.html 上篇帖子: RabbitMQ消息队列(二):”Hello, World“ 下篇帖子: hdu4941 Magical Forest
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表