设为首页 收藏本站
查看: 1683|回复: 0

[经验分享] Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明

[复制链接]

尚未签到

发表于 2017-1-11 10:16:18 | 显示全部楼层 |阅读模式
  一、特性及优势




1、实现 JMS1.1 规范,支持 J2EE1.4以上

2、可运行于任何 jvm和大部分 web 容器(ActiveMQ works great in any JVM)

3、支持多种语言客户端(java, C, C++, AJAX, ACTIONSCRIPT 等等)

4、支持多种协议(stomp,openwire,REST)

5、良好的 spring 支持(ActiveMQ has great Spring Support)

6、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than

JBossMQ.)

7、与 OpenJMS、JbossMQ等开源jms provider 相比,ActiveMQ有 Apache 的支

持,持续发展的优势明显。
  二、下载部署



1、下载
http://activemq.apache.org/activemq-510-release.html
,下载 5.1.0 Windows

Distribution版本

2、安装

直接解压至任意目录(如:d:\ apache-activemq-5.1.0)

3、启动 ActiveMQ服务器

方法 1:

直接运行 bin\activemq.bat

方法 2(在 JVM 中嵌套启动):

cd example

ant embedBroker

4、ActiveMQ消息管理后台系统:
http://localhost:8161/admin

  三、运行附带的示例程序



1、Queue 消息示例:(点对点)

*  启动 Queue 消息消费者

cd example ant consumer

*  启动 Queue 消息生产者

cd example

ant producer

简要说明:生产者(producer)发消息,消费者(consumer)接消息,发送/接

收 2000 个消息后自动关闭

2、Topic 消息示例:(群组订阅)

*  启动 Topic 消息消费者

cd example

ant topic-listener

*  启动 Topic 消息生产者

cd example

ant topic-publisher

简要说明:重复 10 轮,publisher每轮发送2000 个消息,并等待获取 listener

的处理结果报告,然后进入下一轮发送,最后统计全局发送时间。
  

  四、Queue与 Topic 的比较

  

1、JMS Queue 执行 load balancer语义:

一条消息仅能被一个 consumer(消费者) 收到。如果在 message 发送的时候没有可用的

consumer,那么它将被保存一直到能处理该 message 的 consumer 可用。如果一

个 consumer 收到一条 message 后却不响应它,那么这条消息将被转到另一个

consumer 那儿。一个 Queue 可以有很多 consumer,并且在多个可用的 consumer

中负载均衡。 
  注:
  


  点对点消息传递域的特点如下:

·  每个消息只能有一个消费者。 

·  消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发

送消息的时候是否处于运行状态,它都可以提取消息。


  

2、Topic 实现 publish和 subscribe 语义:

一条消息被 publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber

将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的

subscriber能够获得消息的一个拷贝。
  注:
  发布/订阅消息传递域的特点如下:

·  每个消息可以有多个消费者。 

·  生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费

自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程

度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激

活状态时发送的消息。

  

3、分别对应两种消息模式:

Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者) 其中在 Publicher/Subscriber 模式下又有Nondurable subscription(非持久订阅)

和 durable subscription (持久化订阅)2种消息处理方式(支持离线消息)。
  注:
  在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递

域中,目的地被成为主题(topic)。

  

  五、Point-to-Point (点对点)消息模式开发流程

  

1、生产者(producer)开发流程(ProducerTool.java):
  1.1  创建 Connection:

根据 url,user 和 password 创建一个 jms Connection。

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
   1.2  创建 Session:

在 connection的基础上创建一个 session,同时设置是否支持事务和

ACKNOWLEDGE 标识。

 
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
   

1.3  创建 Destination对象:

需指定其对应的主题(subject)名称,producer 和 consumer 将根据 subject

来发送/接收对应的消息。

            if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}

   

1.4  创建 MessageProducer:

根据 Destination创建MessageProducer 对象,同时设置其持久模式。

            MessageProducer producer = session.createProducer(destination);          
  

1.5  发送消息到队列(Queue):

封装 TextMessage 消息, 使用 MessageProducer 的 send 方法将消息发送出去。

            TextMessage message = session.createTextMessage(createMessageText(i));
producer.send(message);

  

2、消费者(consumer)开发流程(ConsumerTool.java):

2.1  实现 MessageListener 接口:

消费者类必须实现MessageListener 接口,然后在onMessage()方法中监听消息的

到达并处理。

public class ConsumerTool extends Thread implements MessageListener, ExceptionListener
  实现 onMessage(Message message)方法,实现监听消息的到达
  

2.2  创建 Connection:

根据 url,user 和 password 创建一个 jms Connection,如果是durable 模式,

还需要给 connection设置一个 clientId。

            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection();
//是否是 durable 模式.(离线消息持久化支持)
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
connection.setClientID(clientId);
}
connection.setExceptionListener(this);
connection.start();

   

2.3  创建 Session 和 Destination:

与 ProducerTool.java 中的流程类似,不再赘述。

session = connection.createSession(transacted, ackMode);
  2.4 创建 replyProducer【可选】:

可以用来将消息处理结果发送给 producer。
  2.5  创建 MessageConsumer: 

根据 Destination创建MessageConsumer 对象。

            MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic) destination, consumerName);
} else {
consumer = session.createConsumer(destination);
}

   2.6  消费 message:

 在 onMessage()方法中接收producer 发送过来的消息进行处理,并可以通过

replyProducer 反馈信息给 producer

 
            if (message.getJMSReplyTo() != null) {
replyProducer.send(message.getJMSReplyTo()
, session.createTextMessage("Reply: "
+ message.getJMSMessageID()));
}

   

  六、Publisher/Subscriber(发布/订阅者)消息模式开发流程

  

1、订阅者(Subscriber)开发流程(TopicListener.java):



1.1  实现 MessageListener 接口:

在 onMessage()方法中监听发布者发出的消息队列,并做相应处理。

    public void onMessage(Message message) {
if (checkText(message, "SHUTDOWN")) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace(System.out);
}
} else if (checkText(message, "REPORT")) {
// send a report:
try {
long time = System.currentTimeMillis() - start;
String msg = "Received " + count + " in " + time + "ms";
producer.send(session.createTextMessage(msg));
} catch (Exception e) {
e.printStackTrace(System.out);
}
count = 0;
} else {
if (count == 0) {
start = System.currentTimeMillis();
}
if (++count % 1000 == 0) {
System.out.println("Received " + count + " messages.");
}
}
}

   

1.2  创建 Connection:

根据 url,user 和 password 创建一个 jms Connection。

 
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();

   

1.3  创建 Session:

在 connection的基础上创建一个 session,同时设置是否支持事务和

ACKNOWLEDGE 标识。

       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   

1.4  创建 Topic:

 创建 2 个Topic,  topictest.messages用于接收发布者发出的消息,

topictest.control用于向发布者发送消息,实现双方的交互。

        topic = session.createTopic("topictest.messages");
control = session.createTopic("topictest.control");

   

1.5  创建 consumer 和 producer 对象:

 根据topictest.messages创建consumer,根据topictest.control创建producer。

        MessageConsumer consumer = session.createConsumer(topic);//创建消费者
consumer.setMessageListener(this);
connection.start();
producer = session.createProducer(control);//创建生产者
   

1.6  接收处理消息:

 在 onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消

息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作

等等),并且可以使用 producer对象将处理结果返回给发布者。

    //可以先检查消息类型
private static boolean checkText(Message m, String s) {
try {
return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);
} catch (JMSException e) {
e.printStackTrace(System.out);
return false;
}
}

 
//然后
if (checkText(message, "SHUTDOWN")) {
//关机
} else if (checkText(message, "REPORT")) {
// 打印

} else {
//别的操作
}
   

2、发布者(Publisher)开发流程(TopicPublisher.java):


  

2.1  实现 MessageListener 接口:

在 onMessage()方法中接收订阅者的反馈消息。

    public void onMessage(Message message) {
synchronized (mutex) {
System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
if (remaining == 0) {
mutex.notify();
}
}
}

   

2.2  创建 Connection:

根据 url  创建一个 jms Connection。

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();

   2.3  创建 Session:

在 connection的基础上创建一个 session,同时设置是否支持事务和

ACKNOWLEDGE 标识。

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   

2.4  创建 Topic:

 创建 2 个Topic,topictest.messages用于向订阅者发布消息,topictest.control用

于接收订阅者反馈的消息。这2个topic与订阅者开发流程中的topic是一一对应

的。

        topic = session.createTopic("topictest.messages");
control = session.createTopic("topictest.control");

  2.5  创建 consumer 和 producer 对象:

 根据topictest.messages创建publisher;

根据topictest.control创建consumer,同时监听订阅者反馈的消息。

        publisher = session.createProducer(topic);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化模式
session.createConsumer(control).setMessageListener(this);//加入监听
connection.start();

   

2.6  给所有订阅者发送消息,并接收反馈消息:

 示例代码中,一共重复 10 轮操作。

        for (int i = 0; i < batch; i++) {
if (i > 0) {
Thread.sleep(delay * 1000);
}
times = batch(messages);
System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times + " ms.");
}

   

每轮先向所有订阅者发送 2000 个消息;

    private long batch(int msgCount) throws Exception {
long start = System.currentTimeMillis();
remaining = subscribers;
publish();
waitForCompletion();
return System.currentTimeMillis() - start;
}


    private void publish() throws Exception {
// send events
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(payload);
for (int i = 0; i < messages; i++) {
publisher.send(msg);
if ((i + 1) % 1000 == 0) {
System.out.println("Sent " + (i + 1) + " messages");
}
}
// request report
publisher.send(session.createTextMessage("REPORT"));
}

  然后堵塞线程,开始等待;

    private void waitForCompletion() throws Exception {
System.out.println("Waiting for completion...");
synchronized (mutex) {
while (remaining > 0) {
mutex.wait();//赌赛线程
}
}
}

   

最后通过 onMessage()方法,接收到订阅者反馈的“REPORT”类信息后,才

print 反馈信息并解除线程堵塞,进入下一轮。

    public void onMessage(Message message) {
synchronized (mutex) {
System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
if (remaining == 0) {
mutex.notify();//唤醒线程
}
}
}

   

注:可同时运行多个订阅者测试查看此模式效果

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-326903-1-1.html 上篇帖子: 文件上传与下载Apache的Servlet实现 下篇帖子: android连网详解——android.net、org.apache.http联网实现
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表