|
Apache ActiveMQ 之 Queue
Pre-Condition: 安装Apache ActiveMQ并启动服务
消费者:
package com.wx.jms.queue;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
/**
 * Queue consumer demo: registers two asynchronous consumers on the queue
 * "myqueue" of the broker at tcp://localhost:61616 and prints every text
 * message each of them receives, prefixed with the consumer's label.
 *
 * Last modified: Mar 22, 2011
 *
 * @author wangxiang
 */
public class Queue_Test {

    /**
     * Creates the connection, attaches both listeners, and only then starts
     * message delivery.
     *
     * @param args unused
     * @throws Exception if the connection, session or consumers cannot be created
     */
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection conn = factory.createConnection();
        try {
            Queue queue = new ActiveMQQueue("myqueue");
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageConsumer consumer1 = session.createConsumer(queue);
            consumer1.setMessageListener(printingListener("consumer1"));

            MessageConsumer consumer2 = session.createConsumer(queue);
            consumer2.setMessageListener(printingListener("consumer2"));

            // Start delivery last: once a started connection has a listener
            // registered, its session belongs to the delivery thread and must
            // not be used from this thread any more.
            conn.start();
        } catch (JMSException e) {
            // Setup failed half-way; release the connection before propagating.
            conn.close();
            throw e;
        }
        // The connection is deliberately left open so the listeners keep receiving.
    }

    /**
     * Builds a listener that prints the text of each received message as
     * {@code label:text}.
     *
     * @param label prefix identifying the consumer in the output
     * @return a listener that prints text messages and reports anything else on stderr
     */
    private static MessageListener printingListener(final String label) {
        return new MessageListener() {
            public void onMessage(Message msg) {
                // Guard the cast: a non-text message would otherwise throw
                // ClassCastException inside the delivery thread.
                if (!(msg instanceof TextMessage)) {
                    System.err.println(label + ": ignoring non-text message " + msg);
                    return;
                }
                try {
                    System.out.println(label + ":" + ((TextMessage) msg).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
    }
}
生产者:
package com.wx.jms.queue;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
/**
 * Queue producer demo: sends ten text messages ("msg 0" .. "msg 9") to the
 * queue "myqueue" of the broker at tcp://localhost:61616.
 *
 * Last modified: Mar 22, 2011
 *
 * @author wangxiang
 */
public class QueueProducer_Main {

    /**
     * Connects to the broker, sends the messages, and closes the connection.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Connection conn = factory.createConnection();
        try {
            conn.start();
            Queue queue = new ActiveMQQueue("myqueue");
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            for (int i = 0; i < 10; i++) {
                producer.send(session.createTextMessage("msg " + i));
            }
        } finally {
            // Closing the connection also closes its sessions and producers;
            // without this the client resources are never released.
            conn.close();
        }
    }
}
结果:
consumer2:msg 0
consumer1:msg 1
consumer2:msg 2
consumer1:msg 3
consumer2:msg 4
consumer1:msg 5
consumer2:msg 6
consumer1:msg 7
consumer2:msg 8
consumer1:msg 9 |
|
|
|
|
|
|