swsrl 发表于 2017-3-2 12:36:15

基于ActiveMQ的点对点收发消息

  ActiveMQ是apache的一个开源消息引擎。可以作为即通引擎或者消息中间件引擎。

准备
  下载ActiveMQ
  http://activemq.apache.org/download.html
  进入\bin\win64双击InstallService.bat安装为系统服务。然后启动这个系统服务
  访问 http://localhost:8161/admin/queues.jsp 可以看到消息队列列表 账号密码默认就是admin admin,配置在conf/jetty-realm.properties中
  然后将根目录下的activemq-all-5.13.2.jar引入项目,就可以进行开发了
  首先对ActiveMQ进行简单封装



1 package activeMQStu;
2
3 import org.apache.activemq.ActiveMQConnectionFactory;
4
5 import javax.jms.*;
6 import java.util.Arrays;
7
8 /**
9* Created by lvyahui on 2016/4/23.
10*/
11 public class ActiveMQServer {
12
13
14   private String user;
15   private String pass;
16   private String url;
17
18   private Session session;
19   private Connection connection;
20   private ActiveMQConnectionFactory connectionFactory;
21
22   public ActiveMQConnectionFactory getConnectionFactory() {
23         return connectionFactory;
24   }
25
26   public ActiveMQServer(String user, String pass, String url) throws JMSException {
27         this.user = user;
28         this.pass = pass;
29         this.url = url;
30
31         this.connectionFactory = new ActiveMQConnectionFactory(this.user, this.pass, this.url);
32         /*
33         * 必须指明哪些包的类是可以序列化的
34         * 否则会报错:ActiveMQ Serializable class not available to broker. Reason: ClassNotFoundException
35         * 参考:http://activemq.apache.org/objectmessage.html
36         * */
37         connectionFactory.setTrustedPackages(Arrays.asList("activeMQStu"));
38         this.connection = connectionFactory.createConnection();
39         connection.start();
40   }
41
42   public Session getSession() throws JMSException {
43         if (session == null) {
44             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
45         }
46         return session;
47   }
48
49   public Queue createQueue(String name) throws JMSException {
50         return getSession().createQueue(name);
51   }
52
53   public void close() throws JMSException {
54         if (session != null) {
55             session.commit();
56             session.close();
57         }
58         if (connection != null) {
59             connection.close();
60         }
61   }
62
63 }
消息



1 package activeMQStu;
2
3 import java.io.Serializable;
4
5 /**
6* Created by lvyahui on 2016/4/23.
7*/
8 public class User implements Serializable {
9
10   private String username ;
11   private String password ;
12   private String salt ;
13
14   public String getUsername() {
15         return username;
16   }
17
18   public void setUsername(String username) {
19         this.username = username;
20   }
21
22   public String getPassword() {
23         return password;
24   }
25
26   public void setPassword(String password) {
27         this.password = password;
28   }
29
30   public String getSalt() {
31         return salt;
32   }
33
34   public void setSalt(String salt) {
35         this.salt = salt;
36   }
37
38   @Override
39   public String toString() {
40         return "User{" +
41               "username='" + username + '\'' +
42               ", password='" + password + '\'' +
43               ", salt='" + salt + '\'' +
44               '}';
45   }
46 }
消息生产者



1 package activeMQStu.queue;
2
3 import activeMQStu.ActiveMQServer;
4 import activeMQStu.User;
5 import org.apache.activemq.command.ActiveMQObjectMessage;
6
7 import javax.jms.JMSException;
8 import javax.jms.MessageProducer;
9 import javax.jms.ObjectMessage;
10 import javax.jms.Queue;
11
12 /**
13* Created by lvyahui on 2016/4/23.
14*/
15 public class UserProducer {
16
17   private MessageProducer producer;
18
19   public UserProducer(ActiveMQServer activeMQServer, String queueName) throws JMSException {
20         Queue queue = activeMQServer.createQueue(queueName);
21         producer = activeMQServer.getSession().createProducer(queue);
22   }
23
24   public void produce(User user) throws JMSException {
25         ObjectMessage objectMessage = new ActiveMQObjectMessage();
26         objectMessage.setObject(user);
27         producer.send(objectMessage);
28   }
29 }
发送消息



1 package activeMQStu;
2
3 import activeMQStu.queue.UserProducer;
4 import org.apache.activemq.ActiveMQConnection;
5
6 import javax.jms.JMSException;
7
8 /**
9* Created by lvyahui on 2016/4/23.
10*/
11 public class ProduceApp {
12   public static void main(String[] args) throws JMSException {
13         ActiveMQServer activeMQServer = new ActiveMQServer(
14               ActiveMQConnection.DEFAULT_USER,
15               ActiveMQConnection.DEFAULT_PASSWORD,
16               "tcp://localhost:61616"
17         );
18         UserProducer producer = new UserProducer(activeMQServer,"queue.devlyh");
19         for(int i =0 ;i < 100;i++){
20             User user = new User();
21             user.setUsername("lvyahui".concat(String.valueOf(i)));
22             user.setPassword("admin888" + i);
23             user.setSalt("salt"+i);
24             producer.produce(user);
25         }
26         activeMQServer.close();
27   }
28 }
  运行成功后再页面可以看到有100条消息进入了名字为queue.devlyh的队列中
  

消息接受者



1 package activeMQStu.queue;
2
3 import activeMQStu.ActiveMQServer;
4 import activeMQStu.User;
5
6 import javax.jms.JMSException;
7 import javax.jms.MessageConsumer;
8 import javax.jms.ObjectMessage;
9
10 /**
11* Created by lvyahui on 2016/4/23.
12*/
13 public class UserConsumer {
14   private MessageConsumer consumer;
15
16   public UserConsumer(ActiveMQServer activeMQServer,String queueName) throws JMSException {
17         this.consumer = activeMQServer.getSession()
18               .createConsumer(activeMQServer.createQueue(queueName));
19   }
20
21   public User consume() throws JMSException {
22         ObjectMessage objectMessage= (ObjectMessage) consumer.receive();
23         User user = (User) objectMessage.getObject();
24         return user;
25   }
26 }
接收消息



1 package activeMQStu;
2
3 import activeMQStu.queue.UserConsumer;
4 import org.apache.activemq.ActiveMQConnection;
5 import org.apache.activemq.ActiveMQConnectionFactory;
6
7 import javax.jms.JMSException;
8 import java.util.Arrays;
9
10 /**
11* Created by lvyahui on 2016/4/23.
12*/
13 public class ConsumeApp {
14   public static void main(String[] args) throws JMSException {
15         ActiveMQServer activeMQServer = new ActiveMQServer(
16               ActiveMQConnection.DEFAULT_USER,
17               ActiveMQConnection.DEFAULT_PASSWORD,
18               "tcp://localhost:61616"
19         );
20
21 //      ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) activeMQServer.getConnectionFactory();
22
23         UserConsumer consumer = new UserConsumer(activeMQServer,"queue.devlyh");
24         while(true){
25             User user = consumer.consume();
26             System.out.println(user);
27         }
28   }
29 }
  执行效果如下
页: [1]
查看完整版本: 基于ActiveMQ的点对点收发消息