clh899 发表于 2017-2-16 13:06:32

基于spring和weblogic的jms

  环境



  1.weblogic 9.2.3
  已经创建JMS Connection Factory:jms/connectionFactory
  JMS Queue:jms/TestQueue
  JMS Topic:jms/TestTopic
  实现一



  通过spring的JmsTemplate发送消息到Queue,然后通过messageListener实现消息的异步接收.
  完整的springl配置文件applicationContext-jms.xml如下:

<beans>
<bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName">
<value>jms/connectionFactory</value>
</property>
<property name="jndiTemplate">
<ref bean="jmsJndiTemplate"></ref>
</property>
</bean>
<!-- define jms queue jndi -->
<bean id="jmsDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName">
<value>jms/TestQueue</value>
</property>
<property name="jndiTemplate">
<ref bean="jmsJndiTemplate"></ref>
</property>
</bean>
<!-- define jms queue url -->
<bean id="jmsJndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<!-- com.sun.enterprise.naming.SerialInitContextFactory -->
<prop key="java.naming.factory.initial">weblogic.jndi.WLInitialContextFactory</prop>
<prop key="java.naming.provider.url">t3://16.158.51.221:7001</prop>
</props>
</property>
</bean>
<!-- JMS Queue Send Template-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsConnectionFactory" />
</property>
<property name="defaultDestination">
<ref local="jmsDestination" />
</property>
</bean>
<bean id="jmsProducer" class="org.springframework.samples.jpetstore.jms.JmsProducer">
<constructor-arg ref="jmsTemplate" />
</bean>
<bean id="messageListener" class="org.springframework.samples.jpetstore.jms.JpetstoreJmsListener" />
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--property name="concurrentConsumers" value="5" /-->
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="jmsDestination" />
<property name="messageListener" ref="messageListener" />
<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
</beans>
  消息发送JmsProducer代码如下

package org.springframework.samples.jpetstore.jms;
import org.springframework.jms.core.JmsTemplate;
public class JmsProducer {
private JmsTemplate jmsTemplate;
public JmsProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessage() {
jmsTemplate.convertAndSend("Hello world!("+System.currentTimeMillis()+")");
}
}
  消息接收JpetstoreJmsListener代码如下

package org.springframework.samples.jpetstore.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class JpetstoreJmsListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println("ok--"+((TextMessage)message).getText());
} catch (JMSException e) {
throw new RuntimeException(e);
}
} else {
throw new IllegalArgumentException(
"Message must be of type TestMessage");
}
}
}
   测试代码

public class JmsConsole {
public static void main(String[] args) throws IOException {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-jms.xml");
JmsProducer sender = (JmsProducer) context.getBean("jmsProducer");
sender.sendMessage();
}
}
  运行JmsConsole,控制台输入如下:

ok--Hello world!(1250560961109)
  将测试程序修改一下,如下

public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-jms.xml");
JmsProducer sender = (JmsProducer) context.getBean("jmsProducer");
int i = 1;
System.out.println(System.currentTimeMillis());
while(i<10){
System.out.println("line:"+i);
sender.sendMessage(i);
i++;
}
}
  同时修改一下JpetstoreJmsListener

public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println("ok--"+((TextMessage)message).getText());
} catch (JMSException e) {
throw new RuntimeException(e);
}
} else {
throw new IllegalArgumentException(
"Message must be of type TestMessage");
}
System.out.println(System.currentTimeMillis());
}
  将spring配置中的

<property name="concurrentConsumers" value="1" />

  设置为1,运行测试程序,从开始往Queue发送消息开始到消息全部处理完毕结束平均耗时大概有600ms左右。
  将spring配置中的


<property name="concurrentConsumers" value="9" />

  
设置为9,运行测试程序,从开始往Queue发送消息开始到消息全部处理完毕结束平均耗时大概也在600ms左右。
  将循环加到100,设置为并发消费者数为1和为9的时间有10倍左右的差距。
  因此对于大并发的Queue处理,设置concurrentConsumers是非常有必要的。
  实现二



  上面的例子是传输简单类型的消息,下面让我们看一下对象消息如何传输。
  传输的对象如下

package org.springframework.samples.jpetstore.domain;
import java.io.Serializable;

public class Product implements Serializable {
/* Private Fields */
private String productId;
private String categoryId;
private String name;
private String description;
/* JavaBeans Properties */
public String getProductId() { return productId; }
public void setProductId(String productId) { this.productId = productId.trim(); }
public String getCategoryId() { return categoryId; }
public void setCategoryId(String categoryId) { this.categoryId = categoryId; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
/* Public Methods*/
public String toString() {
return getName();
}
}

   spring的配置不变,需要改变的是发送端代码和messageListener代码,如下
  JmsProducer

package org.springframework.samples.jpetstore.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.samples.jpetstore.domain.Product;
public class JmsProducer {
private JmsTemplate jmsTemplate;
public JmsProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessage() {   
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Product p = new Product();
p.setCategoryId("1-2-3");
p.setName("TestP");
p.setProductId("P-1-2-3");
Message message = new SimpleMessageConverter().toMessage(p, session);
message.setLongProperty("startTime", System.currentTimeMillis());
return message;
}
});
}
}
  JpetstoreJmsListener

package org.springframework.samples.jpetstore.jms;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.samples.jpetstore.domain.Product;
public class JpetstoreJmsListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof ObjectMessage) {
try {
Product p = (Product)(new SimpleMessageConverter().fromMessage(message));
System.out.println("ok--"+p.getProductId());
} catch (Exception e) {
throw new RuntimeException(e);
}
} else{
throw new IllegalArgumentException(
"Message must be of type TestMessage");
}
System.out.println(System.currentTimeMillis());
}
}
  测试代码

public class JmsConsole {
public static void main(String[] args) throws IOException {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-jms.xml");
JmsProducer sender = (JmsProducer) context.getBean("jmsProducer");
sender.sendMessage();
}
}
  输入如下信息:

ok--P-1-2-3
1250585109375
   附录



  概念


  JMS(Java Message Service)是Java程序与企业消息系统进行交互的规范,它提供了一组API用于创建、发送、接收和读取消息。JMS定义了一组通用的消息概念和功能,旨在降低开发者访问企业消息系统的难度,同时最大化应用程序的可移植性。

  


  JMS消息模型




1、Pub/Sub模型:



一对多广播消息模式。在Pub/Sub模型中,消息发送方称为Publisher
,消息接收方称为Subscriber
。主题Topic
充当Publisher和Subscriber之间的中介,对某个Topic感兴趣的Subscriber在Topic上注册,Publisher将消息发送到指定的Topic,Topic上的消息将递送给所有注册的Subscriber。

Pub/Sub模型的主要特点:

对Topic上的一个消息,注册的每个Subscriber均得到该消息的一个拷贝。

Topic上的消息是自动递送给Subscriber的,不需要Subscriber去主动获取新消息,这种方式也称为推模式(Push Model)。

Publisher和Subscriber可以在运行时刻动态添加和删除。

  2、P2P模型:


一对一消息模式。在P2P模型中,消息发送方称为Sender
,消息接收方称为Receiver
。队列Queue
充当Sender和Receiver之间的中介,Sender将消息发送到指定的Queue,Receiver从指定的Queue上获取消息。

P2P模型的主要特点:

对Queue上的每个消息,即使有多个Receiver请求该消息,有且只有一个Receiver接收到该消息。即每个消息只能被一个Receiver消费,消费之后消息就从Queue上删除了。

Receiver需要到Queue上请求消息,而不是自动递给它的,这种方式也称为拉模式(Pull Model)。

Queue上的消息是有顺序的,消息按照它发送到Queue上的顺序被Receiver取走。
页: [1]
查看完整版本: 基于spring和weblogic的jms