|
这种方式有个问题,activemq1有消息没消费完但是突然宕机,虽然程序会自动连到activemq2。但是activemq1的消息只有等机器恢复后才会被消费。
1.启动:我这里使用的是apache-activemq-5.13.3,是在windows下使用的,发现根据文档说的双击activemq.bat启动不了,那就只好使用命令启动,CMD进入到apache-activemq-5.13.3\bin下,输入activemqbat start。这样就可以启动了。
2.主从配置:第一个activemq解压到apache-activemq-5.13.3,第二个解压到apache-activemq-5.13.3-2
第一个activemq直接输入命令启动
第二个需要修改参数:a.打开apache-activemq-5.13.3-2\conf\activemq.xml,修改broker标签里面的brokerName,不要和第一个相同就行
b.修改activemq.xml中的transportConnectors,删除其他,只留一个openwire就行,修改uri里面的端口号
c.在transportConnectors上面添加(如果一会儿启动的时候这里报错,请手动敲打下面三行,不要复制)
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
</networkConnectors>
d.修改\conf\jetty.xml文件的115行,端口号随便写一个。(这里是jetty的访问端口)
配置文件修改完成,启动第一个activemq,启动第二个activemq。
接下来是代码中brokerURL需要改成使用failover。这样启动生产者和消费者后,程序就可以在主从直接自动切换(可以尝试轮流关闭主从)。
生产者代码如下:
1 import javax.jms.Connection;
2 import javax.jms.ConnectionFactory;
3 import javax.jms.DeliveryMode;
4 import javax.jms.Destination;
5 import javax.jms.MessageProducer;
6 import javax.jms.Session;
7 import javax.jms.TextMessage;
8
9 import org.apache.activemq.ActiveMQConnection;
10 import org.apache.activemq.ActiveMQConnectionFactory;
11
12 public class Sender {
13 public static void main(String[] args) {
14 // ConnectionFactory :连接工厂,JMS 用它创建连接
15 ConnectionFactory connectionFactory;
16 // Connection :JMS 客户端到JMS Provider 的连接
17 Connection connection = null;
18 // Session: 一个发送或接收消息的线程
19 Session session;
20 // Destination :消息的目的地;消息发送给谁.
21 Destination destination;
22 // MessageProducer:消息发送者
23 MessageProducer producer;
24 // TextMessage message;
25 // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
26 String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
27 connectionFactory = new ActiveMQConnectionFactory(
28 ActiveMQConnection.DEFAULT_USER,
29 ActiveMQConnection.DEFAULT_PASSWORD, brokerURL);
30 try {
31 // 构造从工厂得到连接对象
32 connection = connectionFactory.createConnection();
33 // 启动
34 connection.start();
35 // 获取操作连接
36 session = connection.createSession(Boolean.TRUE,
37 Session.AUTO_ACKNOWLEDGE);
38 destination = session.createQueue("FirstQueue");
39 // 得到消息生成者
40 producer = session.createProducer(destination);
41 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
42 while (true) {
43 sendMessage(session, producer);
44 session.commit();// commit后消息才会发出去
45 Thread.sleep(1000);
46 }
47 } catch (Exception e) {
48 e.printStackTrace();
49 } finally {
50 try {
51 if (null != connection)
52 connection.close();
53 } catch (Throwable ignore) {
54 }
55 }
56 }
57
58 static int i = 1;
59
60 public static void sendMessage(Session session, MessageProducer producer)
61 throws Exception {
62 TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);
63 // 发送消息到目的地方
64 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
65 producer.send(message);
66 i++;
67 }
68 }
producer 消费者代码如下:
1 import javax.jms.Connection;
2 import javax.jms.ConnectionFactory;
3 import javax.jms.Destination;
4 import javax.jms.JMSException;
5 import javax.jms.Message;
6 import javax.jms.MessageConsumer;
7 import javax.jms.MessageListener;
8 import javax.jms.Session;
9 import javax.jms.TextMessage;
10
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13
14 public class Receiver {
15 public static void main(String[] args) {
16 // ConnectionFactory :连接工厂,JMS 用它创建连接
17 ConnectionFactory connectionFactory;
18 // Connection :JMS 客户端到JMS Provider 的连接
19 Connection connection = null;
20 // Session: 一个发送或接收消息的线程
21 Session session;
22 // Destination :消息的目的地;消息发送给谁.
23 Destination destination;
24 // 消费者,消息接收者
25 MessageConsumer consumer;
26 String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
27 // String brokerURL = "tcp://localhost:61616";
28 connectionFactory = new ActiveMQConnectionFactory(
29 ActiveMQConnection.DEFAULT_USER,
30 ActiveMQConnection.DEFAULT_PASSWORD,
31 brokerURL);
32 try {
33 // 构造从工厂得到连接对象
34 connection = connectionFactory.createConnection();
35 // 启动
36 connection.start();
37 // 获取操作连接
38 session = connection.createSession(Boolean.FALSE,
39 Session.AUTO_ACKNOWLEDGE);
40 destination = session.createQueue("FirstQueue");
41 consumer = session.createConsumer(destination);
42 consumer.setMessageListener(new MyListener());
43 System.out.println("started...");
44 while(true){
45 }
46 } catch (Exception e) {
47 e.printStackTrace();
48 } finally {
49 try {
50 if (null != connection)
51 connection.close();
52 } catch (Throwable ignore) {
53 }
54 }
55 }
56 }
57 class MyListener implements MessageListener{
58
59 public void onMessage(Message message) {
60 TextMessage textMessage = (TextMessage) message;
61 try {
62 System.out.println("收到消息:"+textMessage.getText());
63 } catch (JMSException e) {
64 e.printStackTrace();
65 }
66 }
67 }
Receiver 以上代码部分摘自网络
这是配置主从的一个方案,还有一种方案是使用文件系统。 |
|
|