设为首页 收藏本站
查看: 1631|回复: 0

[经验分享] RabbitMQ原理与相关操作(三)消息持久化

[复制链接]

尚未签到

发表于 2017-7-4 20:20:20 | 显示全部楼层 |阅读模式
  现在聊一下RabbitMQ消息持久化:

问题及方案描述
  1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。
  这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。
  2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。
  这种情况可以使用RabbitMQ提供的消息队列的持久化机制。

相关理论描述

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我个人觉得大多数开发人员都会选择持久化。

队列和交换机有一个创建时候指定的标志durabledurable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。

消息队列持久化包括3个部分:

1、exchange持久化,在声明时指定durable => true
2、queue持久化,在声明时指定durable => true
3、消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

注意:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建


程序示例

生产者


DSC0000.gif DSC0001.gif


class Producter
{
const string ExchangeName = "eric.exchange";
const string QueueName = "eric.queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
string message = "Eric is very handsome";
var body = Encoding.UTF8.GetBytes(message);
//将队列设置为持久化之后,还需要将消息也设为可持久化的
var props = channel.CreateBasicProperties();
props.SetPersistent(true);
channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body);
Console.WriteLine("Producter Sent: {0}", message);
Console.ReadKey();
}
}
}
View Code  注:ack是 acknowledgments 的缩写,noAck 是("no manual acks")
  因为我前段时间换了笔记本,所以用户的“eric”的操作出踩了个坑,下面进行介绍下:
  如果调试运行时报错:None of the specified endpoints were reachable
  innerException是:



{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text=\"Unexpected Exception\", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 远程主机强迫关闭了一个现有的连接。。 ---> System.Net.Sockets.SocketException: 远程主机强迫关闭了一个现有的连接。\r\n   在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)\r\n   在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)\r\n   --- 内部异常堆栈跟踪的结尾 ---\r\n   在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)\r\n   在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()\r\n   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()\r\n   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}
  这说明我们使用的用户 不是 系统默认的 guest 而是我们自己创建的用户,但是没有足够的权限进行操作。
  解决办法:



rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
  执行结果:
DSC0002.png

  相关其他操作见:windows下 安装 rabbitMQ 及操作常用命令
  程序运行结果:
DSC0003.png


消费者





class Recevice
{
const string ExchangeName = "eric.exchange";
const string QueueName = "eric.queue";
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
//NoAck:true 告诉RabbitMQ立即从队列中删除消息,另一个非常受欢迎的方式是从队列中删除已经确认接收的消息,可以通过单独调用BasicAck 进行确认:
//BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);
var msgContent = Encoding.UTF8.GetString(msgResponse.Body);
Console.WriteLine("The received content:"+msgContent);
channel.BasicAck(msgResponse.DeliveryTag, multiple: false);
//使用BasicAck方式来告之是否从队列中移除该条消息
//需要额外注意,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。
                Console.ReadKey();
}
}
}
View Code  接受消息还有一种方法,就是通过基于推送的事件订阅。可以使用内置的 QueueingBasicConsumer 提供简化的编程模型,允许在共享队列上阻塞,直到收到一条消息。





var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue();
var msgContent = Encoding.UTF8.GetString(msgResponse.Body);
View Code  程序运行结果:
DSC0004.png

  参考:http://www.cnblogs.com/shanyou/p/4067250.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390770-1-1.html 上篇帖子: 1,排序算法 下篇帖子: linux中ssh可以登录sftp不能登录解决办法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表