设为首页 收藏本站
查看: 888|回复: 0

[经验分享] RabbitMQ入门_11_DLX

[复制链接]

尚未签到

发表于 2017-12-9 14:14:04 | 显示全部楼层 |阅读模式
  参考资料:https://www.rabbitmq.com/dlx.html
  

  队列中的消息可能会成为死信消息(dead lettered)。让消息成为死信消息的事件有:


  • 消息被取消确认(nack 或 reject),且设置为不重入队列(requeue = false)
  • 消息TTL过期
  • 队列达到长度限制
  死信消息会被死信交换机(Dead Letter Exchange, DLX)重新发布。
  gordon.study.rabbitmq.dlx.TestDlx.java
public>  private static final String DLX_EXCHANGE_NAME = "exchangeDLX";
  private static final String DLX_QUEUE_NAME = "queueDLX";
  private static final String QUEUE_NAME = "queue";
  public static void main(String[] argv) throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel senderChannel = connection.createChannel();
  Channel consumerChannel = connection.createChannel();
  Map<String, Object> args = new HashMap<String, Object>();
  args.put(&quot;x-message-ttl&quot;, 3000); // 设置队列中消息存活时间为3秒
  args.put(&quot;x-max-length&quot;, 5); // 设置队列最大消息数量为5
  args.put(&quot;x-dead-letter-exchange&quot;, DLX_EXCHANGE_NAME); // 设置DLX
  senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
  senderChannel.queueDeclare(DLX_QUEUE_NAME, false, false, true, null);
  senderChannel.exchangeDeclare(DLX_EXCHANGE_NAME, &quot;direct&quot;, false, true, null);
  // 将死信队列绑定到死信交换机上,绑定键为 QUEUE_NAME。消息发送时使用的绑定键也会是 QUEUE_NAME
  senderChannel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, QUEUE_NAME);
  // 发布6个消息
  for (int i = 0; i < 6;) {
  String message = &quot;NO. &quot; + ++i;
  senderChannel.basicPublish(&quot;&quot;, QUEUE_NAME, null, message.getBytes(&quot;UTF-8&quot;));
  }
  // 监视死信队列
  Consumer dlxConsumer = new DefaultConsumer(consumerChannel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
  throws IOException {
  String message = new String(body, &quot;UTF-8&quot;);
  System.out.printf(&quot;consume: %s, envelop: %s, properties: %s\n&quot;, message, envelope, properties);
  }
  };
  consumerChannel.basicConsume(DLX_QUEUE_NAME, true, dlxConsumer);
  Thread.sleep(100);
  GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, false);
  consumerChannel.basicReject(resp.getEnvelope().getDeliveryTag(), false);
  }
  
}
  代码第19行通过 x-dead-letter-exchange 参数定义了当前队列指定的死信交换机是 DLX_EXCHANGE_NAME,当前队列中所有的死信消息都将交由 DLX_EXCHANGE_NAME 再一次分发到新的队列。
  代码第23行创建了死信交换机 DLX_EXCHANGE_NAME,可见,死信交换机就是普通的交换机。
  从以上两部分代码顺序可知,死信交换机可以在被队列引用后才创建。RabbitMQ 不会去验证死信交换机设置是否有效,当死信消息找不到指定的交换机时,死信消息会被RabbitMQ安静的丢弃,而不是抛出异常。
  既然死信交换机就是普通的交换机,那么它就需要根据消息的绑定键来分发消息。死信消息的绑定键遵守以下规则:当队列指定了死信路由键(x-dead-letter-routing-key)参数时,死信消息使用该参数指定的路由键作为自己的路由键;否则使用消息原来的路由键。所以,示例代码中,死信消息的路由键是代码第30行发布消息时指定的路由键 QUEUE_NAME。如果想修改死信消息路由键,可以在第19行下面增加
  

        args.put(&quot;x-dead-letter-routing-key&quot;, &quot;some-routing-key&quot;);  

  观察日志输出:
  

consume: NO. 1, envelop: Envelope(deliveryTag=1, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:48 CST 2017, count=1, reason=maxlen, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)  
consume: NO. 2, envelop: Envelope(deliveryTag=3, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:48 CST 2017, count=1, reason=rejected, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  
consume: NO. 3, envelop: Envelope(deliveryTag=4, redeliver=false, exchange=exchangeDLX, routingKey=queue), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{queue=queue, time=Sat Jun 10 11:51:51 CST 2017, count=1, reason=expired, routing-keys=[queue], exchange=}]}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
  
......
  

  死信消息相比原来的消息发生了一些改变。除了上面提到的 exchange 变为死信交换机名称,routingKey 可能变为新的路由键(由 x-dead-letter-routing-key 参数决定),死信处理过程还在死信消息头中增加了 x-death 数组信息。每一次死信事件对应一个数组项,包含以下字段,


  • queue:本次死信事件发生前,消息所属队列
  • reason:死信原因,分为 rejected、expired 与 maxlen
  • time:死信事件发生时间
  • exchange:死信事件前,消息发布时指定的交换机
  • routing-keys:死信事件前,消息发布时指定的路由键
  • count:当前 queue 与 当前 reason 表述的死信事件发生的次数
  • original-expiration:对于消息 TTL,因为超时导致死信时,会移除 TTL(否则永远触发超时),该字段记录原来设定的消息 TTL 值
  当死信消息再次触发死信事件时,一般会产生一个新的数组项,插到数组的最前头。但是,如果 x-death 数组已经包含一个相同 queue 与 reason 的数组项,则直接将该数组项移到数组最前头,并将其 count 值加一。
  

  未确认问题:
  
1. 当消息被 reject 回队列头,同时又超过队列长度限制时,怎么处理?
  
试验结果好像是直接变为 maxlen reason 的死信消息
  2. DLX很可能形成环(最简单的场景就是DLX与原交换机相同),这时消息有可能无限触发死信事件吗(例如超过队列长度限制)?
  
官方说法是处在DLX环中的消息,如果经历了整个环都没有触发过 rejected reason 的死信事件,则抛弃该消息。
  这些问题有点偏,目前就不花时间研究了。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422423-1-1.html 上篇帖子: 关于高并发下多线程数据处理 下篇帖子: SpringBoot ( 八 ) :RabbitMQ 详解
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表