设为首页 收藏本站
查看: 1809|回复: 0

[经验分享] 【RabbitMQ】RabbitMQ在Windows的安装和简单的使用

[复制链接]
YunVN网友  发表于 2017-6-29 09:36:27 |阅读模式
版本说明
  使用当前版本:3.5.4

安装与启动
  在官网上下载其Server二进制安装包,在Windows上的安装时简单的,与一般软件没什么区别。
  
安装前会提示你,还需要安装Erlang,并打开下载页面。把他们都下载安装就ok了。(当然也可先行下载安装)
  
安装完,服务默认是启动的。
  
Erlang,应该是一个在并发编程方面很厉害的语言吧。
  后期可通过开始菜单启动。

简单的Java客户端连接
  编码中有些配置需要特别注意配置,比如:


  • 选择什么交换器,各种交换器的分发策略不一样。
  • 是否自动确认消息。如果RabbitMQ服务器收到该消息的确认消息,会认为该消息已经处理OK了,会把它从队列中删除。
  • 队列是否持久、消息是否持久,就是队列和消息在RabbitMQ服务器重启时是否恢复。
  • 队列是否自动删除,就是队列在无监听的情况下是否自动删除。
  引入的客户端包:
<dependency>  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.5.6</version>
  
</dependency>
  消费者:
package com.nicchagil.rabbit.No001MyFirstDemo;  

  
import java.io.IOException;
  
import java.util.concurrent.TimeoutException;
  

  
import com.rabbitmq.client.AMQP;
  
import com.rabbitmq.client.Channel;
  
import com.rabbitmq.client.Connection;
  
import com.rabbitmq.client.ConnectionFactory;
  
import com.rabbitmq.client.Consumer;
  
import com.rabbitmq.client.DefaultConsumer;
  
import com.rabbitmq.client.Envelope;
  


  
public>  

  private final static String QUEUE_NAME = &quot;hello world&quot;;
  

  public static void main(String[] argv) throws java.io.IOException,
  java.lang.InterruptedException, TimeoutException {
  

  /* 创建连接工厂 */
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(&quot;localhost&quot;);
  /* 创建连接 */
  Connection connection = factory.newConnection();
  /* 创建信道 */
  Channel channel = connection.createChannel();
  

  // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
  channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  System.out.println(&quot;Waiting for messages.&quot;);
  

  /* 定义消费者 */
  Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
  AMQP.BasicProperties properties, byte[] body)
  throws IOException {
  String message = new String(body, &quot;UTF-8&quot;);
  System.out.println(&quot;Received the message -> &quot; + message);
  }
  };
  // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
  channel.basicConsume(QUEUE_NAME, true, consumer);
  }
  
}
  生产者:
package com.nicchagil.rabbit.No001MyFirstDemo;  

  
import java.util.concurrent.TimeoutException;
  

  
import com.rabbitmq.client.Channel;
  
import com.rabbitmq.client.Connection;
  
import com.rabbitmq.client.ConnectionFactory;
  


  
public>  

  private final static String QUEUE_NAME = &quot;hello world&quot;;
  

  public static void main(String[] argv) throws java.io.IOException, TimeoutException {
  Connection connection = null;
  Channel channel = null;
  try {
  /* 创建连接工厂 */
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(&quot;localhost&quot;);
  /* 创建连接 */
  connection = factory.newConnection();
  /* 创建信道 */
  channel = connection.createChannel();
  

  // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
  channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  String message = &quot;hello world...&quot;; // 需发送的信息
  /* 发送消息,使用默认的direct交换器 */
  channel.basicPublish(&quot;&quot;, QUEUE_NAME, null, message.getBytes());
  System.out.println(&quot;Send message -> &quot; + message);
  } finally {
  /* 关闭连接、通道 */
  channel.close();
  connection.close();
  System.out.println(&quot;Closed the channel and conn.&quot;);
  }
  

  }
  

  
}
  如无意外,每运行一次生产者(发送一次消息),消费者都会执行一次业务(接收到一次消息)。
  
执行了两次生产者后,日志如下:
Waiting for messages.  
Received the message -> hello world...
  
Received the message -> hello world...
  注:


  • MQ服务器收到确认(ack)信息后,会在队列中删除该消息。如果未收到确认消息,则会继续等待(不存在超时的概念),它直到执行该消息的消费者挂了,才把此遗留的消息重新分发给其它的消费者。
发布与订阅(fanout交换器)
  发布与订阅,类似于广播。
  
下面代码演示:两个消费者创建临时队列后绑定一个fanout类型的交换器,然后生产者往该交换器发送消息,消息被广播到两个绑定的队列中,队列将消息发送给各自的消费者,两个消费者接收到消息完成任务。
  消费者A:
package com.nicchagil.rabbit.No003Fadout;  

  
import java.io.IOException;
  
import java.util.concurrent.TimeoutException;
  

  
import com.rabbitmq.client.AMQP;
  
import com.rabbitmq.client.Channel;
  
import com.rabbitmq.client.Connection;
  
import com.rabbitmq.client.ConnectionFactory;
  
import com.rabbitmq.client.Consumer;
  
import com.rabbitmq.client.DefaultConsumer;
  
import com.rabbitmq.client.Envelope;
  


  
public>  

  public static void main(String[] argv) throws java.io.IOException,
  java.lang.InterruptedException, TimeoutException {
  

  /* 创建连接工厂 */
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(&quot;localhost&quot;);
  /* 创建连接 */
  Connection connection = factory.newConnection();
  /* 创建信道 */
  Channel channel = connection.createChannel();
  

  // 创建一个临时的、私有的、自动删除、随机名称的临时队列
  String queueName = channel.queueDeclare().getQueue();
  System.out.println(&quot;queue : &quot; + queueName);
  channel.queueBind(queueName, &quot;amq.fanout&quot;, &quot;&quot;);
  System.out.println(FanoutCustomerA.class.getName() + &quot;, waiting for messages.&quot;);
  

  /* 定义消费者 */
  Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
  AMQP.BasicProperties properties, byte[] body)
  throws IOException {
  String message = new String(body, &quot;UTF-8&quot;);
  System.out.println(&quot;Received the message -> &quot; + message);
  }
  };
  // 开始消费(设置自动确认消息)
  channel.basicConsume(&quot;&quot;, true, consumer);
  }
  
}
  消费者B:
package com.nicchagil.rabbit.No003Fadout;  

  
import java.io.IOException;
  
import java.util.concurrent.TimeoutException;
  

  
import com.rabbitmq.client.AMQP;
  
import com.rabbitmq.client.Channel;
  
import com.rabbitmq.client.Connection;
  
import com.rabbitmq.client.ConnectionFactory;
  
import com.rabbitmq.client.Consumer;
  
import com.rabbitmq.client.DefaultConsumer;
  
import com.rabbitmq.client.Envelope;
  


  
public>  

  public static void main(String[] argv) throws java.io.IOException,
  java.lang.InterruptedException, TimeoutException {
  

  /* 创建连接工厂 */
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(&quot;localhost&quot;);
  /* 创建连接 */
  Connection connection = factory.newConnection();
  /* 创建信道 */
  Channel channel = connection.createChannel();
  

  // 创建一个临时的、私有的、自动删除、随机名称的临时队列
  String queueName = channel.queueDeclare().getQueue();
  System.out.println(&quot;queue : &quot; + queueName);
  channel.queueBind(queueName, &quot;amq.fanout&quot;, &quot;&quot;);
  System.out.println(FanoutCustomerB.class.getName() + &quot;, waiting for messages.&quot;);
  

  /* 定义消费者 */
  Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
  AMQP.BasicProperties properties, byte[] body)
  throws IOException {
  String message = new String(body, &quot;UTF-8&quot;);
  System.out.println(&quot;Received the message -> &quot; + message);
  }
  };
  // 开始消费(设置自动确认消息)
  channel.basicConsume(&quot;&quot;, true, consumer);
  }
  
}
  

  生产者:
package com.nicchagil.rabbit.No003Fadout;  

  
import java.util.concurrent.TimeoutException;
  

  
import com.rabbitmq.client.Channel;
  
import com.rabbitmq.client.Connection;
  
import com.rabbitmq.client.ConnectionFactory;
  


  
public>  

  public static void main(String[] argv) throws java.io.IOException, TimeoutException {
  Connection connection = null;
  Channel channel = null;
  try {
  /* 创建连接工厂 */
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(&quot;localhost&quot;);
  /* 创建连接 */
  connection = factory.newConnection();
  /* 创建信道 */
  channel = connection.createChannel();
  

  String message = &quot;hello world...&quot;; // 需发送的信息
  /* 发送消息,使用默认的fanout交换器 */
  channel.basicPublish(&quot;amq.fanout&quot;, &quot;&quot;, null, message.getBytes());
  System.out.println(&quot;Send message -> &quot; + message);
  } finally {
  /* 关闭连接、通道 */
  channel.close();
  connection.close();
  System.out.println(&quot;Closed the channel and conn.&quot;);
  }
  

  }
  

  
}
  先将两个消费者跑起来,然后运行生产者发送一条消息。
  
正常来说,消费者A、消费者B都收到消息并执行。
  消费者A的日志:
  

queue : amq.gen-F3EYfr68AHvfZTIJUcN_Ug  
com.nicchagil.rabbit.No003Fadout.FanoutCustomerA, waiting for messages.
  
Received the message -> hello world...
  

  

  消费者B的日志:
  

queue : amq.gen-AV_XDQtB-LFPK8bDy31PTw  
com.nicchagil.rabbit.No003Fadout.FanoutCustomerB, waiting for messages.
  
Received the message -> hello world...
  

  

管理控制台
  我们可以通过以下命令启用管理控制台:
rabbitmq-plugins enable rabbitmq_management  然后由此地址(http://localhost:15672)进入,默认端口是15672,默认账户是guest/guest。
  
进入后,可以看到Overview、Connections、Channels、Exchanges、Queues、Admin几个页签,此控制台的功能各种强大,不仅可以查看信息,还可以增、删信息,非常棒。

消费者的异常处理器
  如果消费者方法体中发生异常没被捕捉并处理,如果使用默认的异常处理器,消费者的信道会关闭,不继续执行任务。
  比如以下例子,遇到空字符串则抛出运行时异常:
package com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办;  

  
import java.io.IOException;
  
import java.util.concurrent.TimeoutException;
  

  
import com.rabbitmq.client.AMQP;
  
import com.rabbitmq.client.Channel;
  
import com.rabbitmq.client.Connection;
  
import com.rabbitmq.client.ConnectionFactory;
  
import com.rabbitmq.client.Consumer;
  
import com.rabbitmq.client.DefaultConsumer;
  
import com.rabbitmq.client.Envelope;
  


  
public>  

  private final static String QUEUE_NAME = &quot;hello world&quot;;
  

  public static void main(String[] argv) throws java.io.IOException,
  java.lang.InterruptedException, TimeoutException {
  

  /* 创建连接工厂、连接、通道 */
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost(&quot;localhost&quot;);
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  

  // 声明消息队列
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  System.out.println(&quot;Waiting for messages.&quot;);
  

  /* 定义消费者 */
  Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
  AMQP.BasicProperties properties, byte[] body)
  throws IOException {
  String message = new String(body, &quot;UTF-8&quot;);
  if (message == null || message.length() == 0) {
  throw new RuntimeException(&quot;The input str is null or empty...&quot;);
  }
  System.out.println(&quot;Received the message -> &quot; + message);
  }
  };
  // 将消费者绑定到队列
  channel.basicConsume(QUEUE_NAME, true, consumer);
  }
  
}
  然后,我们用这个消费者监听一个队列,且此队列只有这个消费者,用于测试队列是否堵塞,也就是这个消费者是否不继续消费。
  先用生产者发送“hello world...”(正常参数),再发送“”(异常参数),最后发送“hello world...”(正常参数)。
  
可见如下日志,消费者发生异常后,没有响应第三个“hello world...”的消息,也可进入控制台,会发现此消息为Ready状态,等待消费。
  
原因在于,默认的异常处理器为DefaultExceptionHandler,其继承StrictExceptionHandler,从源码看,遇到异常它会关闭信道。
  
日志如下:
Waiting for messages.  
Received the message -> hello world...
  
com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1@630bd3f1 (amq.ctag-QzWI1jxh4h23rOFJM63cBA) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1):
  
java.lang.RuntimeException: The input str is null or empty...
  at com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1.handleDelivery(Customer.java:40)
  at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
  at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
  遇到异常,如果需使用别的处理方式,可以设置自定义的异常处理器。
  
以下的异常处理器,只是Demo。各方法体中只打印相关信息供查看:
package com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办;  

  
import com.rabbitmq.client.Channel;
  
import com.rabbitmq.client.Connection;
  
import com.rabbitmq.client.Consumer;
  
import com.rabbitmq.client.ExceptionHandler;
  
import com.rabbitmq.client.TopologyRecoveryException;
  


  
public>  

  public void handleUnexpectedConnectionDriverException(Connection conn,
  Throwable exception) {
  System.out.println(&quot;MyExceptionHandler.handleUnexpectedConnectionDriverException&quot;);
  }
  

  public void handleReturnListenerException(Channel channel,
  Throwable exception) {
  System.out.println(&quot;MyExceptionHandler.handleReturnListenerException&quot;);
  }
  

  public void handleFlowListenerException(Channel channel, Throwable exception) {
  System.out.println(&quot;MyExceptionHandler.handleFlowListenerException&quot;);
  }
  

  public void handleConfirmListenerException(Channel channel,
  Throwable exception) {
  System.out.println(&quot;MyExceptionHandler.handleConfirmListenerException&quot;);
  }
  

  public void handleBlockedListenerException(Connection connection,
  Throwable exception) {
  System.out.println(&quot;MyExceptionHandler.handleBlockedListenerException&quot;);
  }
  

  public void handleConsumerException(Channel channel, Throwable exception,
  Consumer consumer, String consumerTag, String methodName) {
  // 正常渠道应该有专业的LOG框架打印,此处简单处理
  exception.printStackTrace();
  System.out.println(&quot;MyExceptionHandler.handleConsumerException&quot;);
  }
  

  public void handleConnectionRecoveryException(Connection conn,
  Throwable exception) {
  System.out.println(&quot;MyExceptionHandler.handleConnectionRecoveryException&quot;);
  }
  

  public void handleChannelRecoveryException(Channel ch, Throwable exception) {
  System.out.println(&quot;MyExceptionHandler.handleChannelRecoveryException&quot;);
  }
  

  public void handleTopologyRecoveryException(Connection conn, Channel ch,
  TopologyRecoveryException exception) {
  System.out.println(&quot;MyExceptionHandler.handleTopologyRecoveryException&quot;);
  }
  

  
}
  设置自定义的异常处理器:
factory.setExceptionHandler(new MyExceptionHandler());  像上述那样,先传递“hello world...”,再传递“”(空字符串),最后传递“hello world...”。观察如下日志,可见发生异常后,消费者正常响应消息。
Waiting for messages.  
Received the message -> hello world...
  
java.lang.RuntimeException: The input str is null or empty...
  at com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1.handleDelivery(Customer.java:41)
  at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
  at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  
MyExceptionHandler.handleConsumerException
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
  
Received the message -> hello world...
  具体应该根据什么策略进行异常处理,这是个是值得深思的问题,与业务的性质有关。什么情况下消费者应不继续响应请求,什么情况下消费者应继续相应,这个在于业务的性质而定。

参考的优秀文章


  • RabbitMQ
  • The simplest thing that does something
  • Java Client API Guide

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-389195-1-1.html 上篇帖子: Windows下安装Confluence并破解汉化 下篇帖子: windows 下编译log4cxx(x64)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表