设为首页 收藏本站
查看: 968|回复: 0

[经验分享] spring boot实战(第十二篇)整合RabbitMQ

[复制链接]

尚未签到

发表于 2017-12-9 16:36:13 | 显示全部楼层 |阅读模式
前言
  本篇主要讲述Spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar



[html] view plain copy

  • <dependency>  
  • <groupId>org.springframework.boot</groupId>  
  • <artifactId>spring-boot-starter-amqp</artifactId>  
  • lt;/dependency>  
消息生产者

不论是创建消息消费者或生产者都需要ConnectionFactory

ConnectionFactory配置

创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)




[html] view plain copy

  • @Configuration
  • public class AmqpConfig {

  •     public static final String EXCHANGE   = "spring-boot-exchange";  
  •     public static final String ROUTINGKEY = "spring-boot-routingKey";  

  •     @Bean
  •     public ConnectionFactory connectionFactory() {
  •         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
  •         connectionFactory.setAddresses("127.0.0.1:5672");
  •         connectionFactory.setUsername("guest");
  •         connectionFactory.setPassword("guest");
  •         connectionFactory.setVirtualHost("/");
  •         connectionFactory.setPublisherConfirms(true); //必须要设置
  •         return connectionFactory;
  •     }
  • }

这里需要显示调用



[html] view plain copy

  • connectionFactory.setPublisherConfirms(true);
才能进行消息的回调。


RabbitTemplate
通过使用RabbitTemplate来对开发者提供API操作




[html] view plain copy

  • @Bean
  • @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  • //必须是prototype类型
  • public RabbitTemplate rabbitTemplate() {
  •     RabbitTemplate template = new RabbitTemplate(connectionFactory());  
  •     return template;
  • }
这里设置为原型,具体的原因在后面会讲到

  在发送消息时通过调用RabbitTemplate中的如下方法



[html] view plain copy

  • public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)


  • exchange:交换机名称
  • routingKey:路由关键字
  • object:发送的消息内容
  • correlationData:消息ID


因此生产者代码详单简洁Send.java




[html] view plain copy

  • @Component
  • public class Send  {

  •     private RabbitTemplate rabbitTemplate;

  •     /**
  •      * 构造方法注入
  •      */
  •     @Autowired
  •     public Send(RabbitTemplate rabbitTemplate) {
  •         this.rabbitTemplate = rabbitTemplate;  
  •     }

  •     public void sendMsg(String content) {
  •         CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
  •         rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
  •     }


  • }


如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate
实际的ConfirmCallback为最后一次申明的ConfirmCallback。
下面给出完整的生产者代码:




[html] view plain copy

  • package com.lkl.springboot.amqp;

  • import java.util.UUID;

  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • import org.springframework.amqp.rabbit.support.CorrelationData;
  • import org.springframework.beans.factory.annotation.Autowired;
  • import org.springframework.stereotype.Component;

  • /**
  • * 消息生产者
  • *
  • * @author liaokailin
  • * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $
  • */
  • @Component
  • public class Send implements RabbitTemplate.ConfirmCallback {

  •     private RabbitTemplate rabbitTemplate;

  •     /**
  •      * 构造方法注入
  •      */
  •     @Autowired
  •     public Send(RabbitTemplate rabbitTemplate) {
  •         this.rabbitTemplate = rabbitTemplate;  
  •         rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
  •     }

  •     public void sendMsg(String content) {
  •         CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
  •         rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
  •     }

  •     /**
  •      * 回调
  •      */
  •     @Override
  •     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  •         System.out.println(" 回调id:" + correlationData);
  •         if (ack) {
  •             System.out.println("消息成功消费");
  •         } else {
  •             System.out.println("消息消费失败:" + cause);
  •         }
  •     }

  • }

消息消费者
消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。
交换机




[html] view plain copy

  • /**
  •      * 针对消费者配置
  •         FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
  •         HeadersExchange :通过添加属性key-value匹配
  •         DirectExchange:按照routingkey分发到指定队列
  •         TopicExchange:多关键字匹配
  •      */
  •     @Bean
  •     public DirectExchange defaultExchange() {
  •         return new DirectExchange(EXCHANGE);
  •     }

在Spring Boot中交换机继承AbstractExchange类
  
队列





[html] view plain copy

  • @Bean
  •     public Queue queue() {
  •         return new Queue("spring-boot-queue", true); //队列持久

  •     }

绑定



[html] view plain copy

  • @Bean
  •   public Binding binding() {
  •       return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
  •   }

完成以上工作后,在spring boot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。
消息消费




[html] view plain copy

  • @Bean
  •   public SimpleMessageListenerContainer messageContainer() {
  •       SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
  •       container.setQueues(queue());
  •       container.setExposeListenerChannel(true);
  •       container.setMaxConcurrentConsumers(1);
  •       container.setConcurrentConsumers(1);
  •       container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  •       container.setMessageListener(new ChannelAwareMessageListener() {

  •           @Override
  •           public void onMessage(Message message, Channel channel) throws Exception {
  •               byte[] body = message.getBody();  
  •               System.out.println("receive msg : " + new String(body));
  •               channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
  •           }
  •       });
  •       return container;
  •   }

下面给出完整的配置文件:




[html] view plain copy

  • package com.lkl.springboot.amqp;

  • import org.springframework.amqp.core.AcknowledgeMode;
  • import org.springframework.amqp.core.Binding;
  • import org.springframework.amqp.core.BindingBuilder;
  • import org.springframework.amqp.core.DirectExchange;
  • import org.springframework.amqp.core.Message;
  • import org.springframework.amqp.core.Queue;
  • import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  • import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  • import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
  • import org.springframework.amqp.rabbit.core.RabbitTemplate;
  • import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  • import org.springframework.beans.factory.config.ConfigurableBeanFactory;
  • import org.springframework.context.annotation.Bean;
  • import org.springframework.context.annotation.Configuration;
  • import org.springframework.context.annotation.Scope;

  • import com.rabbitmq.client.Channel;

  • /**
  • * Qmqp Rabbitmq
  • *
  • * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
  • *
  • * @author lkl
  • * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $
  • */

  • @Configuration
  • public class AmqpConfig {

  •     public static final String EXCHANGE   = "spring-boot-exchange";  
  •     public static final String ROUTINGKEY = "spring-boot-routingKey";  

  •     @Bean
  •     public ConnectionFactory connectionFactory() {
  •         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
  •         connectionFactory.setAddresses("127.0.0.1:5672");
  •         connectionFactory.setUsername("guest");
  •         connectionFactory.setPassword("guest");
  •         connectionFactory.setVirtualHost("/");
  •         connectionFactory.setPublisherConfirms(true); //必须要设置
  •         return connectionFactory;
  •     }

  •     @Bean
  •     @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  •     //必须是prototype类型
  •     public RabbitTemplate rabbitTemplate() {
  •         RabbitTemplate template = new RabbitTemplate(connectionFactory());  
  •         return template;
  •     }

  •     /**
  •      * 针对消费者配置
  •      * 1. 设置交换机类型
  •      * 2. 将队列绑定到交换机
  •      *
  •      *
  •         FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
  •         HeadersExchange :通过添加属性key-value匹配
  •         DirectExchange:按照routingkey分发到指定队列
  •         TopicExchange:多关键字匹配
  •      */
  •     @Bean
  •     public DirectExchange defaultExchange() {
  •         return new DirectExchange(EXCHANGE);
  •     }

  •     @Bean
  •     public Queue queue() {
  •         return new Queue("spring-boot-queue", true); //队列持久

  •     }

  •     @Bean
  •     public Binding binding() {
  •         return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
  •     }

  •     @Bean
  •     public SimpleMessageListenerContainer messageContainer() {
  •         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
  •         container.setQueues(queue());
  •         container.setExposeListenerChannel(true);
  •         container.setMaxConcurrentConsumers(1);
  •         container.setConcurrentConsumers(1);
  •         container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  •         container.setMessageListener(new ChannelAwareMessageListener() {

  •             @Override
  •             public void onMessage(Message message, Channel channel) throws Exception {
  •                 byte[] body = message.getBody();  
  •                 System.out.println("receive msg : " + new String(body));
  •                 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
  •             }
  •         });
  •         return container;
  •     }

  • }


以上完成 Spring Boot与RabbitMQ的整合

自动配置

在Spring Boot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息



[html] view plain copy

  • spring.rabbitmq.host=localhost  
  • spring.rabbitmq.port=5672  
  • spring.rabbitmq.username=test  
  • spring.rabbitmq.password=test  
  • spring.rabbitmq.virtualHost=test  

后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?

自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码



[html] view plain copy

  • connectionFactory.setPublisherConfirms(true);


具体分析见后续文章的源码解读.

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422459-1-1.html 上篇帖子: RabbitMQ如何保证发送端消息的可靠投递-发生镜像队列发生故障转移时 下篇帖子: rabbitMQ教程(三) spring整合rabbitMQ代码实例
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表