设为首页 收藏本站
查看: 1255|回复: 0

[经验分享] Azure Messaging-ServiceBus Messaging消息队列技术系列5-重复消息:at-least-once at-most-once

[复制链接]

尚未签到

发表于 2017-6-30 18:39:21 | 显示全部楼层 |阅读模式
  上篇博客中,我们用实际的业务场景和代码示例了Azure Messaging-ServiceBus Messaging对复杂对象消息的支持和消息的持久化:
  Azure Messaging-ServiceBus Messaging消息队列技术系列4-复杂对象消息是否需要支持序列化和消息持久化
  本文中我们主要研究并介绍Azure Messaging对重复消息的支持。
  MessageReceiver 对象创建时可以指定消息接收模式: ReceiveAndDeletePeekLock (默认),其中:
  1. 使用 ReceiveAndDelete 模式时,接收是单步操作,即当 Service Bus 收到请求时,它将消息标记为“正在使用”,然后将其返回给应用程序。ReceiveAndDelete 模式是最简
  单的模型,并且最适合在出现故障时应用程序能够容许不处理消息的场景。理解此模式时,可考虑这种情况:使用者发出了接收请求,但在处理消息之前发生崩溃。由于 Service B
  us已将消息标记为“正在使用”,因此当应用程序重新启动并重新开始使用消息时,它就会错过在崩溃前已使用的消息。
  2. 在 PeekLock 模式下,接收变成两阶段操作,因此可以支持不能容许错过消息的应用程序。当 Service Bus 收到请求时,它会找到下一条要使用的消息,将其锁定以防止其他使
  用者接收它,然后将其返回给应用程序。应用程序完成消息处理(或将消息可靠地存储以便将来处理)后,会对收到的消息调用 Complete 以完成接收过程的第二阶段。当Service
  Bus 看到 Complete 时,会将该消息标记为“正在使用”。另外两个结果也是可能的。第一个结果,如果由于某种原因应用程序无法处理该消息,它可以对收到的消息Abandon(而
  不是 Complete)。这将导致 Service Bus 解锁该消息,并使该消息可以重新被同一使用者或其他竞争的使用者接收。第二个结果,即存在与锁定关联的超时,如果应用程序在锁
  定超时到期前无法处理改消息(例如,应用程序崩溃)则 Service Bus 将解锁该消息并使其可以重新被接收。如果应用程序在处理该消息后崩溃,但此时尚未发出 Complete 请
  求,则在应用程序重新启动时,该消息将重新传递给应用程序。这通常称为“至少一次”处理。这意味着每条消息都将至少处理一次,但在某些情况下可能会重新传递同一消息。如果
  方案不能容许重复处理,则需要在应用程序中添加检测重复项的逻辑。这可以基于消息的 MessageId 属性来实现。此属性的值在传递尝试过程中保持不变。这称为“恰好一次”处、
  理。
  接下来,我们通过Code show一下消息的重复发送和重复接收。
  消息重复发送:同一个消息BrokeredMessage发送两次



/// <summary>
/// 发送消息
/// </summary>
private static void MessageMultiSendTest()
{
var sbUtils = new ServiceBusUtils();
//创建队列
sbUtils.CreateQueue(queueName, false);
//多次发送消息到OrderQueue
var queueSendClient = sbUtils.GetQueueClient(queueName);
var order = CreateSalesOrder(1);
var message = sbUtils.Create(order);
queueSendClient.Send(message);
queueSendClient.Send(message);

Console.WriteLine("Send Completed!");
}
  实际执行过程中是出错的:
DSC0000.png

  由此可以得出:
  Azure Messaging 不支持同一个消息发送多次,必须通过new多个BrokeredMessage实例实现。
  同时,消息的唯一性由消息的MessageID来标识!
  PeekAndLock模式下消息的重复接收:
  接收模式PeekAndLock,同一个队列,第一个Consumer接收消息,但是不Complete;然后第二个Consumer继续接收消息,此时第一个Consumer未Complete的消息有一个
  TTL,在TTL时间区间之内,第二个Consumer可以继续接收当前队列未锁定的消息,当TTL时间到达后,释放第一个Consumer锁定的消息,第二个Consumer读取到了第一个
  Consumer未Complete的消息。
  The duration of a peek lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes;
  the default value is 1 minute.



/// <summary>
/// 接收消息
/// </summary>
private static void MessageReceive()
{
int index = 0;            
var sbUtils = new ServiceBusUtils();
var queueReveiveClient1 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
for (int i = 0; i < 10; i++)
{
var msg = queueReveiveClient1.Peek();
Console.WriteLine(string.Format("Received {0} MessageID: {1}", i, msg.MessageId));
}
var queueReveiveClient2 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
for (int i = 0; i < 10; i++)
{
var msg = queueReveiveClient2.Receive();
Console.WriteLine(string.Format("Second received {0} MessageID: {1}", i, msg.MessageId));
msg.Complete();
}
////删除队列
//sbUtils.DeleteQueue(queueName);

Console.WriteLine("Receive Completed!");
}
  第一个队列Consumer使用Peek模式接收到消息,只是取出消息,不从消息队列中移出。
  第二个队列Consumer使用Receive模式同样可以接收消息。
DSC0001.png

  如果两个队列Consumer都使用Receive模式接收消息,只有第一个Consumer可以接收到,第二个Consumer则接收不到,一直在等待消息的入队!



/// <summary>
/// 接收消息
/// </summary>
private static void MessageReceive()
{
int index = 0;            
var sbUtils = new ServiceBusUtils();
var queueReveiveClient1 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
for (int i = 0; i < 10; i++)
{
var msg = queueReveiveClient1.Receive();
Console.WriteLine(string.Format("Received {0} MessageID: {1}", i, msg.MessageId));
}
var queueReveiveClient2 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
for (int i = 0; i < 10; i++)
{
var msg = queueReveiveClient2.Receive();
Console.WriteLine(string.Format("Second received {0} MessageID: {1}", i, msg.MessageId));
msg.Complete();
}
////删除队列
//sbUtils.DeleteQueue(queueName);

Console.WriteLine("Receive Completed!");
}
DSC0002.png

  因为消息已经被第一个Consumer消费。
  通过本篇我们了解了Azure Messaging-ServiceBus Messaging对重复消息的处理机制。
  周国庆
  2017/3/16

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-389750-1-1.html 上篇帖子: Azure Redis Cache (4) 配置和管理Redis Cache 下篇帖子: 在Docker上部署使用Azure CLI镜像
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表