设为首页 收藏本站
查看: 1075|回复: 0

[经验分享] ActiveMQ私有云、公有云以及Docker环境高可用集群方案汇总

[复制链接]

尚未签到

发表于 2018-5-29 12:13:17 | 显示全部楼层 |阅读模式
ActiveMQ软件概述
  ActiveMQ提供消息队列服务。
ActiveMQ高可用原理
  ActiveMQ高可用由三部分组成。
1.ActiveMQ的master-slave
  两个运行的ActiveMQ instance如果同时使用一套持久化存储,那么这两个ActiveMQ instance就会构成master-slave关系。持久化数据放在一个单独的文件系统目录上或者放在一个共享的文件系统目录上,这个目录中会有一个lock锁文件。谁先启动ActiveMQ instance谁就会抢占这个锁,谁抢占了这个锁谁就是master,slave运行在standby状态,只有master服务停止或者中断后,slave就会立刻抢占这个锁,成为新的master,而另一个ActiveMQ instance启动后无法抢占这个锁,会以slave方式运行。
2.ActiveMQ的networkConnectors
  ActiveMQ的networkConnectors可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当有一条消息发送到brokerA的队列queueName中,brokerA中队列queueName的消息就会路由到brokerB的队列queueName上,反之brokerB的消息也会路由到brokerA。   
networkConnectors可以配置成静态的(固定的)也可以配置成动态的,取决于ActiveMQ instance运行的网络环境。
3.ActiveMQ的failover客户端连接协议
  与其说是协议不如说是策略。failover传输是指在客户端连接列表中配置多个连接配置信息(称之为URIs list),当list中的某一个连接信息不可用时,就会尝试下一个连接信息,直到找到可用的连接信息。
ActiveMQ集群环境
  ActiveMQ可以运行在主机内,也可以运行在虚拟机内、更可以运行在Docker中。但无论运行在哪种平台中,都需要考虑集群容错问题。例如集群节点应该尽可能地位于不同的host上,避免host发生故障导致整个集群不可用的情况。   
根据公有云、私有云环境的不同和ActiveMQ实现集群方式的不同分为两种集群方案。
1.私有云环境下的ActiveMQ高可用集群方案
  在私有云环境下,网络是受自己控制的,ActiveMQ集群可以运行在同一个局域网内,利用局域网内特有的一些局域网协议实现集群,这些协议包括多播、VRRP等。多播能使一个或多个多播源只把数据包发送给特定的多播组,而只有加入该多播组的主机才能接收到数据包。一些分布在各处的进程需要以组的方式协同工作,组中的进程通常要给其他所有的成员发送消息。即有这样的一种方法能够给一些明确定义的组发送消息,这些组的成员数量虽然很多,但是与整个网络规模相比却很小。给这样一个组发送消息称为多点点播送,简称多播。多播技术可以用于自动发现,因此它能实现集群中的自动发现节点功能。借助多播技术,可以使用动态发现实现集群节点的自动添加和移除。
2.公有云环境下的ActiveMQ高可用集群方案
  在公有云环境中,往往不是受自己控制的,多台云主机也可能不在同一个局域网内,此时就不能利用多播技术实现集群应用。除了利用多播的自动发现实现集群自动发现节点外,同样可以使用静态配置的方式,告诉集群内的每一个节点它的的对等节点有谁。当集群内的某个节点发生故障时,静态配置的节点信息可能发生改变,导致集群内存在一个或多个不可用的节点地址,从而导致集群对于客户端而言不可用。或者当集群内的某个节点故障恢复后,不能动态的告知它的对等节点它已恢复,同样可能造成集群对客户端而言不可用或者无法提供相应的服务标准。这种情况下可以使用一种特殊的方式配置集群与客户端的连接,虽然客户端的集群节点配置是静态的,但是客户端可以通过某种方式智能迅速的判断集群中的节点是否可用,从而实现高可用。   
公有云环境+docker环境还需要考虑多主机环境下的容器间通信问题。
ActiveMQ集群环境配置之私有云环境配置
  # ActiveMQ references   
# http://activemq.apache.org/version-5-run-broker.html     
# http://activemq.apache.org/topologies.html     
# http://activemq.apache.org/version-5-topologies.html     
# http://activemq.apache.org/clustering.html     
# http://activemq.apache.org/replicated-message-store.html     
# http://activemq.apache.org/shared-file-system-master-slave.html     
# http://activemq.apache.org/failover-transport-reference.html     
# http://activemq.apache.org/configuring-version-5-transports.html     
# http://dgd2010.blog.51cto.com/1539422/1680244     
# http://activemq.apache.org/version-5-performance-tuning.html     
# http://activemq.apache.org/networks-of-brokers.html     
# SharedFile System Master Slave and Dynamic Discovery Clustering Design     
192.168.1.241 server1.51devops.com  activemq master,activemq cluster A     
192.168.1.242 server2.51devops.com  activemq slave     
192.168.1.243 server3.51devops.com  nfs server,activemq cluster B     
# add new disk to 192.168.1.243     
fdisk /dev/sdb     
n     
p     
1
  w   
mkfs.xfs /dev/sdb1     
mkdir -p /data     
mount /dev/sdb1 /data     
mkdir /data/ActivemqSharedBrokerData     
# install nfs on 192.168.1.243     
yum -y install nfs-utils nfs-utils-lib     
chkconfig --levels 235 nfs on     
cat >/etc/exports<<eof     
/data  192.168.1.0/255.255.255.0(rw,no_root_squash,no_all_squash,sync)     
eof     
exportfs -a     
exportfs -r     
# http://www.linuxquestions.org/questions/linux-networking-3/rpc-statd-not-running-872348/     
/etc/init.d/rpcbind start     
/etc/init.d/nfslock start     
/etc/init.d/nfs restart     
# end install nfs on 192.168.1.243     
# mount nfs on 192.168.1.241 192.168.1.242     
yum -y install nfs-utils nfs-utils-lib     
mkdir /data     
mount -t nfs -o rw 192.168.1.243:/data /data     
# 192.168.1.243:/data on /data type nfs (rw,vers=4,addr=192.168.1.243,clientaddr=192.168.1.241)     
# install java     
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel     
# install activemq     
wget -c http://www.apache.org/dist/activemq/KEYS     
gpg --import KEYS     
wget -c https://www.apache.org/dist/activemq/5.13.1/apache-activemq-5.13.1-bin.tar.gz.asc     
wget -c http://apache.fayea.com/activemq/5.13.1/apache-activemq-5.13.1-bin.tar.gz     
gpg --verify apache-activemq-5.13.1-bin.tar.gz.asc     
tar zxf apache-activemq-5.13.1-bin.tar.gz     
mv apache-activemq-5.13.1 /usr/local/activemq     
ls /usr/local/activemq     
cd /usr/local/activemq     
# end install activemq     
# on 192.168.1.241
<broker xmlns="http://activemq.apache.org/schema/core"     
dataDirectory="${activemq.data}"     
brokerName="192.168.1.241" useJmx="true" advisorySupport="false"     
persistent="true" deleteAllMessagesOnStartup="false"     
useShutdownHook="false" schedulerSupport="true">     
<networkConnectors>     
    <networkConnector uri="multicast://default" />     
</networkConnectors>     
<transportConnectors>     
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />     
</transportConnectors>     
<persistenceAdapter>     
  <kahaDB directory="/data/ActivemqSharedBrokerData"/>     
</persistenceAdapter>  # end on 192.168.1.241     
# on 192.168.1.242     
vim conf/activemq.xml     

<broker xmlns="http://activemq.apache.org/schema/core"     
dataDirectory="${activemq.data}"     
brokerName="192.168.1.242" useJmx="true" advisorySupport="false"     
persistent="true" deleteAllMessagesOnStartup="false"     
useShutdownHook="false" schedulerSupport="true">     
<networkConnectors>     
    <networkConnector uri="multicast://default" />     
</networkConnectors>     
<transportConnectors>     
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />     
</transportConnectors>     
<persistenceAdapter>     
  <kahaDB directory="/data/ActivemqSharedBrokerData"/>     
</persistenceAdapter>  # end on 192.168.1.242     
# on 192.168.1.241 and 192.168.1.242     
cd /usr/local/activemq     
bin/activemq start     
bin/activemq status     
bin/activemq stop     
true > data/activemq.log     
bin/activemq restart     
sleep 2     
tail -n30 data/activemq.log     
# end on 192.168.1.241 and 192.168.1.242     
# on 192.168.1.243     
cd /usr/local/activemq     
vim conf/activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core"     
dataDirectory="${activemq.data}"     
brokerName="192.168.1.243" useJmx="true" advisorySupport="false"     
persistent="true" deleteAllMessagesOnStartup="false"     
useShutdownHook="false" schedulerSupport="true">     
<networkConnectors>     
    <networkConnector uri="multicast://default" />     
</networkConnectors>     
<transportConnectors>     
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />     
</transportConnectors>  bin/activemq start     
bin/activemq status     
bin/activemq stop     
true > data/activemq.log     
bin/activemq restart     
sleep 2     
tail -n30 data/activemq.log     

ActiveMQ集群环境配置之共有云环境配置
  公有云环境的配置与私有云环境的差异就在必须将动态配置换成静态配置。
<networkConnectors>     
    <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>   
</networkConnectors>ActiveMQ集群环境配置之共有云+Docker环境配置
  在Docker的加入后就有很大的不同了。首先Docker的配置文件(如/etc/hosts)中必须灵活的指定主机名称与IP地址的映射关系,这样使得ActiveMQ的配置文件更加灵活,不依赖于IP地址、具有更好的应用范围。其次要解决主机间容器通信的问题,每一个主机上运行的Docker容器之间必须能相互访问。   
如果想使用Docker,建议使用Linux 3.10版本以上的内核的操作系统,这样的Linux内核在CentOS7、Ubuntu14以上都支持。     
此处以CentOS7 1511为例,跨主机网络互通靠docker native plugin中的overlay实现。此方案中会用到一个kv存储,这个kv存储可以使用Consul、etcd等实现,此处用Consul实现。     
yum -y update     
history -c && shutdown -r now     
uname -a     
# Linux localhost.localdomain 3.10.0-327.10.1.el7.x86_64 #1 SMP Tue Feb 16 17:03:50 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux     
# set FQDN hostname use edit file or hostname command or nmtui command     
vim /etc/hostname     
setenforce 0     
# for service docker     
# Refer: https://www.docker.com/     
# Refer: https://docs.docker.com/linux/step_one/     
which curl 2>/dev/null || yum -y -q install curl     
curl -fsSL https://get.docker.com/gpg | gpg --import     
curl -fsSL https://get.docker.com/ | sh     
# for program docker-enter     
# Refer: http://dockerpool.com/static/books/docker_practice/container/enter.html     
which curl > /dev/null || apt-get -qq install -y curl     
# # cd /tmp; curl https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz | tar -zxf-; cd util-linux-2.24;     
# cd /tmp; wget -q https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz; tar xzvf util-linux-2.24.tar.gz     
# cd util-linux-2.24     
# ./configure --without-ncurses && make nsenter     
# cp nsenter '/usr/local/bin'     
which nsenter     
cd     
which wget 2>/dev/null || yum -y -q install wget     
wget -P ~ https://github.com/yeasy/docker_practice/raw/master/_local/.bashrc_docker;     
echo "[ -f ~/.bashrc_docker ] && . ~/.bashrc_docker" >> ~/.bashrc; source ~/.bashrc     
service docker start     
docker version     
rpm -ql docker-engine     
# if there are only two docker hosts commmunicated each other, then nameserver set to each other     
# if there are more than 3 docker hosts, then first node's nameserver set to last one, second set to first one, third set to second one     
vim /etc/resolv.conf     
vim /usr/lib/systemd/system/docker.service     
-H tcp://0.0.0.0:2376 -H unix:///var/run/docker.sock --cluster-store=consul://consul.service.dc1.consul.:8500 --cluster-advertise=eno16777728:2376     
systemctl daemon-reload     
systemctl restart docker     
systemctl status docker -l     
docker network create -d overlay interconnection     
docker network ls     
# after this, start a docker container with "--net interconnection", then containers running on different hosts can communited each other.     
mkdir -p /data/docker/activemq/data     
mkdir -p /data/docker/activemq/data/kahadb     
mkdir -p /data/docker/activemq/log-master     
mkdir -p /data/docker/activemq/log-slave     
mkdir -p /data/docker/activemq/conf
  vim /data/docker/activemq/conf/activemq.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>file:${activemq.conf}/credentials.properties</value>
    </property>
  </bean>
  <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"></bean>
  <broker xmlns="http://activemq.apache.org/schema/core" dataDirectory="${activemq.data}" brokerName="localhost" useJmx="true" advisorySupport="false" persistent="true" deleteAllMessagesOnStartup="false" useShutdownHook="false" >
    <networkConnectors>
      <networkConnector uri="static:(tcp://server1-activemq-01-master:61616,tcp://server2-activemq-02-master:61616)"/>
    </networkConnectors>
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="&gt;">
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000" />
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
    <managementContext>
      <managementContext createConnector="false" />
    </managementContext>
    <persistenceAdapter>
      <kahaDB directory="/data/activemq/kahadb" />
    </persistenceAdapter>
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage percentOfJvmHeap="70" />
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="100 gb" />
        </storeUsage>
        <tempUsage>
          <tempUsage limit="50 gb" />
        </tempUsage>
      </systemUsage>
    </systemUsage>
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
    </transportConnectors>
    <shutdownHooks>
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>
  </broker>
  <import resource="jetty.xml" />
</beans>  docker run --restart="always" --name='server1-activemq-01-master' --net interconnection -d --hostname=server1-activemq-01-master \   
-e 'ACTIVEMQ_NAME=amqp-srv1-master' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-master:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8161:8161 \     
-p 61616:61616 \     
-p 61613:61613 \     
webcenter/activemq
  docker run --restart="always" --name='server1-activemq-01-slave' --net interconnection -d --hostname=server1-activemq-01-slave \   
-e 'ACTIVEMQ_NAME=amqp-srv1-slave' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-slave:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8171:8161 \     
-p 61626:61616 \     
-p 61623:61613 \     
webcenter/activemq
  rm -rf /data/docker/activemq   
docker stop server1-activemq-01-master && docker rm server1-activemq-01-master     
docker stop server1-activemq-01-slave && docker rm server1-activemq-01-slave     
mkdir -p /data/docker/activemq/data     
mkdir -p /data/docker/activemq/data/kahadb     
mkdir -p /data/docker/activemq/log-master     
mkdir -p /data/docker/activemq/log-slave     
mkdir -p /data/docker/activemq/conf
  docker run --restart="always" --name='server2-activemq-02-master' -d --hostname=server2-activemq-02-master \   
-e 'ACTIVEMQ_NAME=amqp-srv2-master' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-master:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8161:8161 \     
-p 61616:61616 \     
-p 61613:61613 \     
webcenter/activemq
  docker run --restart="always" --name='server2-activemq-02-slave' -d --hostname=server2-activemq-02-slave \   
-e 'ACTIVEMQ_NAME=amqp-srv2-slave' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-slave:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8171:8161 \     
-p 61626:61616 \     
-p 61623:61613 \     
webcenter/activemq
  rm -rf /data/docker/activemq   
docker stop server2-activemq-02-master && docker rm server2-activemq-02-master     
docker stop server2-activemq-02-slave && docker rm server2-activemq-02-slave     

  cat /data/docker/activemq/log-master/activemq.log   
cat /data/docker/activemq/log-slave/activemq.log
  

  客户端连接ActiveMQ集群
  # About: "Caught: javax.jms.JMSSecurityException: User name [xxx] or password is invalid. javax.jms.JMSSecurityException: User name [xxx] or password is invalid."   
# Refer to https://github.com/disaster37/activemq/issues/15     
# At last, I find that only set "-e 'ACTIVEMQ_ADMIN_LOGIN=yourName' -e 'ACTIVEMQ_ADMIN_PASSWORD=yourPassword' \" like this self can login success, then I got a success!
  

  Client connection example:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");  完整的例子如下:
  activemq hello world writen with java
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Hello world!
*/
public class activemq5Failover {
    public static void main(String[] args) throws Exception {
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
    }
    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }
    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
//                Refer: http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                // Create a messages
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);
                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);
                // Clean up
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }
    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);
                // Wait for a message
                Message message = consumer.receive(1000);
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.  Shutting down client.");
        }
    }
}  tag:activemq clustering,ActiveMQ 集群,ActiveMQ 负载均衡,ActiveMQ 主备,ActiveMQ 高可用   
--end--

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-482596-1-1.html 上篇帖子: 技术干货 | Docker容器中需要避免的十种常见误区 下篇帖子: Docker容器简用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表