设为首页 收藏本站
查看: 1071|回复: 0

[经验分享] RabbitMQ如何保证发送端消息的可靠投递

[复制链接]

尚未签到

发表于 2017-12-9 19:31:22 | 显示全部楼层 |阅读模式
  消息发布者向RabbitMQ进行消息投递时默认情况下是不返回发布者该条消息在broker中的状态的,也就是说发布者不知道这条消息是否真的抵达RabbitMQ的broker之上,也因此会发生消息丢失的情况。
  对此,RabbitmQ提供了两种解决方案(以官方提供的SDK为例)
  1.通过AMOP提供的事务机制:
  C#代码:



                try
{
channel.TxSelect();
channel.BasicPublish("yu.exchange", "yu.1", props, msg);
channel.TxCommit();
}
catch (Exception ex)
{
channel.TxRollback();
}
  java代码是一样的操作。。。



        byte[] msg = "hello".getBytes();
try {
channel.txSelect();
channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
channel.txCommit();
} catch (Exception ex) {
channel.txRollback();
}
  事务开启,提交,回滚都有了。。。
  2.Conform模式,也就是官网推荐的消息批量提交的方式
  Conform模式主要包含两种编程模式,一种同步的,一种异步通知的:
  同步回调的调用方式与事务模式差不多
  C#代码:



                channel.ConfirmSelect();
byte[] msg = Encoding.UTF8.GetBytes("hello");
channel.BasicPublish("yu.exchange1", "yu.1", props, msg);
bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 10));
Console.WriteLine(success);
  Java代码:



        byte[] msg = "hello".getBytes();
channel.confirmSelect();
channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
boolean success = channel.waitForConfirms(10);
  通道Channel开启Conform,在发送完消息后可以通过WaitForConfirm等待消息的投递结果,这里有个可选参数,就是阻塞等待的时间,如果返回结果为false则表示消息投递失败,则发送端这时候也就可以采取重试之类的策略了。
  异步回调的方式也就是通道订阅RabbitMQ的发送完毕确认事件,消息投递成功会回调这个方法给发送方,回调的参数包含当前消息在该通道中发送的编号DeliveryTag(批量提交的时候可以根据这个编号对应提交集合的索引,保证对应集合索引上的消息可靠投递),的最大值是9223372036854775807
  C#代码:



          channel.BasicAcks += (sender, eventArgs) =>
{
ulong tag = eventArgs.DeliveryTag;
};
channel.BasicReturn += (sender, eventArgs) =>
{
};
byte[] msg = Encoding.UTF8.GetBytes("hello");
channel.BasicPublish("yu.exchange1", "yu.1", true, props, msg);
  Java代码:



channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
System.out.println(l);
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println(l);
}
});
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println("响应状态码-ReplyCode:"+i);
System.out.println("响应内容-ReplyText:"+s);
System.out.println("Exchange:"+s1);
System.out.println("RouteKey"+s2);
                System.out.println("投递失败的消息:"+ new String(bytes,"UTF-8") );
            }
});
byte[] msg = "hello".getBytes();
channel.confirmSelect();
channel.basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
  代码中多订阅了一个BasicReturn事件(addReturnListener),当消息被RabbitMQ拒绝或者说没有成功投递的时候,则会触发这个方法,当然想要获取详细信息则需要设置mandatory参数为true,也就是basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);中的第三个参数。
  队列中新建一个yu.exchange1的交换机然后不绑定队列的情况下则会投递失败的时间;
DSC0000.png

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422493-1-1.html 上篇帖子: 【RabbitMQ】6、rabbitmq生产者的消息确认 下篇帖子: Spring RabbbitMq 整合
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表