设为首页 收藏本站
查看: 919|回复: 0

[经验分享] Azure Messaging-ServiceBus Messaging消息队列技术系列3-消息顺序保证

[复制链接]

尚未签到

发表于 2017-6-30 13:29:14 | 显示全部楼层 |阅读模式
  上一篇:Window Azure ServiceBus Messaging消息队列技术系列2-编程SDK入门  http://www.cnblogs.com/tianqing/p/5944573.html
  介绍了Azure Service Bus的编程SDK(主要的编程接口)
  本文中我们以实际的使用场景来说明Azure Messaging是否支持以及如何编码实现:消息的收发顺序保证
  消息的收发在实际业务中往往是有顺序的:发送时1-2-3-4-5,接收时也必须是1-2-3-4-5,即FIFO特性。
  在本文的Demo中,我们模拟销售订单消息队列异步处理场景,消息体是一条SalesOrder,顺序发送,顺序接收。
  1. 我们还是使用上篇博客中在Windows Azure的Portal上建立好的NameSpaceservicebustest
  销售订单队列名称:OrderQueue
  2.简单封装一个Service Bus的工具类:ServiceBusUtils: 用于创建队列、删除队列、创建QueueClient、创建BrokerdMessage



using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace AzureMessaging.FIFO
{
/// <summary>
/// ServiceBus工具类
/// </summary>
class ServiceBusUtils
{
//Namespace名称
private static readonly string namespaceName = "servicebustest";
/// <summary>
/// 创建队列
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isSession">是否支持会话</param>
public void CreateQueue(string queueName, bool isSession = true)
{
var namespaceClient = NamespaceManager.Create();
if (namespaceClient.QueueExists(queueName))
{
namespaceClient.DeleteQueue(queueName);
}
var queue = new QueueDescription(queueName) { RequiresSession = isSession };
namespaceClient.CreateQueue(queue);
}
/// <summary>
/// 删除队列
/// </summary>
/// <param name="queueName">队列名称</param>        
public void DeleteQueue(string queueName)
{
var namespaceClient = NamespaceManager.Create();
if (namespaceClient.QueueExists(queueName))
{
namespaceClient.DeleteQueue(queueName);
}
}

/// <summary>
/// 创建队列客户端
/// </summary>
/// <returns>队列客户端</returns>
public QueueClient GetQueueClient(string queueName, bool isSession = false, ReceiveMode mode = ReceiveMode.ReceiveAndDelete)
{
return QueueClient.Create(queueName, mode);
}
/// <summary>
/// 创建队列客户端
/// </summary>
/// <returns>队列客户端</returns>
public QueueClient GetReceiveQueueClient(string queueName, ReceiveMode mode = ReceiveMode.PeekLock)
{
var namespaceClient = NamespaceManager.Create();
return QueueClient.Create(queueName, mode);
}
/// <summary>
/// 构造消息
/// </summary>
/// <param name="serializableObject">可序列化的对象</param>
/// <returns>消息</returns>
public BrokeredMessage Create(Object serializableObject)
{
var serializer = new DataContractSerializer(serializableObject.GetType(),
new DataContractSerializerSettings() { IgnoreExtensionDataObject = true, PreserveObjectReferences = false });
var message = new BrokeredMessage(serializableObject);
message.Properties.Add("Type", serializableObject.GetType().ToString());
return message;
}
}
}
  2. 示例SalesOrder实体类



using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AzureMessaging.FIFO
{
/// <summary>
/// 销售订单类
/// </summary>
public class SalesOrder
{
/// <summary>
/// 订单ID
/// </summary>
public string OrderID { get; set; }
/// <summary>
/// 订单编号
/// </summary>
public string Code { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreateTime { get; set; }
/// <summary>
/// 总价格
/// </summary>
public Decimal TotalPrice { get; set; }
/// <summary>
/// 产品ID
/// </summary>
public int ProductID { get; set; }
}
}
  3. 消息顺序发送
  向OrderQueue发送10条消息订单消息,输出每条消息的顺序号以及MessageID



private static readonly string queueName = "OrderQueue";
/// <summary>
/// 发送消息
/// </summary>
private static void MessageSend()
{
var sbUtils = new ServiceBusUtils();
//创建队列
sbUtils.CreateQueue(queueName, false);
//顺序发送消息到OrderQueue
var queueSendClient = sbUtils.GetQueueClient(queueName);
for (int i = 0; i < 10; i++)
{
var order = new SalesOrder() { OrderID = i.ToString(), Code = "SalesOrder_" + i, CreateTime = DateTime.Now, ProductID = 17967, TotalPrice = new decimal(19999) };
var message = sbUtils.Create(order);
queueSendClient.Send(message);
Console.WriteLine(string.Format("Send {0} Message: {1}", i, message.MessageId));
}
Console.WriteLine("Send Completed!");
}
  程序输出:
DSC0000.png

  4. 消息顺序接收
  消费OrderQueue中的消息,验证消息的接收顺序



private static readonly string queueName = "OrderQueue";
/// <summary>
/// 接收消息
/// </summary>
private static void MessageReceive()
{
int index = 0;
BrokeredMessage msg = null;
var sbUtils = new ServiceBusUtils();
var queueReveiveClient = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.ReceiveAndDelete);
while ((msg = queueReveiveClient.Receive(TimeSpan.FromMilliseconds(3))) != null)
{
Console.WriteLine(string.Format("Received {0} Message: {1}", index, msg.MessageId));
index++;
}
////删除队列
//sbUtils.DeleteQueue(queueName);

Console.WriteLine("Receive Completed!");
}
  程序输出:
DSC0001.png

  可以看出,Azure Messaging中ServiceBus对消息的收发是有顺序保证的。
  下一篇我们继续其他特性的验证和介绍。
  周国庆
  2017/3/9

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-389677-1-1.html 上篇帖子: Azure中block和Page的比较 Azure: Did You Know? Block vs Page Blobs 下篇帖子: Azure机器学习入门(四)模型发布为Web服务
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表