|
RabbitMQ可以实现消息发布和订阅,这其中需要exchange,消息的发布者,消费者,消息队列和Exchange关系如下图
消息的生产者发出消息,然后经过Exchange转换,消息队列(queue)需要绑定Exchange,Exchange把消息发送到各个消息队列中,然后,各个消费者从消息队列中取到发布者发布的消息。
利用Java作为客户端,具体代码如下:
发布者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
*
* 用rabbitMQ实现发布订阅
* 发布类
* Created by wangtf on 2015/11/17.
*/
public class Emitlog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String args[]) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String msg = "各单位请注意";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("[send] msg: " +msg);
channel.close();
connection.close();
}
}
|
消息接受类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| import com.rabbitmq.client.*;
import java.io.IOException;
/**
* RabbitMQ实现发布订阅的功能
* 订阅类
* Created by wangtf on 2015/11/17.
*/
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.print(" waiting for message");
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("[x] receive message :" + message);
}
};
channel.basicConsume(queueName,true,consume);
}
}
|
分别运行ReceiveLogs和Emitlog,能得到如下结果:
【Emitlog】
[send] msg: 各单位请注意
Process finished with exit code 0
【ReceiveLogs】 waiting for message[x] receive message :各单位请注意
|
|
|