设为首页 收藏本站
查看: 963|回复: 0

[经验分享] (转) RabbitMQ学习之延时队列

[复制链接]

尚未签到

发表于 2017-12-9 07:34:36 | 显示全部楼层 |阅读模式
  http://blog.csdn.net/zhu_tianwei/article/details/53563311
  在实际的业务中我们会遇见生产者产生的消息,不立即消费,而是延时一段时间在消费。RabbitMQ本身没有直接支持延迟队列功能,但是我们可以根据其特性Per-Queue Message TTL和 Dead Letter Exchanges实现延时队列。也可以通过改特性设置消息的优先级。
  1.Per-Queue Message TTL
RabbitMQ可以针对消息和队列设置TTL(过期时间)。队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。
2.Dead Letter Exchanges
当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。消息变成Dead Letter一向有以下几种情况:
消息被拒绝(basic.reject or basic.nack)并且requeue=false
消息TTL过期
队列达到最大长度
实际上就是设置某个队列的属性,当这个队列中有Dead Letter时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。
  一、在队列上设置TTL
DSC0000.png

  1.建立delay.exchange
DSC0001.png

  这里Internal设置为NO,否则将无法接受dead letter,YES表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。
  2.建立延时队列(delay queue)
DSC0002.png

  如上配置延时5min队列(x-message-ttl=300000)
  x-max-length:最大积压的消息个数,可以根据自己的实际情况设置,超过限制消息不会丢失,会立即转向delay.exchange进行投递
  x-dead-letter-exchange:设置为刚刚配置好的delay.exchange,消息过期后会通过delay.exchange进行投递
  这里不需要配置"dead letter routing key"否则会覆盖掉消息发送时携带的routingkey,导致后面无法路由为刚才配置的delay.exchange
  3.配置延时路由规则
  需要延时的消息到exchange后先路由到指定的延时队列
  1)创建delaysync.exchange通过Routing key将消息路由到延时队列
  2.配置delay.exchange 将消息投递到正常的消费队列
  配置完成。
  下面使用代码测试一下:
  生产者:



[java] view plain copy
  print?

  • package cn.slimsmart.study.rabbitmq.delayqueue.queue;  

  • import java.io.IOException;  

  • import com.rabbitmq.client.Channel;  
  • import com.rabbitmq.client.Connection;  
  • import com.rabbitmq.client.ConnectionFactory;  

  • public class Producer {  

  •     private static String queue_name = "test.queue";  

  •     public static void main(String[] args) throws IOException {  
  •         ConnectionFactory factory = new ConnectionFactory();  
  •         factory.setHost("10.1.199.169");  
  •         factory.setUsername("admin");  
  •         factory.setPassword("123456");  
  •         Connection connection = factory.newConnection();
  •         Channel channel = connection.createChannel();
  •         // 声明队列  
  •         channel.queueDeclare(queue_name, true, false, false, null);  
  •         String message = "hello world!" + System.currentTimeMillis();  
  •         channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());  
  •         System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
  •         // 关闭频道和连接  
  •         channel.close();
  •         connection.close();
  •     }
  • }
  消费者:



[java] view plain copy
  print?

  • package cn.slimsmart.study.rabbitmq.delayqueue.queue;  

  • import com.rabbitmq.client.Channel;  
  • import com.rabbitmq.client.Connection;  
  • import com.rabbitmq.client.ConnectionFactory;  
  • import com.rabbitmq.client.QueueingConsumer;  

  • public class Consumer {  

  •     private static String queue_name = "test.queue";  

  •     public static void main(String[] args) throws Exception {  
  •         ConnectionFactory factory = new ConnectionFactory();  
  •         factory.setHost("10.1.199.169");  
  •         factory.setUsername("admin");  
  •         factory.setPassword("123456");  
  •         Connection connection = factory.newConnection();
  •         Channel channel = connection.createChannel();
  •         // 声明队列  
  •         channel.queueDeclare(queue_name, true, false, false, null);  
  •         QueueingConsumer consumer = new QueueingConsumer(channel);  
  •         // 指定消费队列  
  •         channel.basicConsume(queue_name, true, consumer);  
  •         while (true) {  
  •             // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)  
  •             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •             String message = new String(delivery.getBody());  
  •             System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
  •         }
  •     }

  • }
  二、在消息上设置TTL
DSC0003.png

  实现代码:
  生产者:



[java] view plain copy
  print?

  • package cn.slimsmart.study.rabbitmq.delayqueue.message;  

  • import java.io.IOException;  
  • import java.util.HashMap;  

  • import com.rabbitmq.client.AMQP;  
  • import com.rabbitmq.client.Channel;  
  • import com.rabbitmq.client.Connection;  
  • import com.rabbitmq.client.ConnectionFactory;  

  • public class Producer {  

  •     private static String queue_name = "message_ttl_queue";  

  •     public static void main(String[] args) throws IOException {  
  •         ConnectionFactory factory = new ConnectionFactory();  
  •         factory.setHost("10.1.199.169");  
  •         factory.setUsername("admin");  
  •         factory.setPassword("123456");  
  •         Connection connection = factory.newConnection();
  •         Channel channel = connection.createChannel();
  •         HashMap<String, Object> arguments = new HashMap<String, Object>();  
  •         arguments.put("x-dead-letter-exchange", "amq.direct");  
  •         arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
  •         channel.queueDeclare("delay_queue", true, false, false, arguments);  

  •         // 声明队列  
  •         channel.queueDeclare(queue_name, true, false, false, null);  
  •         // 绑定路由  
  •         channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  

  •         String message = "hello world!" + System.currentTimeMillis();  
  •         // 设置延时属性  
  •         AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
  •         // 持久性 non-persistent (1) or persistent (2)  
  •         AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();  
  •         // routingKey =delay_queue 进行转发  
  •         channel.basicPublish("", "delay_queue", properties, message.getBytes());  
  •         System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
  •         // 关闭频道和连接  
  •         channel.close();
  •         connection.close();
  •     }
  • }
  消费者:



[java] view plain copy
  print?

  • package cn.slimsmart.study.rabbitmq.delayqueue.message;  

  • import java.util.HashMap;  

  • import com.rabbitmq.client.Channel;  
  • import com.rabbitmq.client.Connection;  
  • import com.rabbitmq.client.ConnectionFactory;  
  • import com.rabbitmq.client.QueueingConsumer;  

  • public class Consumer {  

  •     private static String queue_name = "message_ttl_queue";  

  •     public static void main(String[] args) throws Exception {  
  •         ConnectionFactory factory = new ConnectionFactory();  
  •         factory.setHost("10.1.199.169");  
  •         factory.setUsername("admin");  
  •         factory.setPassword("123456");  
  •         Connection connection = factory.newConnection();
  •         Channel channel = connection.createChannel();
  •         HashMap<String, Object> arguments = new HashMap<String, Object>();  
  •         arguments.put("x-dead-letter-exchange", "amq.direct");  
  •         arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
  •         channel.queueDeclare("delay_queue", true, false, false, arguments);  

  •         // 声明队列  
  •         channel.queueDeclare(queue_name, true, false, false, null);  
  •         // 绑定路由  
  •         channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  

  •         QueueingConsumer consumer = new QueueingConsumer(channel);  
  •         // 指定消费队列  
  •         channel.basicConsume(queue_name, true, consumer);  
  •         while (true) {  
  •             // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)  
  •             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •             String message = new String(delivery.getBody());  
  •             System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
  •         }
  •     }

  • }
  查看资料:
  http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
http://www.rabbitmq.com/maxlength.html
https://www.cloudamqp.com/docs/delayed-messages.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422319-1-1.html 上篇帖子: RabbitMQ基础概念详解(转载) 下篇帖子: PHP使用RabbitMQ
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表