窝窝插件 发表于 2015-11-14 06:38:21

Apache ActiveMQ 负载均衡

  前面讲到Apache ActiveMQ 集群配置方法 的实现,主要是为了解决点单故障的情况,如果要对AMQ进行分流、提高吞吐率,那么可以尝试搭建一个负载均衡的AMQ集群。
  要搭建这样的一个环境也是非常的简单,我们只需要增加几行配置项到activemq.xml里就好,剩下的事情全部都由AMQ去做。
  


  AMQ负载均衡的实现有三种方案:
  1、static
  2、Multicast Discovery
  3、MasterSlave Discovery


  可以参考官网实现:
  http://activemq.apache.org/networks-of-brokers.html


  


  本例使用的是静态路由static,他的缺点就是需要把已知的节点都要预先配置进去,不像动态路由灵活。
  在Apache ActiveMQ单点基本配置基础上,用新的activemq.xml替换:
  activemq.xml:


  

<beans
xmlns=&quot;http://www.springframework.org/schema/beans&quot;
xmlns:amq=&quot;http://activemq.apache.org/schema/core&quot;
xmlns:xsi=&quot;http://www.w3.org/2001/XMLSchema-instance&quot;
xsi:schemaLocation=&quot;http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd&quot;>
<bean class=&quot;org.springframework.beans.factory.config.PropertyPlaceholderConfigurer&quot;>
<property name=&quot;locations&quot;>
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<broker xmlns=&quot;http://activemq.apache.org/schema/core&quot; brokerName=&quot;brokerTester3&quot; dataDirectory=&quot;${activemq.data}&quot;>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 属性enableAudit=false,是防止消息在回流后被当做重复消息而不被转发 -->
<policyEntry queue=&quot;>&quot; producerFlowControl=&quot;false&quot; memoryLimit=&quot;10mb&quot; enableAudit=&quot;false&quot;>
<!-- 属性replayWhenNoConsumers=true,保证在该节点断开,并重启后,且consumers已经连接到另外一个节点上的情况下,消息自动回流到原始节点 -->
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers=&quot;true&quot;/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector=&quot;false&quot;/>
</managementContext>
<!-- 该节点的配置必须要在persistenceAdapter节点之前 -->
<networkConnectors>
<!-- 静态路由 -->
      <networkConnector uri=&quot;static:(tcp://192.168.0.87:61617)&quot; duplex=&quot;true&quot;/>
</networkConnectors>
<persistenceAdapter>
<kahaDB directory=&quot;${activemq.data}/kahadb&quot;
enableIndexWriteAsync=&quot;true&quot;
enableJournalDiskSyncs=&quot;false&quot;/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit=&quot;1400 mb&quot;/>
</memoryUsage>
<storeUsage>
<storeUsage limit=&quot;100 gb&quot;/>
</storeUsage>
<tempUsage>
<tempUsage limit=&quot;50 gb&quot;/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name=&quot;openwire&quot; uri=&quot;nio://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600&quot;/>
<transportConnector name=&quot;amqp&quot; uri=&quot;amqp://0.0.0.0:5671?maximumConnections=1000&wireformat.maxFrameSize=104857600&quot;/>
</transportConnectors>
<shutdownHooks>
<bean xmlns=&quot;http://www.springframework.org/schema/beans&quot; class=&quot;org.apache.activemq.hooks.SpringContextHook&quot; />
</shutdownHooks>
</broker>
<import resource=&quot;jetty.xml&quot;/>
</beans>



  
  同样需要注意的是,如果在一台设备上部署多个AMQ实例,要注意修改对应的端口号。


  


  e.g.:这里会存在一个问题,假设这里配置的节点数为3个节点(A,B,C),且配置了消息回流。
  1、假设节点A不接收消费者,只接受生产者;
  2、当消费者X连接到B节点消费A节点上的消息(由于AMQ的机制,B节点会预先消费A节点上一定数量的消息到B节点,默认值1000,且假设只有1000个消息待消费),在消费过程中B节点断开,且B节点上有未消费的消息存在,这时消费者X自动路由到C节点上想继续消费未消费完成的消息;
  3、由于当前的未消费的消息都存储于B节点上,当B节点重启后按照当前的配置方式,AMQ的逻辑应该是当X与A节点建立连接后B节点消息会回流,且可以正常消费,而当前消费者X是连接到了C节点上。
  


  如果在业务上发生上面的这个场景,以当前的集群负载配置方式是无法满足需求的。
  


  


  


  



版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: Apache ActiveMQ 负载均衡