设为首页 收藏本站
查看: 1159|回复: 0

[经验分享] 基于ActiveMQ的点对点收发消息

[复制链接]

尚未签到

发表于 2017-3-2 12:36:15 | 显示全部楼层 |阅读模式
  ActiveMQ是apache的一个开源消息引擎。可以作为即通引擎或者消息中间件引擎。

准备
  下载ActiveMQ
  http://activemq.apache.org/download.html
  进入\bin\win64双击InstallService.bat安装为系统服务。然后启动这个系统服务
  访问 http://localhost:8161/admin/queues.jsp 可以看到消息队列列表 账号密码默认就是admin admin,配置在conf/jetty-realm.properties中
  然后将根目录下的activemq-all-5.13.2.jar引入项目,就可以进行开发了
  首先对ActiveMQ进行简单封装



1 package activeMQStu;
2
3 import org.apache.activemq.ActiveMQConnectionFactory;
4
5 import javax.jms.*;
6 import java.util.Arrays;
7
8 /**
9  * Created by lvyahui on 2016/4/23.
10  */
11 public class ActiveMQServer {
12
13
14     private String user;
15     private String pass;
16     private String url;
17
18     private Session session;
19     private Connection connection;
20     private ActiveMQConnectionFactory connectionFactory;
21
22     public ActiveMQConnectionFactory getConnectionFactory() {
23         return connectionFactory;
24     }
25
26     public ActiveMQServer(String user, String pass, String url) throws JMSException {
27         this.user = user;
28         this.pass = pass;
29         this.url = url;
30
31         this.connectionFactory = new ActiveMQConnectionFactory(this.user, this.pass, this.url);
32         /*
33         * 必须指明哪些包的类是可以序列化的
34         * 否则会报错:ActiveMQ Serializable class not available to broker. Reason: ClassNotFoundException
35         * 参考:http://activemq.apache.org/objectmessage.html
36         * */
37         connectionFactory.setTrustedPackages(Arrays.asList("activeMQStu"));
38         this.connection = connectionFactory.createConnection();
39         connection.start();
40     }
41
42     public Session getSession() throws JMSException {
43         if (session == null) {
44             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
45         }
46         return session;
47     }
48
49     public Queue createQueue(String name) throws JMSException {
50         return getSession().createQueue(name);
51     }
52
53     public void close() throws JMSException {
54         if (session != null) {
55             session.commit();
56             session.close();
57         }
58         if (connection != null) {
59             connection.close();
60         }
61     }
62
63 }
消息



1 package activeMQStu;
2
3 import java.io.Serializable;
4
5 /**
6  * Created by lvyahui on 2016/4/23.
7  */
8 public class User implements Serializable {
9
10     private String username ;
11     private String password ;
12     private String salt ;
13
14     public String getUsername() {
15         return username;
16     }
17
18     public void setUsername(String username) {
19         this.username = username;
20     }
21
22     public String getPassword() {
23         return password;
24     }
25
26     public void setPassword(String password) {
27         this.password = password;
28     }
29
30     public String getSalt() {
31         return salt;
32     }
33
34     public void setSalt(String salt) {
35         this.salt = salt;
36     }
37
38     @Override
39     public String toString() {
40         return "User{" +
41                 "username='" + username + '\'' +
42                 ", password='" + password + '\'' +
43                 ", salt='" + salt + '\'' +
44                 '}';
45     }
46 }
消息生产者



1 package activeMQStu.queue;
2
3 import activeMQStu.ActiveMQServer;
4 import activeMQStu.User;
5 import org.apache.activemq.command.ActiveMQObjectMessage;
6
7 import javax.jms.JMSException;
8 import javax.jms.MessageProducer;
9 import javax.jms.ObjectMessage;
10 import javax.jms.Queue;
11
12 /**
13  * Created by lvyahui on 2016/4/23.
14  */
15 public class UserProducer {
16
17     private MessageProducer producer;
18
19     public UserProducer(ActiveMQServer activeMQServer, String queueName) throws JMSException {
20         Queue queue = activeMQServer.createQueue(queueName);
21         producer = activeMQServer.getSession().createProducer(queue);
22     }
23
24     public void produce(User user) throws JMSException {
25         ObjectMessage objectMessage = new ActiveMQObjectMessage();
26         objectMessage.setObject(user);
27         producer.send(objectMessage);
28     }
29 }
发送消息



1 package activeMQStu;
2
3 import activeMQStu.queue.UserProducer;
4 import org.apache.activemq.ActiveMQConnection;
5
6 import javax.jms.JMSException;
7
8 /**
9  * Created by lvyahui on 2016/4/23.
10  */
11 public class ProduceApp {
12     public static void main(String[] args) throws JMSException {
13         ActiveMQServer activeMQServer = new ActiveMQServer(
14                 ActiveMQConnection.DEFAULT_USER,
15                 ActiveMQConnection.DEFAULT_PASSWORD,
16                 "tcp://localhost:61616"
17         );
18         UserProducer producer = new UserProducer(activeMQServer,"queue.devlyh");
19         for(int i =0 ;i < 100;i++){
20             User user = new User();
21             user.setUsername("lvyahui".concat(String.valueOf(i)));
22             user.setPassword("admin888" + i);
23             user.setSalt("salt"+i);
24             producer.produce(user);
25         }
26         activeMQServer.close();
27     }
28 }
  运行成功后再页面可以看到有100条消息进入了名字为queue.devlyh的队列中
   DSC0000.png

消息接受者



1 package activeMQStu.queue;
2
3 import activeMQStu.ActiveMQServer;
4 import activeMQStu.User;
5
6 import javax.jms.JMSException;
7 import javax.jms.MessageConsumer;
8 import javax.jms.ObjectMessage;
9
10 /**
11  * Created by lvyahui on 2016/4/23.
12  */
13 public class UserConsumer {
14     private MessageConsumer consumer;
15
16     public UserConsumer(ActiveMQServer activeMQServer,String queueName) throws JMSException {
17         this.consumer = activeMQServer.getSession()
18                 .createConsumer(activeMQServer.createQueue(queueName));
19     }
20
21     public User consume() throws JMSException {
22         ObjectMessage objectMessage  = (ObjectMessage) consumer.receive();
23         User user = (User) objectMessage.getObject();
24         return user;
25     }
26 }
接收消息



1 package activeMQStu;
2
3 import activeMQStu.queue.UserConsumer;
4 import org.apache.activemq.ActiveMQConnection;
5 import org.apache.activemq.ActiveMQConnectionFactory;
6
7 import javax.jms.JMSException;
8 import java.util.Arrays;
9
10 /**
11  * Created by lvyahui on 2016/4/23.
12  */
13 public class ConsumeApp {
14     public static void main(String[] args) throws JMSException {
15         ActiveMQServer activeMQServer = new ActiveMQServer(
16                 ActiveMQConnection.DEFAULT_USER,
17                 ActiveMQConnection.DEFAULT_PASSWORD,
18                 "tcp://localhost:61616"
19         );
20
21 //        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) activeMQServer.getConnectionFactory();
22
23         UserConsumer consumer = new UserConsumer(activeMQServer,"queue.devlyh");
24         while(true){
25             User user = consumer.consume();
26             System.out.println(user);
27         }
28     }
29 }
  执行效果如下
DSC0001.png

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-349344-1-1.html 上篇帖子: 机器学习------精心总结 下篇帖子: 可执行jar包
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表