Java消息服务(JMS)是用于编写使用异步消息传递的JEE应用程序的API。JMS的创建过程类似于Hibernate的工厂模式。JMS支持两种消息类型:点对点(PTP)和发布/订阅式(pub/sub)。PTP消息被生产者放入到一个队列中,消费者则从消息队列中取走消息。消息一旦被一个消息者取走,消息就从队列中移除,这意味着即使有多个消费者观察一个队列,但一条消息只能被一个消费者取走。PUB/SUB消息则需要先注册,然后消息异步发送到订阅者,可以有多个订阅者。
实现所包括的步骤:
- 创建JNDI初始上下文(context)。
- 从JNDI上下文获取一个队列连接工厂。
- 从队列连接工厂中获取一个Quene。
- 创建一个Session对象。
- 创建一个发送者(sender)或接收者(receiver)对象。
- 使用步骤5创建的发送者或接收者对象发送或接收消息。
- 处理完消息后,关闭所有JMS资源。
安装完成Weblogic9.2后,启动服务进入控制台, 点击JMS Modules—— >examples-jms
修改JNDI名称为myqueue,mytopic这样在之后的初始化过程中,我们就可以通过名称查找创建连接工厂
服务端代码如下:
package www.lring.net;
import java.util.Hashtable;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
public class Test {
public static void main(String[] args){
try{
// TODO Auto-generated method stub
Hashtable<String,String> table = new Hashtable<String,String>();
table.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
table.put(Context.PROVIDER_URL, "t3://localhost:7001");
Context cxt = new InitialContext(table);
//获得可以进行点对点操作的消息连接工厂,连接工厂一定要先设置好
QueueConnectionFactory factory = (QueueConnectionFactory)
cxt.lookup("weblogic.examples.jms.QueueConnectionFactory");
//获得连接,获得了可以使用消息服务的权利
QueueConnection conn = factory.createQueueConnection();
//获得一个队列对象
Queue q = (Queue) cxt.lookup("myqueue");//从连接工厂中获得一个连接,用这个连接去连接要暂存消息的队列
//获得一次消息发送的会话对象
QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
//建立消息的发送对象
QueueSender sender = session.createSender(q);
//我要发送的消息内容封装到一个message的实现者中。
TextMessage message = (TextMessage) session.createTextMessage();
//写入消息
message.setText("邢海峰hello world......");
//发送消息
sender.send(message);
sender.close();
session.close();
conn.close();
cxt.close();
//TopicConnectionFactory topicfactory = (TopicConnectionFactory) cxt.lookup("weblogic.examples.jms.TopicConnectionFactory");
}catch(Exception e ){
e.printStackTrace() ;
}
}
}
客户端代码如下:
package www.lring.net;
import java.util.Hashtable;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
public class GetMessage {
public GetMessage() {
// TODO Auto-generated constructor stub
}
/**
* @param args
*/
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Hashtable<String,String> table = new Hashtable<String,String>();
table.put(Context.INITIAL_CONTEXT_FACTORY, weblogic.jndi.WLInitialContextFactory.class.getName());
table.put(Context.PROVIDER_URL, "t3://localhost:7001");
Context context = new InitialContext(table);
QueueConnectionFactory factory = (QueueConnectionFactory)
context.lookup("weblogic.examples.jms.QueueConnectionFactory");
Queue q = (Queue) context.lookup("myqueue");
QueueConnection conn = factory.createQueueConnection();
//session.setMessageListener(arg0);
//消息的提取本身是异步的
QueueSession session = conn.createQueueSession(false, 0);
//获得一个消息的接受者
//获得消息,消费消息
conn.start();
QueueReceiver receiver = session.createReceiver(q);
//receiver.setMessageListener(new MyLinener());
//while(true)
//{
//Thread.sleep(1000);
//System.out.println("..............");
//}
TextMessage message = (TextMessage) receiver.receive();
System.out.println(message.getText());
conn.close();
}
}
|