设为首页 收藏本站
查看: 1148|回复: 0

[经验分享] RabbitMQ(四)消息确认(发送确认,接收确认)

[复制链接]

尚未签到

发表于 2017-12-8 21:41:07 | 显示全部楼层 |阅读模式
  下面是几个问题:
  1.为什么要进行消息确认?
  2.rabbitmq消息确认 机制是什么样的?
  3.发送方如何确认消息发送成功?什么样才算发送成功?
  4.消费方如何告知rabbitmq消息消费成功或失败?
  5.使用spring的代码示例
  1.为什么要进行消息确认?
  经常会听到丢消息的字眼, 对于前面的demo来说,就存在丢消息的隐患.
  发送者没法确认是否发送成功,消费者处理失败也无法反馈.
  没有消息确认机制,就会出现消息莫名其妙的没了,也不知道什么情况.
  2.rabbitmq消息确认 机制是什么样的?
  首先看官网对消息确认的介绍http://www.rabbitmq.com/confirms.html
  网上会有很多总结的博客(包括现在看的),很多就是对官网的翻译.所以看资料首先要去官网看看,这很关键.
DSC0000.jpg

  看上图官网的介绍.唯一保证消息不丢失的是使用事务,但是性能太差,作为补偿,有了消息确认机制.
  并说明了开启方法,以及和事务模式不共存.
  还写了一个例子,但是点进去那个链接已经失效了,新版的源码上也没有这个例子,我找了最近一版是3.6.7上面还有.
  点这里看官方的例子
  3.发送的消息什么样才算成功或失败? 如何确认?
  判断消息成功或失败,其实就是看进行消息确认的时机,因为成功或失败后就会把结果告诉发送方.还是看官方解释:
DSC0001.jpg

  意思如下:
  确认消息不能路由时(exchange确认不能路由到任何queue),进行确认操作(确认失败).如果发送方设置了mandatory模式,则会先调用basic.return方法.
  消息可以路由时,当需要发送的队列都发送成功后,进行消息确认.对于持久化的队列,意味着已经写入磁盘,对于镜像队列,意味着所有镜像都接受成功.
  至于如何确认的问题,上面已经写了 basic.ack方法
  4.消费方如何告知rabbitmq消息消费成功或失败?
DSC0002.jpg

  如图可知,根据消费方不同的确认模式,确认时机也不同.
  自动确认会在消息发送给消费者后立即确认,如果手动则当消费者调用ack,nack,reject几种方法时进行确认.
  一般会设置手动模式,业务失败后可以进行一些操作.
  5.使用spring的代码示例
  下面是一个使用spring整合的代码示例:
  首先是rabbitmq的配置文件:






[html] view plain copy


print?

  • <?xml version="1.0" encoding="UTF-8"?>  
  • <beans xmlns="http://www.springframework.org/schema/beans"  
  •     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
  •     xsi:schemaLocation="http://www.springframework.org/schema/beans   
  •     http://www.springframework.org/schema/beans/spring-beans.xsd
  •     http://www.springframework.org/schema/rabbit
  •     http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">  
  •     <!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都没有,要用跟jar包匹配的版本 -->  

  •     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />  

  •     <rabbit:connection-factory   
  •         id="connectionFactory"  
  •         host="${rabbit.host}"   
  •         port="${rabbit.port}"   
  •         username="${rabbit.username}"   
  •         password="${rabbit.password}"  
  •         publisher-confirms="true"   
  •     />  

  •     <rabbit:admin connection-factory="connectionFactory" />  

  •     <!-- 给模板指定转换器 --><!-- mandatory必须设置true,return callback才生效 -->  
  •     <rabbit:template id="amqpTemplate"   connection-factory="connectionFactory"   
  •         confirm-callback="confirmCallBackListener"  
  •         return-callback="returnCallBackListener"   
  •         mandatory="true"   
  •     />  

  •     <rabbit:queue name="CONFIRM_TEST" />  

  •     <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >  
  •         <rabbit:bindings>  
  •             <rabbit:binding queue="CONFIRM_TEST" />  
  •         </rabbit:bindings>  
  •     </rabbit:direct-exchange>  

  •     <!-- 配置consumer, 监听的类和queue的对应关系 -->  
  •     <rabbit:listener-container  
  •         connection-factory="connectionFactory" acknowledge="manual" >  
  •         <rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />  
  •     </rabbit:listener-container>  

  • </beans>  

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都没有,要用跟jar包匹配的版本 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<rabbit:connection-factory
id="connectionFactory"
host="${rabbit.host}"
port="${rabbit.port}"
username="${rabbit.username}"
password="${rabbit.password}"
publisher-confirms="true"
/>
<rabbit:admin connection-factory="connectionFactory" />
<!-- 给模板指定转换器 --><!-- mandatory必须设置true,return callback才生效 -->
<rabbit:template id="amqpTemplate"connection-factory="connectionFactory"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory="true"
/>
<rabbit:queue name="CONFIRM_TEST" />
<rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >
<rabbit:bindings>
<rabbit:binding queue="CONFIRM_TEST" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 配置consumer, 监听的类和queue的对应关系 -->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" >
<rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />
</rabbit:listener-container>
</beans>
  


  
然后发送方:






[java] view plain copy

  print?

  • import org.springframework.amqp.core.AmqpTemplate;  
  • import org.springframework.beans.factory.annotation.Autowired;  
  • import org.springframework.stereotype.Service;  

  • @Service("publishService")  
  • public class PublishService {  
  •     @Autowired   
  •     private AmqpTemplate amqpTemplate;   

  •     public void send(String exchange, String routingKey, Object message) {   
  •         amqpTemplate.convertAndSend(exchange, routingKey, message);
  •     }
  • }

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("publishService")
public class PublishService {
@Autowired  
private AmqpTemplate amqpTemplate;
public void send(String exchange, String routingKey, Object message) {  
amqpTemplate.convertAndSend(exchange, routingKey, message);
}  
}
  消费方:






[java] view plain copy


print?

  • import org.springframework.amqp.core.Message;  
  • import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
  • import org.springframework.stereotype.Service;  

  • import com.rabbitmq.client.Channel;  

  • @Service("receiveConfirmTestListener")  
  • public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {   
  •     @Override  
  •     public void onMessage(Message message, Channel channel) throws Exception {  
  •         try{  
  •             System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));  
  •             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  
  •         }catch(Exception e){  
  •             e.printStackTrace();//TODO 业务处理  
  •             channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);  
  •         }
  •     }
  • }

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
@Service("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {  
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){
e.printStackTrace();//TODO 业务处理
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}  
}  
  确认后回调:






[java] view plain copy


print?

  • import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;  
  • import org.springframework.amqp.rabbit.support.CorrelationData;  
  • import org.springframework.stereotype.Service;  

  • @Service("confirmCallBackListener")  
  • public class ConfirmCallBackListener implements ConfirmCallback{  
  •     @Override  
  •     public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
  •         System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);  
  •     }
  • }

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;
@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
}
}
  
失败后return回调:






[java] view plain copy

  print?

  • import org.springframework.amqp.core.Message;  
  • import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;  
  • import org.springframework.stereotype.Service;  

  • @Service("returnCallBackListener")  
  • public class ReturnCallBackListener implements ReturnCallback{  
  •     @Override  
  •     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {  
  •         System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);  
  •     }
  • }

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.stereotype.Service;
@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback{
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
}
}
  
测试类:






[java] view plain copy

  print?

  • import org.junit.Test;  
  • import org.junit.runner.RunWith;  
  • import org.springframework.beans.factory.annotation.Autowired;  
  • import org.springframework.test.context.ContextConfiguration;  
  • import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  

  • import com.dingcheng.confirms.publish.PublishService;   

  • @RunWith(SpringJUnit4ClassRunner.class)   
  • @ContextConfiguration(locations = {"classpath:application-context.xml"})   
  • public class TestConfirm {   
  •     @Autowired   
  •     private PublishService publishService;   

  •     private static String exChange = "DIRECT_EX";  

  •     @Test   
  •     public void test1() throws InterruptedException{   
  •         String message = "currentTime:"+System.currentTimeMillis();  
  •         System.out.println("test1---message:"+message);  
  •         //exchange,queue 都正确,confirm被回调, ack=true  
  •         publishService.send(exChange,"CONFIRM_TEST",message);   
  •         Thread.sleep(1000);  
  •     }

  •     @Test   
  •     public void test2() throws InterruptedException{   
  •         String message = "currentTime:"+System.currentTimeMillis();  
  •         System.out.println("test2---message:"+message);  
  •         //exchange 错误,queue 正确,confirm被回调, ack=false  
  •         publishService.send(exChange+"NO","CONFIRM_TEST",message);   
  •         Thread.sleep(1000);  
  •     }

  •     @Test   
  •     public void test3() throws InterruptedException{   
  •         String message = "currentTime:"+System.currentTimeMillis();  
  •         System.out.println("test3---message:"+message);  
  •         //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE  
  •         publishService.send(exChange,"",message);   
  • //        Thread.sleep(1000);  
  •     }

  •     @Test   
  •     public void test4() throws InterruptedException{   
  •         String message = "currentTime:"+System.currentTimeMillis();  
  •         System.out.println("test4---message:"+message);  
  •         //exchange 错误,queue 错误,confirm被回调, ack=false  
  •         publishService.send(exChange+"NO","CONFIRM_TEST",message);   
  •         Thread.sleep(1000);  
  •     }
  • }

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.dingcheng.confirms.publish.PublishService;  

@RunWith(SpringJUnit4ClassRunner.class)  
@ContextConfiguration(locations = {"classpath:application-context.xml"})  
public class TestConfirm {  
@Autowired  
private PublishService publishService;  
private static String exChange = "DIRECT_EX";
@Test  
public void test1() throws InterruptedException{  
String message = "currentTime:"+System.currentTimeMillis();
System.out.println("test1---message:"+message);
//exchange,queue 都正确,confirm被回调, ack=true
publishService.send(exChange,"CONFIRM_TEST",message);  
Thread.sleep(1000);
}  
@Test  
public void test2() throws InterruptedException{  
String message = "currentTime:"+System.currentTimeMillis();
System.out.println("test2---message:"+message);
//exchange 错误,queue 正确,confirm被回调, ack=false
publishService.send(exChange+"NO","CONFIRM_TEST",message);  
Thread.sleep(1000);
}  
@Test  
public void test3() throws InterruptedException{  
String message = "currentTime:"+System.currentTimeMillis();
System.out.println("test3---message:"+message);
//exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
publishService.send(exChange,"",message);  
//        Thread.sleep(1000);
}  
@Test  
public void test4() throws InterruptedException{  
String message = "currentTime:"+System.currentTimeMillis();
System.out.println("test4---message:"+message);
//exchange 错误,queue 错误,confirm被回调, ack=false
publishService.send(exChange+"NO","CONFIRM_TEST",message);  
Thread.sleep(1000);
}  
}  
  


  测试结果:






[html] view plain copy

  print?

  • test1---message:currentTime:1483786948506
  • test2---message:currentTime:1483786948532
  • consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506  
  • test3---message:currentTime:1483786948536
  • confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  
  • confirm--:correlationData:null,ack:false,cause:Channel closed by application
  • [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)   
  • return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey:
  • confirm--:correlationData:null,ack:true,cause:null
  • test4---message:currentTime:1483786948546
  • confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  
  • [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)   


test1---message:currentTime:1483786948506
test2---message:currentTime:1483786948532
consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506
test3---message:currentTime:1483786948536
confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
confirm--:correlationData:null,ack:false,cause:Channel closed by application
[ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  
return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey:
confirm--:correlationData:null,ack:true,cause:null
test4---message:currentTime:1483786948546
confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
[ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  

  


  代码和配置里面,已经都有注释,就不在多说明了.(callback是异步的,所以测试中sleep1秒钟等待下)
  总结下就是:
  如果消息没有到exchange,则confirm回调,ack=false
  如果消息到达exchange,则confirm回调,ack=true
  exchange到queue成功,则不回调return
  exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
  备注:需要说明,spring-rabbit和原生的rabbit-client ,表现是不一样的.
  测试的时候,原生的client,exchange错误的话,直接就报错了,是不会到confirmListener和returnListener的
  源码地址:https://github.com/qq315737546/spring-rabbit

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422272-1-1.html 上篇帖子: (转) RabbitMQ学习之spring整合发送同步消息(注解实现) 下篇帖子: RabbitMQ学习之spring-amqp的重要类的认识
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表