jrgf 发表于 2018-5-29 12:13:17

ActiveMQ私有云、公有云以及Docker环境高可用集群方案汇总

ActiveMQ软件概述
  ActiveMQ提供消息队列服务。
ActiveMQ高可用原理
  ActiveMQ高可用由三部分组成。
1.ActiveMQ的master-slave
  两个运行的ActiveMQ instance如果同时使用一套持久化存储,那么这两个ActiveMQ instance就会构成master-slave关系。持久化数据放在一个单独的文件系统目录上或者放在一个共享的文件系统目录上,这个目录中会有一个lock锁文件。谁先启动ActiveMQ instance谁就会抢占这个锁,谁抢占了这个锁谁就是master,slave运行在standby状态,只有master服务停止或者中断后,slave就会立刻抢占这个锁,成为新的master,而另一个ActiveMQ instance启动后无法抢占这个锁,会以slave方式运行。
2.ActiveMQ的networkConnectors
  ActiveMQ的networkConnectors可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当有一条消息发送到brokerA的队列queueName中,brokerA中队列queueName的消息就会路由到brokerB的队列queueName上,反之brokerB的消息也会路由到brokerA。   
networkConnectors可以配置成静态的(固定的)也可以配置成动态的,取决于ActiveMQ instance运行的网络环境。
3.ActiveMQ的failover客户端连接协议
  与其说是协议不如说是策略。failover传输是指在客户端连接列表中配置多个连接配置信息(称之为URIs list),当list中的某一个连接信息不可用时,就会尝试下一个连接信息,直到找到可用的连接信息。
ActiveMQ集群环境
  ActiveMQ可以运行在主机内,也可以运行在虚拟机内、更可以运行在Docker中。但无论运行在哪种平台中,都需要考虑集群容错问题。例如集群节点应该尽可能地位于不同的host上,避免host发生故障导致整个集群不可用的情况。   
根据公有云、私有云环境的不同和ActiveMQ实现集群方式的不同分为两种集群方案。
1.私有云环境下的ActiveMQ高可用集群方案
  在私有云环境下,网络是受自己控制的,ActiveMQ集群可以运行在同一个局域网内,利用局域网内特有的一些局域网协议实现集群,这些协议包括多播、VRRP等。多播能使一个或多个多播源只把数据包发送给特定的多播组,而只有加入该多播组的主机才能接收到数据包。一些分布在各处的进程需要以组的方式协同工作,组中的进程通常要给其他所有的成员发送消息。即有这样的一种方法能够给一些明确定义的组发送消息,这些组的成员数量虽然很多,但是与整个网络规模相比却很小。给这样一个组发送消息称为多点点播送,简称多播。多播技术可以用于自动发现,因此它能实现集群中的自动发现节点功能。借助多播技术,可以使用动态发现实现集群节点的自动添加和移除。
2.公有云环境下的ActiveMQ高可用集群方案
  在公有云环境中,往往不是受自己控制的,多台云主机也可能不在同一个局域网内,此时就不能利用多播技术实现集群应用。除了利用多播的自动发现实现集群自动发现节点外,同样可以使用静态配置的方式,告诉集群内的每一个节点它的的对等节点有谁。当集群内的某个节点发生故障时,静态配置的节点信息可能发生改变,导致集群内存在一个或多个不可用的节点地址,从而导致集群对于客户端而言不可用。或者当集群内的某个节点故障恢复后,不能动态的告知它的对等节点它已恢复,同样可能造成集群对客户端而言不可用或者无法提供相应的服务标准。这种情况下可以使用一种特殊的方式配置集群与客户端的连接,虽然客户端的集群节点配置是静态的,但是客户端可以通过某种方式智能迅速的判断集群中的节点是否可用,从而实现高可用。   
公有云环境+docker环境还需要考虑多主机环境下的容器间通信问题。
ActiveMQ集群环境配置之私有云环境配置
  # ActiveMQ references   
# http://activemq.apache.org/version-5-run-broker.html   
# http://activemq.apache.org/topologies.html   
# http://activemq.apache.org/version-5-topologies.html   
# http://activemq.apache.org/clustering.html   
# http://activemq.apache.org/replicated-message-store.html   
# http://activemq.apache.org/shared-file-system-master-slave.html   
# http://activemq.apache.org/failover-transport-reference.html   
# http://activemq.apache.org/configuring-version-5-transports.html   
# http://dgd2010.blog.51cto.com/1539422/1680244   
# http://activemq.apache.org/version-5-performance-tuning.html   
# http://activemq.apache.org/networks-of-brokers.html   
# SharedFile System Master Slave and Dynamic Discovery Clustering Design   
192.168.1.241 server1.51devops.comactivemq master,activemq cluster A   
192.168.1.242 server2.51devops.comactivemq slave   
192.168.1.243 server3.51devops.comnfs server,activemq cluster B   
# add new disk to 192.168.1.243   
fdisk /dev/sdb   
n   
p   
1
  w   
mkfs.xfs /dev/sdb1   
mkdir -p /data   
mount /dev/sdb1 /data   
mkdir /data/ActivemqSharedBrokerData   
# install nfs on 192.168.1.243   
yum -y install nfs-utils nfs-utils-lib   
chkconfig --levels 235 nfs on   
cat >/etc/exports<<eof   
/data192.168.1.0/255.255.255.0(rw,no_root_squash,no_all_squash,sync)   
eof   
exportfs -a   
exportfs -r   
# http://www.linuxquestions.org/questions/linux-networking-3/rpc-statd-not-running-872348/   
/etc/init.d/rpcbind start   
/etc/init.d/nfslock start   
/etc/init.d/nfs restart   
# end install nfs on 192.168.1.243   
# mount nfs on 192.168.1.241 192.168.1.242   
yum -y install nfs-utils nfs-utils-lib   
mkdir /data   
mount -t nfs -o rw 192.168.1.243:/data /data   
# 192.168.1.243:/data on /data type nfs (rw,vers=4,addr=192.168.1.243,clientaddr=192.168.1.241)   
# install java   
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel   
# install activemq   
wget -c http://www.apache.org/dist/activemq/KEYS   
gpg --import KEYS   
wget -c https://www.apache.org/dist/activemq/5.13.1/apache-activemq-5.13.1-bin.tar.gz.asc   
wget -c http://apache.fayea.com/activemq/5.13.1/apache-activemq-5.13.1-bin.tar.gz   
gpg --verify apache-activemq-5.13.1-bin.tar.gz.asc   
tar zxf apache-activemq-5.13.1-bin.tar.gz   
mv apache-activemq-5.13.1 /usr/local/activemq   
ls /usr/local/activemq   
cd /usr/local/activemq   
# end install activemq   
# on 192.168.1.241
<broker xmlns="http://activemq.apache.org/schema/core"   
dataDirectory="${activemq.data}"   
brokerName="192.168.1.241" useJmx="true" advisorySupport="false"   
persistent="true" deleteAllMessagesOnStartup="false"   
useShutdownHook="false" schedulerSupport="true">   
<networkConnectors>   
    <networkConnector uri="multicast://default" />   
</networkConnectors>   
<transportConnectors>   
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />   
</transportConnectors>   
<persistenceAdapter>   
<kahaDB directory="/data/ActivemqSharedBrokerData"/>   
</persistenceAdapter>  # end on 192.168.1.241   
# on 192.168.1.242   
vim conf/activemq.xml   

<broker xmlns="http://activemq.apache.org/schema/core"   
dataDirectory="${activemq.data}"   
brokerName="192.168.1.242" useJmx="true" advisorySupport="false"   
persistent="true" deleteAllMessagesOnStartup="false"   
useShutdownHook="false" schedulerSupport="true">   
<networkConnectors>   
    <networkConnector uri="multicast://default" />   
</networkConnectors>   
<transportConnectors>   
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />   
</transportConnectors>   
<persistenceAdapter>   
<kahaDB directory="/data/ActivemqSharedBrokerData"/>   
</persistenceAdapter>  # end on 192.168.1.242   
# on 192.168.1.241 and 192.168.1.242   
cd /usr/local/activemq   
bin/activemq start   
bin/activemq status   
bin/activemq stop   
true > data/activemq.log   
bin/activemq restart   
sleep 2   
tail -n30 data/activemq.log   
# end on 192.168.1.241 and 192.168.1.242   
# on 192.168.1.243   
cd /usr/local/activemq   
vim conf/activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core"   
dataDirectory="${activemq.data}"   
brokerName="192.168.1.243" useJmx="true" advisorySupport="false"   
persistent="true" deleteAllMessagesOnStartup="false"   
useShutdownHook="false" schedulerSupport="true">   
<networkConnectors>   
    <networkConnector uri="multicast://default" />   
</networkConnectors>   
<transportConnectors>   
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />   
</transportConnectors>  bin/activemq start   
bin/activemq status   
bin/activemq stop   
true > data/activemq.log   
bin/activemq restart   
sleep 2   
tail -n30 data/activemq.log   

ActiveMQ集群环境配置之共有云环境配置
  公有云环境的配置与私有云环境的差异就在必须将动态配置换成静态配置。
<networkConnectors>   
    <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>   
</networkConnectors>ActiveMQ集群环境配置之共有云+Docker环境配置
  在Docker的加入后就有很大的不同了。首先Docker的配置文件(如/etc/hosts)中必须灵活的指定主机名称与IP地址的映射关系,这样使得ActiveMQ的配置文件更加灵活,不依赖于IP地址、具有更好的应用范围。其次要解决主机间容器通信的问题,每一个主机上运行的Docker容器之间必须能相互访问。   
如果想使用Docker,建议使用Linux 3.10版本以上的内核的操作系统,这样的Linux内核在CentOS7、Ubuntu14以上都支持。   
此处以CentOS7 1511为例,跨主机网络互通靠docker native plugin中的overlay实现。此方案中会用到一个kv存储,这个kv存储可以使用Consul、etcd等实现,此处用Consul实现。   
yum -y update   
history -c && shutdown -r now   
uname -a   
# Linux localhost.localdomain 3.10.0-327.10.1.el7.x86_64 #1 SMP Tue Feb 16 17:03:50 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux   
# set FQDN hostname use edit file or hostname command or nmtui command   
vim /etc/hostname   
setenforce 0   
# for service docker   
# Refer: https://www.docker.com/   
# Refer: https://docs.docker.com/linux/step_one/   
which curl 2>/dev/null || yum -y -q install curl   
curl -fsSL https://get.docker.com/gpg | gpg --import   
curl -fsSL https://get.docker.com/ | sh   
# for program docker-enter   
# Refer: http://dockerpool.com/static/books/docker_practice/container/enter.html   
which curl > /dev/null || apt-get -qq install -y curl   
# # cd /tmp; curl https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz | tar -zxf-; cd util-linux-2.24;   
# cd /tmp; wget -q https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz; tar xzvf util-linux-2.24.tar.gz   
# cd util-linux-2.24   
# ./configure --without-ncurses && make nsenter   
# cp nsenter '/usr/local/bin'   
which nsenter   
cd   
which wget 2>/dev/null || yum -y -q install wget   
wget -P ~ https://github.com/yeasy/docker_practice/raw/master/_local/.bashrc_docker;   
echo "[ -f ~/.bashrc_docker ] && . ~/.bashrc_docker" >> ~/.bashrc; source ~/.bashrc   
service docker start   
docker version   
rpm -ql docker-engine   
# if there are only two docker hosts commmunicated each other, then nameserver set to each other   
# if there are more than 3 docker hosts, then first node's nameserver set to last one, second set to first one, third set to second one   
vim /etc/resolv.conf   
vim /usr/lib/systemd/system/docker.service   
-H tcp://0.0.0.0:2376 -H unix:///var/run/docker.sock --cluster-store=consul://consul.service.dc1.consul.:8500 --cluster-advertise=eno16777728:2376   
systemctl daemon-reload   
systemctl restart docker   
systemctl status docker -l   
docker network create -d overlay interconnection   
docker network ls   
# after this, start a docker container with "--net interconnection", then containers running on different hosts can communited each other.   
mkdir -p /data/docker/activemq/data   
mkdir -p /data/docker/activemq/data/kahadb   
mkdir -p /data/docker/activemq/log-master   
mkdir -p /data/docker/activemq/log-slave   
mkdir -p /data/docker/activemq/conf
  vim /data/docker/activemq/conf/activemq.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"></bean>
<broker xmlns="http://activemq.apache.org/schema/core" dataDirectory="${activemq.data}" brokerName="localhost" useJmx="true" advisorySupport="false" persistent="true" deleteAllMessagesOnStartup="false" useShutdownHook="false" >
    <networkConnectors>
      <networkConnector uri="static:(tcp://server1-activemq-01-master:61616,tcp://server2-activemq-02-master:61616)"/>
    </networkConnectors>
    <destinationPolicy>
      <policyMap>
      <policyEntries>
          <policyEntry topic="&gt;">
            <pendingMessageLimitStrategy>
            <constantPendingMessageLimitStrategy limit="1000" />
            </pendingMessageLimitStrategy>
          </policyEntry>
      </policyEntries>
      </policyMap>
    </destinationPolicy>
    <managementContext>
      <managementContext createConnector="false" />
    </managementContext>
    <persistenceAdapter>
      <kahaDB directory="/data/activemq/kahadb" />
    </persistenceAdapter>
    <systemUsage>
      <systemUsage>
      <memoryUsage>
          <memoryUsage percentOfJvmHeap="70" />
      </memoryUsage>
      <storeUsage>
          <storeUsage limit="100 gb" />
      </storeUsage>
      <tempUsage>
          <tempUsage limit="50 gb" />
      </tempUsage>
      </systemUsage>
    </systemUsage>
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
    </transportConnectors>
    <shutdownHooks>
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>
</broker>
<import resource="jetty.xml" />
</beans>  docker run --restart="always" --name='server1-activemq-01-master' --net interconnection -d --hostname=server1-activemq-01-master \   
-e 'ACTIVEMQ_NAME=amqp-srv1-master' \   
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \   
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \   
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \   
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \   
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \   
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \   
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \   
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e'ACTIVEMQ_MAX_MEMORY=4096' \   
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \   
-v /data/docker/activemq/data:/data/activemq \   
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \   
-v /data/docker/activemq/log-master:/var/log/activemq \   
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \   
-p 8161:8161 \   
-p 61616:61616 \   
-p 61613:61613 \   
webcenter/activemq
  docker run --restart="always" --name='server1-activemq-01-slave' --net interconnection -d --hostname=server1-activemq-01-slave \   
-e 'ACTIVEMQ_NAME=amqp-srv1-slave' \   
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \   
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \   
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \   
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \   
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \   
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \   
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \   
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e'ACTIVEMQ_MAX_MEMORY=4096' \   
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \   
-v /data/docker/activemq/data:/data/activemq \   
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \   
-v /data/docker/activemq/log-slave:/var/log/activemq \   
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \   
-p 8171:8161 \   
-p 61626:61616 \   
-p 61623:61613 \   
webcenter/activemq
  rm -rf /data/docker/activemq   
docker stop server1-activemq-01-master && docker rm server1-activemq-01-master   
docker stop server1-activemq-01-slave && docker rm server1-activemq-01-slave   
mkdir -p /data/docker/activemq/data   
mkdir -p /data/docker/activemq/data/kahadb   
mkdir -p /data/docker/activemq/log-master   
mkdir -p /data/docker/activemq/log-slave   
mkdir -p /data/docker/activemq/conf
  docker run --restart="always" --name='server2-activemq-02-master' -d --hostname=server2-activemq-02-master \   
-e 'ACTIVEMQ_NAME=amqp-srv2-master' \   
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \   
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \   
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \   
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \   
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \   
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \   
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \   
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e'ACTIVEMQ_MAX_MEMORY=4096' \   
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \   
-v /data/docker/activemq/data:/data/activemq \   
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \   
-v /data/docker/activemq/log-master:/var/log/activemq \   
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \   
-p 8161:8161 \   
-p 61616:61616 \   
-p 61613:61613 \   
webcenter/activemq
  docker run --restart="always" --name='server2-activemq-02-slave' -d --hostname=server2-activemq-02-slave \   
-e 'ACTIVEMQ_NAME=amqp-srv2-slave' \   
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \   
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \   
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \   
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \   
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \   
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \   
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \   
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e'ACTIVEMQ_MAX_MEMORY=4096' \   
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \   
-v /data/docker/activemq/data:/data/activemq \   
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \   
-v /data/docker/activemq/log-slave:/var/log/activemq \   
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \   
-p 8171:8161 \   
-p 61626:61616 \   
-p 61623:61613 \   
webcenter/activemq
  rm -rf /data/docker/activemq   
docker stop server2-activemq-02-master && docker rm server2-activemq-02-master   
docker stop server2-activemq-02-slave && docker rm server2-activemq-02-slave   

  cat /data/docker/activemq/log-master/activemq.log   
cat /data/docker/activemq/log-slave/activemq.log
  

  客户端连接ActiveMQ集群
  # About: "Caught: javax.jms.JMSSecurityException: User name or password is invalid. javax.jms.JMSSecurityException: User name or password is invalid."   
# Refer to https://github.com/disaster37/activemq/issues/15   
# At last, I find that only set "-e 'ACTIVEMQ_ADMIN_LOGIN=yourName' -e 'ACTIVEMQ_ADMIN_PASSWORD=yourPassword' \" like this self can login success, then I got a success!
  

  Client connection example:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");  完整的例子如下:
  activemq hello world writen with java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Hello world!
*/
public class activemq5Failover {
    public static void main(String[] args) throws Exception {
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldConsumer(), false);
      Thread.sleep(1000);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
      Thread.sleep(1000);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldProducer(), false);
      Thread.sleep(1000);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldConsumer(), false);
      thread(new HelloWorldProducer(), false);
    }
    public static void thread(Runnable runnable, boolean daemon) {
      Thread brokerThread = new Thread(runnable);
      brokerThread.setDaemon(daemon);
      brokerThread.start();
    }
    public static class HelloWorldProducer implements Runnable {
      public void run() {
            try {
                // Create a ConnectionFactory
//                Refer: http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                // Create a messages
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);
                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);
                // Clean up
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
      }
    }
    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
      public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);
                // Wait for a message
                Message message = consumer.receive(1000);
                if (message instanceof TextMessage) {
                  TextMessage textMessage = (TextMessage) message;
                  String text = textMessage.getText();
                  System.out.println("Received: " + text);
                } else {
                  System.out.println("Received: " + message);
                }
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
      }
      public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.Shutting down client.");
      }
    }
}  tag:activemq clustering,ActiveMQ 集群,ActiveMQ 负载均衡,ActiveMQ 主备,ActiveMQ 高可用   
--end--
页: [1]
查看完整版本: ActiveMQ私有云、公有云以及Docker环境高可用集群方案汇总