基于ActiveMQ的点对点收发消息
ActiveMQ是apache的一个开源消息引擎。可以作为即通引擎或者消息中间件引擎。准备
下载ActiveMQ
http://activemq.apache.org/download.html
进入\bin\win64双击InstallService.bat安装为系统服务。然后启动这个系统服务
访问 http://localhost:8161/admin/queues.jsp 可以看到消息队列列表 账号密码默认就是admin admin,配置在conf/jetty-realm.properties中
然后将根目录下的activemq-all-5.13.2.jar引入项目,就可以进行开发了
首先对ActiveMQ进行简单封装
1 package activeMQStu;
2
3 import org.apache.activemq.ActiveMQConnectionFactory;
4
5 import javax.jms.*;
6 import java.util.Arrays;
7
8 /**
9* Created by lvyahui on 2016/4/23.
10*/
11 public class ActiveMQServer {
12
13
14 private String user;
15 private String pass;
16 private String url;
17
18 private Session session;
19 private Connection connection;
20 private ActiveMQConnectionFactory connectionFactory;
21
22 public ActiveMQConnectionFactory getConnectionFactory() {
23 return connectionFactory;
24 }
25
26 public ActiveMQServer(String user, String pass, String url) throws JMSException {
27 this.user = user;
28 this.pass = pass;
29 this.url = url;
30
31 this.connectionFactory = new ActiveMQConnectionFactory(this.user, this.pass, this.url);
32 /*
33 * 必须指明哪些包的类是可以序列化的
34 * 否则会报错:ActiveMQ Serializable class not available to broker. Reason: ClassNotFoundException
35 * 参考:http://activemq.apache.org/objectmessage.html
36 * */
37 connectionFactory.setTrustedPackages(Arrays.asList("activeMQStu"));
38 this.connection = connectionFactory.createConnection();
39 connection.start();
40 }
41
42 public Session getSession() throws JMSException {
43 if (session == null) {
44 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
45 }
46 return session;
47 }
48
49 public Queue createQueue(String name) throws JMSException {
50 return getSession().createQueue(name);
51 }
52
53 public void close() throws JMSException {
54 if (session != null) {
55 session.commit();
56 session.close();
57 }
58 if (connection != null) {
59 connection.close();
60 }
61 }
62
63 }
消息
1 package activeMQStu;
2
3 import java.io.Serializable;
4
5 /**
6* Created by lvyahui on 2016/4/23.
7*/
8 public class User implements Serializable {
9
10 private String username ;
11 private String password ;
12 private String salt ;
13
14 public String getUsername() {
15 return username;
16 }
17
18 public void setUsername(String username) {
19 this.username = username;
20 }
21
22 public String getPassword() {
23 return password;
24 }
25
26 public void setPassword(String password) {
27 this.password = password;
28 }
29
30 public String getSalt() {
31 return salt;
32 }
33
34 public void setSalt(String salt) {
35 this.salt = salt;
36 }
37
38 @Override
39 public String toString() {
40 return "User{" +
41 "username='" + username + '\'' +
42 ", password='" + password + '\'' +
43 ", salt='" + salt + '\'' +
44 '}';
45 }
46 }
消息生产者
1 package activeMQStu.queue;
2
3 import activeMQStu.ActiveMQServer;
4 import activeMQStu.User;
5 import org.apache.activemq.command.ActiveMQObjectMessage;
6
7 import javax.jms.JMSException;
8 import javax.jms.MessageProducer;
9 import javax.jms.ObjectMessage;
10 import javax.jms.Queue;
11
12 /**
13* Created by lvyahui on 2016/4/23.
14*/
15 public class UserProducer {
16
17 private MessageProducer producer;
18
19 public UserProducer(ActiveMQServer activeMQServer, String queueName) throws JMSException {
20 Queue queue = activeMQServer.createQueue(queueName);
21 producer = activeMQServer.getSession().createProducer(queue);
22 }
23
24 public void produce(User user) throws JMSException {
25 ObjectMessage objectMessage = new ActiveMQObjectMessage();
26 objectMessage.setObject(user);
27 producer.send(objectMessage);
28 }
29 }
发送消息
1 package activeMQStu;
2
3 import activeMQStu.queue.UserProducer;
4 import org.apache.activemq.ActiveMQConnection;
5
6 import javax.jms.JMSException;
7
8 /**
9* Created by lvyahui on 2016/4/23.
10*/
11 public class ProduceApp {
12 public static void main(String[] args) throws JMSException {
13 ActiveMQServer activeMQServer = new ActiveMQServer(
14 ActiveMQConnection.DEFAULT_USER,
15 ActiveMQConnection.DEFAULT_PASSWORD,
16 "tcp://localhost:61616"
17 );
18 UserProducer producer = new UserProducer(activeMQServer,"queue.devlyh");
19 for(int i =0 ;i < 100;i++){
20 User user = new User();
21 user.setUsername("lvyahui".concat(String.valueOf(i)));
22 user.setPassword("admin888" + i);
23 user.setSalt("salt"+i);
24 producer.produce(user);
25 }
26 activeMQServer.close();
27 }
28 }
运行成功后再页面可以看到有100条消息进入了名字为queue.devlyh的队列中
消息接受者
1 package activeMQStu.queue;
2
3 import activeMQStu.ActiveMQServer;
4 import activeMQStu.User;
5
6 import javax.jms.JMSException;
7 import javax.jms.MessageConsumer;
8 import javax.jms.ObjectMessage;
9
10 /**
11* Created by lvyahui on 2016/4/23.
12*/
13 public class UserConsumer {
14 private MessageConsumer consumer;
15
16 public UserConsumer(ActiveMQServer activeMQServer,String queueName) throws JMSException {
17 this.consumer = activeMQServer.getSession()
18 .createConsumer(activeMQServer.createQueue(queueName));
19 }
20
21 public User consume() throws JMSException {
22 ObjectMessage objectMessage= (ObjectMessage) consumer.receive();
23 User user = (User) objectMessage.getObject();
24 return user;
25 }
26 }
接收消息
1 package activeMQStu;
2
3 import activeMQStu.queue.UserConsumer;
4 import org.apache.activemq.ActiveMQConnection;
5 import org.apache.activemq.ActiveMQConnectionFactory;
6
7 import javax.jms.JMSException;
8 import java.util.Arrays;
9
10 /**
11* Created by lvyahui on 2016/4/23.
12*/
13 public class ConsumeApp {
14 public static void main(String[] args) throws JMSException {
15 ActiveMQServer activeMQServer = new ActiveMQServer(
16 ActiveMQConnection.DEFAULT_USER,
17 ActiveMQConnection.DEFAULT_PASSWORD,
18 "tcp://localhost:61616"
19 );
20
21 // ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) activeMQServer.getConnectionFactory();
22
23 UserConsumer consumer = new UserConsumer(activeMQServer,"queue.devlyh");
24 while(true){
25 User user = consumer.consume();
26 System.out.println(user);
27 }
28 }
29 }
执行效果如下
页:
[1]