|
前面讲到Apache ActiveMQ 集群配置方法 的实现,主要是为了解决点单故障的情况,如果要对AMQ进行分流、提高吞吐率,那么可以尝试搭建一个负载均衡的AMQ集群。
要搭建这样的一个环境也是非常的简单,我们只需要增加几行配置项到activemq.xml里就好,剩下的事情全部都由AMQ去做。
AMQ负载均衡的实现有三种方案:
1、static
2、Multicast Discovery
3、MasterSlave Discovery
可以参考官网实现:
http://activemq.apache.org/networks-of-brokers.html
本例使用的是静态路由static,他的缺点就是需要把已知的节点都要预先配置进去,不像动态路由灵活。
在Apache ActiveMQ单点基本配置基础上,用新的activemq.xml替换:
activemq.xml:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerTester3" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 属性enableAudit=false,是防止消息在回流后被当做重复消息而不被转发 -->
<policyEntry queue=">" producerFlowControl="false" memoryLimit="10mb" enableAudit="false">
<!-- 属性replayWhenNoConsumers=true,保证在该节点断开,并重启后,且consumers已经连接到另外一个节点上的情况下,消息自动回流到原始节点 -->
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!-- 该节点的配置必须要在persistenceAdapter节点之前 -->
<networkConnectors>
<!-- 静态路由 -->
<networkConnector uri="static:(tcp://192.168.0.87:61617)" duplex="true"/>
</networkConnectors>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"
enableIndexWriteAsync="true"
enableJournalDiskSyncs="false"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="1400 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="nio://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5671?maximumConnections=1000&wireformat.maxFrameSize=104857600"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
</beans>
同样需要注意的是,如果在一台设备上部署多个AMQ实例,要注意修改对应的端口号。
e.g.:这里会存在一个问题,假设这里配置的节点数为3个节点(A,B,C),且配置了消息回流。
1、假设节点A不接收消费者,只接受生产者;
2、当消费者X连接到B节点消费A节点上的消息(由于AMQ的机制,B节点会预先消费A节点上一定数量的消息到B节点,默认值1000,且假设只有1000个消息待消费),在消费过程中B节点断开,且B节点上有未消费的消息存在,这时消费者X自动路由到C节点上想继续消费未消费完成的消息;
3、由于当前的未消费的消息都存储于B节点上,当B节点重启后按照当前的配置方式,AMQ的逻辑应该是当X与A节点建立连接后B节点消息会回流,且可以正常消费,而当前消费者X是连接到了C节点上。
如果在业务上发生上面的这个场景,以当前的集群负载配置方式是无法满足需求的。
版权声明:本文为博主原创文章,未经博主允许不得转载。 |
|
|
|
|
|
|