killerxf 发表于 2017-7-2 11:26:41

rabbitmq 3.6 延时消息

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 执行上面的启用命令之前，需要先下载并安装该插件：https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  生产端



// Post-processor applied to the outgoing message just before it is published:
// it stamps the delay (20000, in milliseconds per the delayed-message plugin)
// onto the message properties and logs a timestamped line.
MessagePostProcessor delayStamper = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message outbound) throws AmqpException {
        outbound.getMessageProperties().setDelay(20000);
        String stamp = sdf.format(new Date());
        System.out.println(stamp + " Delay sent.");
        return outbound;
    }
};

// Publish to the delayed direct exchange with routing key "notify".
rabbitTemplate.convertAndSend("direct.exchange", "notify", msg, delayStamper);
  消费端



/**
 * Consumer side of the delayed-message example.
 *
 * <p>Declares the delayed direct exchange {@code direct.exchange}, the queue
 * {@code notify.queue}, the binding between them (routing key {@code notify}),
 * and a listener that prints each received message with a timestamp and then
 * acknowledges it on the channel.
 */
@Component("delayedReceiver")
@EnableRabbit
@Configuration
public class DelayedReceiver {

    /**
     * Timestamp pattern for the console output. {@code DateTimeFormatter} is
     * immutable, so it can be shared safely if the listener is invoked from
     * several consumer threads (unlike {@code SimpleDateFormat}).
     */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Handles a message delivered from {@code notify.queue}: prints the body
     * with the current local time, then acknowledges the delivery.
     *
     * @param object  the delivered message; it is cast to {@link Message}
     *                NOTE(review): relies on the listener infrastructure passing
     *                the raw AMQP message for this parameter - confirm
     * @param channel the channel the message arrived on, used for the ack
     * @throws Exception if the acknowledgement fails, or a
     *                   {@code ClassCastException} if {@code object} is not a {@link Message}
     */
    @RabbitListener(queues = "notify.queue")
    public void handleMessageNotify(Object object, Channel channel) throws Exception {
        Message msg = (Message) object;

        // Decode with an explicit charset instead of the platform default so the
        // printed text does not depend on the JVM's locale settings.
        String body = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        String now = java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT);
        System.out.println(now + " notify :" + body);

        // Acknowledge only this delivery (multiple = false).
        // NOTE(review): an explicit ack presumably pairs with MANUAL acknowledge
        // mode on the listener container - confirm against the container config.
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Declares {@code direct.exchange} as a delayed exchange with direct routing.
     *
     * <p>The return type is the concrete {@link DirectExchange} so that the bean's
     * declared type agrees with the {@code DirectExchange} parameter requested by
     * {@link #bindingNotify(DirectExchange, Queue)}; with a declared type of
     * {@code Exchange} that by-type injection is not guaranteed to match.
     *
     * @return the delayed direct exchange
     */
    @Bean
    public DirectExchange direct() {
        // directExchange(...) builds a DirectExchange, so the narrowing cast is safe.
        return (DirectExchange) ExchangeBuilder.directExchange("direct.exchange")
                .delayed()
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    /**
     * Declares the queue the listener consumes from.
     *
     * @return the {@code notify.queue} queue
     */
    @Bean
    public Queue notifyQueue() {
        return new Queue("notify.queue");
    }

    /**
     * Binds {@code notify.queue} to the delayed exchange under routing key {@code notify}.
     *
     * @param direct      the exchange bean produced by {@link #direct()}
     * @param notifyQueue the queue bean produced by {@link #notifyQueue()}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingNotify(DirectExchange direct, Queue notifyQueue) {
        return BindingBuilder.bind(notifyQueue).to(direct).with("notify");
    }
}
页: [1]
查看完整版本: rabbitmq 3.6 延时消息