设为首页 收藏本站
查看: 814|回复: 0

[经验分享] RabbitMq入门与基本使用

[复制链接]

尚未签到

发表于 2017-7-2 11:13:45 | 显示全部楼层 |阅读模式
  这两天工作项目中用到了rabbitmq,顺便学习了一下。
  RabbitMq主要的使用模式有三种:工作队列,发布订阅和RPC远程调用。

1.工作队列
DSC0000.png

  生产者:



using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
        //一定要声明队列,向队列发送消息
channel.QueueDeclare(queue: "task_queue",
durable: true,    //队列是否持久化
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();    
properties.SetPersistent(true);    //消息是否持久化    

channel.BasicPublish(exchange: "",    //没有定义exchange,会使用系统默认的exchange
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
  在方法





channel.BasicPublish("", "task_queue", null, bytes);

中的第一个参数是需要输入一个exchange。在RabbitMQ中,所有的消息都必须要通过exchange发送到各个queue里面去。发送者发送消息,其实也就是把消息放到exchange中去。而exchange知道应该把消息放到哪里去。在这个方法中,我们没有输入exchange的名称,只是定义了一个空的echange,而在第二个参数routeKey中输入了我们目标队列的名称。RabbitMQ会帮我定义一个默认的exchange,这个exchange会把消息直接投递到

我们输入的队列中,这样服务端只需要直接去这个定义了的队列中获取消息就可以了。  消费者:



using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//修改分发机制(原先是轮询分发), prefetchCount = 1 变为 不向正在处理的worker发发任务,谁先有空就给谁
//In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting.
//This tells RabbitMQ not to give more than one message to a worker at a time.
//Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one.
//Instead, it will dispatch it to the next worker that is not still busy.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine("
  • Waiting for messages.");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);
    Console.WriteLine(" [x] Done");
              
              //当noAck为false起作用,手动告知应答处理完成
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: "task_queue",
    noAck: false,   //是否不要手动应答(no manual Ack),ture自动应答,自动删除处理消息;false手动应答,服务器的消息会等待应答结果才消除
                                     consumer: consumer);
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
      这里要注意的,如果没有宿主进程,比如一个Console的后台程序,这个 Console.ReadLine(); 不能少,而且一定要加在这里。否则:
      1.程序自动退出。2.相关的变量出了生命周期范围,已经释放!笔者在这里吃过亏,找了半天才发现。

    2.发布订阅
      Exchange类型为四种:direct,fanout,topic,headers。此模式中,由于是通过exchange和routingkey发送给多个队列,所以Publish中不用声明队列,只需声明exchange。
      1、Routing - Exchange类型direct
    DSC0001.png

      他是根据交换器名称与routingkey来找队列的。
      Publish:



    using System;
    using System.Linq;
    using RabbitMQ.Client;
    using System.Text;
    class EmitLogDirect
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using(var connection = factory.CreateConnection())
    using(var channel = connection.CreateModel())
    {
    channel.ExchangeDeclare(exchange: "direct_logs",
    type: "direct");
    var severity = (args.Length > 0) ? args[0] : "info";
    var message = (args.Length > 1)
    ? string.Join(" ", args.Skip( 1 ).ToArray())
    : "Hello World!";
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "direct_logs",
    routingKey: severity,        //传来参数,指定的routekey
    basicProperties: null,
    body: body);
    Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
    }
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
      subscribe



    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    class ReceiveLogsDirect
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using(var connection = factory.CreateConnection())
    using(var channel = connection.CreateModel())
    {
    channel.ExchangeDeclare(exchange: "direct_logs",
    type: "direct");
    var queueName = channel.QueueDeclare().QueueName;
    if(args.Length < 1)
    {
    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
    Environment.GetCommandLineArgs()[0]);
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    Environment.ExitCode = 1;
    return;
    }
    //同时绑定多个指定的routekey
    foreach(var severity in args)
    {
    channel.QueueBind(queue: queueName,
    exchange: "direct_logs",
    routingKey: severity);
    }
    Console.WriteLine("
  • Waiting for messages.");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    var routingKey = ea.RoutingKey;
    Console.WriteLine(" [x] Received '{0}':'{1}'",
    routingKey, message);
    };
    channel.BasicConsume(queue: queueName,
    noAck: true,
    consumer: consumer);
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
      2、Publish/Subscribe - Exchange类型fanout

       DSC0002.png
      这个类型忽略Routingkey,他为广播模式。
      广播式时,Publish可以不指定queue和routekey。



    using System;
    using RabbitMQ.Client;
    using System.Text;
    class EmitLog
    {
    public static void Main(string[] args)
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using(var connection = factory.CreateConnection())
    using(var channel = connection.CreateModel())
    {
    channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "logs",
    routingKey: "",
    basicProperties: null,
    body: body);
    Console.WriteLine(" [x] Sent {0}", message);
    }
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    private static string GetMessage(string[] args)
    {
    return ((args.Length > 0)
    ? string.Join(" ", args)
    : "info: Hello World!");
    }
    }
      subscribe可以只用临时队列接收



    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    class ReceiveLogs
    {
    public static void Main()
    {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using(var connection = factory.CreateConnection())
    using(var channel = connection.CreateModel())
    {
    channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    //这里生成了一个随机队列(string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true)
    //In the .NET client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive,
    //autodelete queue with a generated name:
    var queueName = channel.QueueDeclare().QueueName;
    channel.QueueBind(queue: queueName,
    exchange: "logs",
    routingKey: "");
    Console.WriteLine("
  • Waiting for logs.");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] {0}", message);
    };
    channel.BasicConsume(queue: queueName,
    noAck: true,
    consumer: consumer);
    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
    }
    }
    }
      有一种简写的方式,用Subscription类



    /// <summary>
    /// 获取消息并处理
    /// </summary>
    /// <param name="queueName">队列名称</param>
    /// <param name="action">接收到消息后的Action</param>
    public void Receive(string queueName, Action<byte[]> action, bool multThread = true)
    {
    ConnectionFactory cf = new ConnectionFactory();
    cf.UserName = this.UserName;
    cf.Password = this.PassWord;
    cf.HostName = this.HostName;
    cf.Port = this.Port;
    cf.VirtualHost = this.VitualHost;
    using (IConnection conn = cf.CreateConnection())
    {
    using (IModel ch = conn.CreateModel())
    {
    //声明交换器
    ch.ExchangeDeclare(exchange: "e_linke1", type: "direct",durable: false);
    ch.QueueDeclare(queue: queueName,
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null);
    //将队列绑定到交换器上
                        ch.QueueBind(queue: queueName,
    exchange: "e_linke1",
    routingKey: "elk");
    using (Subscription sub = new Subscription(ch, queueName, true))
    {
    foreach (BasicDeliverEventArgs e in sub)
    {
    // handle the message contained in e ...
    // ... and finally acknowledge it
    if (multThread)
    {
    System.Threading.Tasks.Task.Factory.StartNew(() => { action(e.Body); });
    }
    else
    {
    action(e.Body);
    }
    sub.Ack(e);
    }
    }
    }
    }
    }
      注:
      如果有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。
      如果有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每个程序都会收到这个消息的副本。行为相当于fanout类型的exchange。
      3、Exchange类型topic
       DSC0003.png
      这个类型的路由规则如果你掌握啦,那是相当的好用,与灵活。他是根据RoutingKey的设置,来做匹配的,其中这里还有两个通配符为:
      *,代表任意的一个词。例如topic.zlh.*,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
      #,代表任意多个词。例如topic.#,他能够匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
      4、Headers Exchange
      Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。之前的几种exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。代码示例如下:
      发送端:



    channel.ExchangeDeclare("X1", "headers");
    IBasicProperties properties = channel.CreateBasicProperties();
    properties.Headers = new Hashtable();
    properties.Headers.Add("Key1", 123);
    properties.Headers.Add("Key2", 345);
    XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
    MemoryStream ms = new MemoryStream();
    xs.Serialize(ms, message);
    byte[] bytes = ms.ToArray();
    channel.BasicPublish("X1", "", properties, bytes);
      接收端:



    channel.ExchangeDeclare("X1", "headers");
    //随机创建一个队列
    string queue_name = channel.QueueDeclare("headerssubscriber2", true, false, false, null);
    //绑定
    IDictionary ht = new Hashtable();
    ht.Add("x-match", "any");
    ht.Add("Key1", 12345);
    ht.Add("Key2", 34567);
    channel.QueueBind(queue_name, "X1", "", ht);
    //定义这个队列的消费者
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    channel.BasicConsume(queue_name, true, consumer);
    while (true)
    {
    BasicDeliverEventArgs ea =
    (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    byte[] bytes = ea.Body;
    XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
    using (MemoryStream ms = new MemoryStream(bytes))
    {
    RequestMessage message = (RequestMessage)xs.Deserialize(ms);
    Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
    }
    }

    3.RPC远程调用
    DSC0004.png

      参考链接:
      .Net使用RabbitMQ详解
      RabbitMQ消息队列(三):任务分发机制[转]
      一个winform带你玩转rabbitMQ

      .Net下RabbitMQ的使用(4) -- 订阅和发布  *
      .Net下RabbitMQ的使用(5) -- 路由机制 *
      .Net下RabbitMQ的使用(6) -- 持久化 *
      .Net下RabbitMQ的使用(7) -- 消息的传输控制 *
      .NET/C# Client API Guide [官网]
      RabbitMQ Tutorials [官网]
      .NET/C# Client API Guide [官网]

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390282-1-1.html 上篇帖子: rabbitMQ实战(一)---------使用pika库实现hello world 下篇帖子: RabbitMQ与Spring集成配置
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表