A binding is a relationship between an exchange and a queue. It can be read simply as: this queue is interested in messages from this exchange.
channel.queueBind(queueName, EXCHANGE_NAME, "");
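A binding can take an extra routingKey parameter. To avoid confusion with the basicPublish parameter of the same name, we will call it the binding key. A binding with a key can be created like this: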
channel.queueBind(queueName, EXCHANGE_NAME, "black");
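The meaning of a binding key depends on the exchange type. The fanout exchange we used previously simply ignores this value.
The routing algorithm of a direct exchange is simple: a message is delivered to the queues whose binding key exactly matches the message's routing key.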
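The exact-match rule can be illustrated without a broker. The following is a minimal in-memory sketch, not the RabbitMQ client API: the class `DirectRoutingSketch`, its `bind` and `route` methods, and the queue names `q1` and `q2` are all hypothetical names introduced here for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing (illustrative only, not the RabbitMQ API). */
public class DirectRoutingSketch {
    // Queue name -> binding keys registered for that queue (insertion order kept).
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    /** Hypothetical stand-in for channel.queueBind(queueName, exchange, bindingKey). */
    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(queueName, k -> new ArrayList<>()).add(bindingKey);
    }

    /** Returns the queues that have a binding key equal to the routing key. */
    public List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : bindings.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("q1", "debug");
        exchange.bind("q2", "info");
        exchange.bind("q2", "debug"); // one queue may hold several bindings

        System.out.println(exchange.route("debug"));   // prints [q1, q2]
        System.out.println(exchange.route("info"));    // prints [q2]
        System.out.println(exchange.route("warning")); // prints [] (no binding matches)
    }
}
```

In this model a routing key that matches no binding reaches no queue, which is why a consumer bound only with "debug" never sees messages published with "info".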
package com.rabbitmq.www.publish_subscribe.direct;

import java.util.Random;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String HOST_ADDR = "172.18.112.102";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDR);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange with type direct.
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // One shared generator is enough; there is no need to create one per message.
        Random random = new Random();
        for (int i = 0; i <= 10; i++) {
            String message = "helloworld" + i;
            // Pick "info" or "debug" at random as the severity.
            String severity = random.nextInt(2) == 1 ? "debug" : "info";
            // Publish to the declared exchange, using the severity as the routing key.
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
package com.rabbitmq.www.publish_subscribe.direct;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String HOST_ADDR = "172.18.112.102";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDR);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Let the server generate a queue name.
        String queueName = channel.queueDeclare().getQueue();

        // Bind the queue to the direct exchange with the binding key "debug".
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            }
        };
        // autoAck = true: messages are acknowledged as soon as they are delivered.
        channel.basicConsume(queueName, true, consumer);
    }
}