设为首页 收藏本站
查看: 892|回复: 0

[经验分享] (转) RabbitMQ学习之工作队列(java)

[复制链接]

尚未签到

发表于 2017-12-8 23:34:26 | 显示全部楼层 |阅读模式
  http://blog.csdn.net/zhu_tianwei/article/details/40887717
  参考:http://blog.csdn.NET/lmj623565791/article/details/37620057
  1.生产任务Task.Java



[java] view plain copy
  print?

  • package cn.slimsmart.rabbitmq.demo.workqueue;  

  • import com.rabbitmq.client.AMQP;  
  • import com.rabbitmq.client.Channel;  
  • import com.rabbitmq.client.Connection;  
  • import com.rabbitmq.client.ConnectionFactory;  
  • import com.rabbitmq.client.MessageProperties;  

  • public class Task {  

  •     //队列名称   
  •     private final static String QUEUE_NAME = "workqueue-durable";   

  •     public static void main(String[] args) throws Exception {  
  •          //创建连接和频道   
  •         ConnectionFactory factory = new ConnectionFactory();   
  •         factory.setHost("192.168.101.174");   
  •         //指定用户 密码  
  •         factory.setUsername("admin");  
  •         factory.setPassword("admin");  
  •         //指定端口  
  •         factory.setPort(AMQP.PROTOCOL.PORT);
  •         Connection connection = factory.newConnection();
  •         Channel channel = connection.createChannel();
  •         boolean durable = true; //设置消息持久化  RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性。  
  •         //声明队列   
  •         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);   

  •         //发送10条消息,依次在消息后面附加1-10个点   
  •         for (int i = 5; i > 0; i--)   
  •         {
  •             String dots = "";   
  •             for (int j = 0; j <= i; j++)   
  •             {
  •                 dots += ".";   
  •             }
  •             String message = "helloworld" + dots+dots.length();   
  •             //MessageProperties.PERSISTENT_TEXT_PLAIN 标识我们的信息为持久化的  
  •             channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());   
  •             System.out.println("Sent Message:'" + message + "'");   
  •         }
  •         //关闭频道和资源   
  •         channel.close();
  •         connection.close();
  •     }

  • }
  2.消费工作队列



[java] view plain copy
  print?

  • package cn.slimsmart.rabbitmq.demo.workqueue;  

  • import com.rabbitmq.client.AMQP;  
  • import com.rabbitmq.client.Channel;  
  • import com.rabbitmq.client.Connection;  
  • import com.rabbitmq.client.ConnectionFactory;  
  • import com.rabbitmq.client.QueueingConsumer;  

  • public class Work {  
  •     //队列名称   
  •     private final static String QUEUE_NAME = "workqueue-durable";   

  •     public static void main(String[] args) throws Exception {  
  •          //区分不同工作进程的输出   
  •         int hashCode = Work.class.hashCode();   
  •         //创建连接和频道   
  •         ConnectionFactory factory = new ConnectionFactory();   
  •         factory.setHost("192.168.101.174");   
  •         //指定用户 密码  
  •         factory.setUsername("admin");  
  •         factory.setPassword("admin");  
  •         //指定端口  
  •         factory.setPort(AMQP.PROTOCOL.PORT);
  •         Connection connection = factory.newConnection();
  •         Channel channel = connection.createChannel();
  •         boolean durable = true; //设置消息持久化  RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性。  
  •         //声明队列   
  •         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);   

  •         QueueingConsumer consumer = new QueueingConsumer(channel);   

  •         /**
  •          * ack= true: Round-robin 转发   消费者被杀死,消息会丢失
  •          * ack=false:消息应答 ,为了保证消息永远不会丢失,RabbitMQ支持消息应答(message acknowledgments)。
  •          * 消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。
  •          * 如果消费者被杀死而没有发送应答,RabbitMQ会认为该信息没有被完全的处理,然后将会重新转发给别的消费者。
  •          * 通过这种方式,你可以确认信息不会被丢失,即使消者偶尔被杀死。
  •          * 消费者需要耗费特别特别长的时间是允许的。
  •          *  
  •          */  

  •         boolean ack = false ; //打开应答机制   
  •         // 指定消费队列   
  •         channel.basicConsume(QUEUE_NAME, ack, consumer);


  •         //公平转发  设置最大服务转发消息数量    只有在消费者空闲的时候会发送下一条信息。  
  •         int prefetchCount = 1;  
  •         channel.basicQos(prefetchCount);

  •         while (true)   
  •         {
  •             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •             String message = new String(delivery.getBody());   

  •             System.out.println(hashCode + " Received Message:'" + message + "'");   
  •             doWork(message);
  •             System.out.println(hashCode + " Received Done");   
  •             //发送应答   
  •             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);   

  •         }
  •     }

  •     /**  
  •      * 每个点耗时1s  
  •      * @param task  
  •      * @throws InterruptedException  
  •      */   
  •     private static void doWork(String task) throws InterruptedException   
  •     {
  •         for (char ch : task.toCharArray())   
  •         {
  •             if (ch == '.')   
  •                 Thread.sleep(1000);   
  •         }
  •     }

  • }
  多启动几个消费者工作进程,使用生产者发送消息,可以观察消费情况。
  要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉。

如果用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange。前面我们使用的都是amq.direct类型。  channel.BasicPublish("", "TaskQueue", properties, bytes);
  direct exchange 发送消息是要看routingKey的。举个例子,定义了一个direct exchange 名字是X1,然后一个queue名字为Q1 用routingKey=K1 绑定到exchange X1上,当一个routeKey为 K2 的消息到达X1上,那么只有K1=K2的时候,这个消息才能到达Q1上。
  fanout类型的exchange就比较好理解。就是简单的广播,而且是忽略routingKey的。所以只要是有queue绑定到fanout exchange上,通过这个exchange发送的消息都会被发送到那些绑定的queue中,不管你有没有输入routingKey。
  Topic类型的exchange给与我们更大的灵活性。通过定义routingKey可以有选择的订阅某些消息,此时routingKey就会是一个表达式。exchange会通过匹配绑定的routingKey来决定是否要把消息放入对应的队列中。有两种表达式符号可以让我们选择:#和*。
  *(星号):代表任意的一个词。 例:*.a会匹配a.a,b.a,c.a等
  #(井号):代码任意的0个或多个词。 例:#.a会匹配a.a,aa.a,aaa.a等
  topic exchange 有时候的行为会像其他类型的exchange,比如说:
  当routingKey只是有#号的时候,它的行为和fanout的行为是一样的。
  当routingKey什么的没有,空字符串的时候,它的行为是和direct是一样的。
  要注意的是,符号代表的是词不是字符。RabbitMQ中在表达式中词的定义是以.(点号)分隔的。
  Headers类型的exchange使用的比较少。以后再说。
  下面主要用代码,实现一下direct、fanout、topic的效果。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422293-1-1.html 上篇帖子: python网络编程--RabbitMQ 下篇帖子: Java并发工具类之线程间数据交换工具Exchanger
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表