设为首页 收藏本站
查看: 680|回复: 0

[经验分享] Spring AMQP + Rabbit 配置多数据源消息队列

[复制链接]

尚未签到

发表于 2017-12-9 14:33:42 | 显示全部楼层 |阅读模式
  一般在稍微大一点的项目中,需要配置多个数据库数据源,最简单的方式是用 Spring 来实现,只需要继承 AbstractRoutingDataSource 类,实现 determineCurrentLookupKey 方法,再配合使用 ThreadLocal 就可以实现。
  但是如何实现 MQ 的多数据源呢?假设有部署在不同服务器上的两个消息队列,或者是同一服务器,不同 vhost 的消息队列,在一个项目中,我如何自由地选择从哪个队列收发消息呢?下面说说用 Spring AMQP + Rabbit 的实现过程及踩过的坑。
  最开始的单数据源的实现很简单,网上有好多博文可以参考,官网也有介绍。主要就是创建一个 xml 的配置文件,添加各种必要的配置,声明 connection-factory、rabbitListenerContainerFactory、rabbitTemplate、queue、exchange、binding 等等。然后用 RabbitTemplate 来发消息,用 @RabbitListener 注解来监听,用 queue 指定队列来收消息,这里就不赘述了。主要说一下,在现有的基础上实现多数据源的收发。
  先说配置方面,为了对比,下面先给出单数据源配置:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
       requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="concurrentConsumers" value="16"/>
<property name="maxConcurrentConsumers" value="50"/>
</bean>
<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
<!-- queue declare -->
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
<!-- bind queue to exchange -->
<rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
<rabbit:bindings>
<rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500"/>
<property name="multiplier" value="10.0"/>
<property name="maxInterval" value="10000"/>
</bean>
</property>
</bean>
</beans>
  为了实现双数据源,查阅了很多资料,最初实现的配置如下:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}"
password="${rabbit.password}" requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
<!-- 添加了一个连接工厂,参数从 properties 文件中取 -->
<rabbit:connection-factory id="rabbitConnectionFactory1" host="${rabbit.host1}" port="${rabbit.port1}" username="${rabbit.username1}"
       password="${rabbit.password1}" requested-heartbeat="30" virtual-host="${rabbit.vhost1}" channel-cache-size="50"/>

<!-- 添加 SimpleRoutingConnectionFactory 配置,将两个 Connection factory 配置好-->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="rabbitConnectionFactory" value-ref="rabbitConnectionFactory"/>
<entry key="rabbitConnectionFactory1" value-ref="rabbitConnectionFactory1"/>
</map>
</property>
</bean>
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

<!-- 由于增加了一个连接工厂,ContainerFactory 的连接工厂改为新增的 ConnectionFactory  -->
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<!-- <property name="connectionFactory" ref="rabbitConnectionFactory"/> -->
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="16"/>
<property name="maxConcurrentConsumers" value="50"/>
</bean>

<!-- queue declare,增加一个消息队列 -->
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test1"/>
<!-- bind queue to exchange -->
<rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
<rabbit:bindings>
<rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
<rabbit:binding queue="queue.test1" key="rkey.test1"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- connection-factory 改为新增的 ConnectionFactory -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500"/>
<property name="multiplier" value="10.0"/>
<property name="maxInterval" value="10000"/>
</bean>
</property>
</bean>
</beans>
  改动都写在注释里了,主要就是增加了一个连接工厂的配置,其他配置做了一些相应的适配。
  发消息的时候,需要指定连接工厂,也就是说,你要往哪个消息服务器发:



    @Test
public void testSendMsg() {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), "rabbitConnectionFactory");
rabbitTemplate.convertAndSend("exchange", "rkey.test", "test");
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), "rabbitConnectionFactory1");
rabbitTemplate.convertAndSend("exchange", "rkey.test1", "test1");
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
  在发消息之前调用 SimpleResourceHolder.bind 绑定要使用的工厂,发完之后,调用 unbind 解除绑定。将上述代码封装为两个工具类,更好。
  然后,有一个大坑在前面。。。如何收消息?
  发消息要绑定连接工厂,指明往哪个消息服务器上发,收的时候,同样得指定要从哪个消息服务器上收。最开始没想到这点,以为只要指定队列名称就可以,如下:



    @RabbitListener(queues = "queue.test")
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
System.out.println(msg);
}
  然并卵,报了异常:



java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:116) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:94) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:456) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1158) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
  这个问题不好解决,查了很多资料都没用,比如这种方式:https://stackoverflow.com/questions/42784471/spring-amqp-mix-simpleroutingconnectionfactory-with-rabbitlistener  。
  无奈之下,只能试着看看 Spring 的 AMQP 怎么实现,看看有没有解决的办法,最开始想的是继承 Spring 的某个类来实现。然而,看来看去,很是头大,没有结果。
  最后无意间点到了 @RabbitListener 这个注解中,发现了有一个属性,瞬间感觉很兴奋,如下图:
DSC0000.jpg

  看了下注释,这里可以指定一个 containerFactory,感觉可以试试。首先只有一个 containerFactory,那就加一个吧。为了看的比较清晰,我把第一次添加的注释去掉了,于是配置成了这样:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}"
password="${rabbit.password}" requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
<rabbit:connection-factory id="rabbitConnectionFactory1" host="${rabbit.host1}" port="${rabbit.port1}" username="${rabbit.username1}"
       password="${rabbit.password1}" requested-heartbeat="30" virtual-host="${rabbit.vhost1}" channel-cache-size="50"/>
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="rabbitConnectionFactory" value-ref="rabbitConnectionFactory"/>
<entry key="rabbitConnectionFactory1" value-ref="rabbitConnectionFactory1"/>
</map>
</property>
</bean>
<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
<!-- 添加一个 rabbitAdmin-->
<rabbit:admin id="rabbitAdmin1" connection-factory="rabbitConnectionFactory1"/>

<!-- 把原有的 ContainerFactory 的连接工厂改为 rabbitConnectionFactory-->
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="concurrentConsumers" value="16"/>
<property name="maxConcurrentConsumers" value="50"/>
</bean>
<!-- 添加一个 ContainerFactory, 连接工厂为 rabbitConnectionFactory1-->
<bean id="rabbitListenerContainerFactory1"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="rabbitConnectionFactory1"/>
<property name="concurrentConsumers" value="16"/>
<property name="maxConcurrentConsumers" value="50"/>
</bean>
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test1"/>
<rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
<rabbit:bindings>
<rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
<rabbit:binding queue="queue.test1" key="rkey.test1"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500"/>
<property name="multiplier" value="10.0"/>
<property name="maxInterval" value="10000"/>
</bean>
</property>
</bean>
</beans>
  收消息的时候指定 container factory 即可:



    @RabbitListener(queues = "queue.test", containerFactory = "rabbitListenerContainerFactory")
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
System.out.println(msg);
}
  测试通过!
  以上配置、解决办法是尝试过多次以后得出的,所以还是要有耐心,多尝试。
  由于在网上没有找到解决办法,只有自己摸索着解决,如果大家有其他解决方案,欢迎留言讨论!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422433-1-1.html 上篇帖子: RabbitMQ 消息队列 应用 下篇帖子: windows下 安装 rabbitMQ 及操作常用命令(操作创建用户密码 角色等)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表