设为首页 收藏本站
查看: 996|回复: 0

[经验分享] Spring Data Redis 2 之消息订阅

[复制链接]

尚未签到

发表于 2018-11-4 10:44:15 | 显示全部楼层 |阅读模式
  Spring Data Redis是spring提供了操作Redis键值存储集成方案。
  简单回顾下 内容

  •   抽象出RedisConnection ,RedisConnectionFactory概念,集成了4个redis客户端
  •   提供的RedisTemplate,是一个高层次操作视图
  •   主要提供操作视图,主要包括两大类目:*Operations,Bound*Operations。当然Bound*Operations是对*Operations的简单封装。
  •   提供了Serializers序列工具,主要应用与key,value,hash方面。
  本节主要内容,使用SDR工具,完成消息订阅与分发,事务管理,消息管道化。
  1.消息订阅与分发
  1.1 主要相关命令

  命令参照:http://redis.readthedocs.io/en/2.4/pub_sub.html
  1.2 Redis消息监听容器声明和消息监听器注册
  为了订阅消息,需要实现MessageListener回调,每次新消息到达时,回调被调用,用户代码通过onMessage方法执行。
  DefaultMessageListener.java 一个简单的MessageListener实现
public class DefaultMessageListener  implements MessageListener {  
    protected final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageListener.class);
  
    @Override
  
    public void onMessage(Message message, byte[] pattern) {
  
        byte[] channel = message.getChannel();
  
        byte[] body = message.getBody();
  
        LOGGER.info(new String(channel) + "-->" + new String(body) + "-->" + new String(pattern));
  
    }
  
}
  考虑到“等待消息“过程是阻塞的,SDR提供了RedisMessageListenerContainer。
  RedisMessageListenerContainer充当消息侦听器容器;它用于从Redis通道接收消息,并驱动注入它的MessageListener,RedisMessageListenerContainer负责消息接收和分派到侦听器中的所有线程的处理。
  lettuce-context.xml
  

  

  

  

  

  

  

  
      
  

  

  当然为了研究方便,当然使用下面的声明方式完全可以代替上面部分,完全等价。
  
   
  
   
  
        
  
            
  
               
  
                    
  
               
  
            
  
        
  
   
  

  至此,就完成了消息的订阅了。
  1.3 MessageListenerAdapter
  MessageListenerAdapter类是Spring的异步消息传递支持的最后一个组件:简而言之,它允许您将几乎任何类暴露为MDP。比如上面的DefaultMessageListener,毕竟还是实现了MessageListener接口,并没有完全实现了MDP。可以借助MessageListenerAdapter轻松实现。
  简单展示下实现过程
  1.3.1 一个完全符合MDP的接口
public interface MessageDelegate {  
//  void handleMessage(String message);
  
  void handleMessage(Map message);
  
    void handleMessage(byte[] message);
  
  void handleMessage(Serializable message);
  
  // pass the channel/pattern as well
  
  void handleMessage(Serializable message, String channel);
  
}
  1.3.2 MDP接口实现类
public class DefaultMessageDelegate   implements MessageDelegate {  
    protected final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageDelegate.class);
  
    public void handleMessage(String message) {
  
        LOGGER.info("--------handleMessage 1-----------");
  
        LOGGER.info(message);
  
    }
  

  
    @Override
  
    public void handleMessage(Map message) {
  
        LOGGER.info("--------handleMessage 2-----------");
  
    }
  

  
    @Override
  
    public void handleMessage(byte[] message) {
  
        LOGGER.info("--------handleMessage 3-----------");
  
        LOGGER.info(new String(message));
  
    }
  

  
    public void handleMessage(Serializable message) {
  
        LOGGER.info("--------handleMessage 4-----------");
  
        LOGGER.info(message.toString());
  
    }
  

  
    public void handleMessage(Serializable message, String channel) {
  
        LOGGER.info("--------handleMessage 5-----------");
  
        LOGGER.info(message + "------>" + channel);
  
    }
  
    // implementation elided for clarity...
  
}
  1.3.3配置

  

  
      
  
      
  

  
      
  
              
  
                     
  
              
  
      
  
   
  
        
  
        
  
            
  
               
  
                    
  
                        
  
                    
  
               
  
            
  
        
  
   
  

  1.4 总结
  主要涉及4个组件
  MessageListener:Redis中发布的消息的侦听器。
  MessageListenerAdapter:消息侦听器适配器,通过反射将消息处理委托给目标侦听器方法,并进行灵活的消息类型转换
  ChannelTopic:Channel topic
  RedisMessageListenerContainer:Redis消息侦听器提供异步行为的容器。 处理侦听,转换和消息分派的低级细节
  2. 事务管理
  redis事务命令,具体见http://redis.readthedocs.io/en/2.4/transaction.html。
  首先看一个测试用例
  2.1 一个不能运行的事务代码
@Test  
public void testTran(){
  
        this.redisTemplate.multi();
  
    try {
  
        for (int i = 0; i < 1; i++) {
  
            String key = "key" + i;
  
            String value = "00000";
  
            this.redisTemplate.opsForValue().set(key, value);
  
        }
  
        this.redisTemplate.exec();
  
    }catch (Exception e){
  
        this.redisTemplate.discard();
  
    }
  
}
  执行代码会报异常。具体异常
  Caused by: com.lambdaworks.redis.RedisCommandExecutionException: ERR DISCARD without MULTI
  at com.lambdaworks.redis.LettuceFutures.await(LettuceFutures.java:76)
  at com.lambdaworks.redis.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:59)
  at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:87)
  at com.sun.proxy.$Proxy15.discard(Unknown Source)
  at org.springframework.data.redis.connection.lettuce.LettuceConnection.discard(LettuceConnection.java:833)
  这是因为Redis通过multi,exec和discard命令为事务提供支持。 这些操作在RedisTemplate上可用,但是RedisTemplate不能保证在事务中使用相同的连接执行所有操作。
  处理异常
  通过设置setEnableTransactionSupport(true)显式为每个正在使用的RedisTemplate启用。 这将强制将正在使用的RedisConnection绑定到触发MULTI的当前线程。
  修改后的代码如下
public void testTran(){  
    StringRedisSerializer serializer = new StringRedisSerializer();
  
    redisTemplate.setKeySerializer(serializer);
  
    redisTemplate.setValueSerializer(serializer);
  
        this.redisTemplate.setEnableTransactionSupport(true);
  
        this.redisTemplate.multi();
  
    try {
  
        for (int i = 0; i < 100; i++) {
  
            String key = "key" + i;
  
            String value = "00000";
  
            this.redisTemplate.opsForValue().set(key, value);
  
        }
  
        this.redisTemplate.exec();
  
    }catch (Exception e){
  
        this.redisTemplate.discard();
  
    }
  
}
  运行结果
  1) "key95"
  2) "key82"
  3) "key69"
  4) "key90"
  5) "key18"
  6) "key1"
  7) "key4"
  8) "key63"
  9) "key78"
  执行结果,也简介认证了redisTemplate 执行结果无序。
  2.2 正常事务代码
  @Test  
    public void testTran3(){
  
        StringRedisSerializer serializer = new StringRedisSerializer();
  
        redisTemplate.setKeySerializer(serializer);
  
        redisTemplate.setValueSerializer(serializer);
  
        redisTemplate.execute(new SessionCallback() {
  
            @Override
  
            public  List execute(RedisOperations operations) throws DataAccessException {
  
                operations.multi();
  
                try {
  
                    for(int i=0;i5){
  
                           throw  new Exception();
  
                        }
  
                    }
  
                    return operations.exec();
  
                } catch (Exception e) {
  
                    operations.discard();
  
                }
  
                // This will contain the results of all ops in the transaction
  

  
                return Collections.emptyList();
  
            }
  
        });
  
    }
  
}
  3. 管道支持
  Redis提供对流水线的支持,这涉及向服务器发送多个命令,而不必等待答复,然后在一个步骤中读取答复。当您需要在一行中发送多个命令时,流水线可以提高性能,例如向同一列表中添加许多元素。
/**  
*  在管道连接上执行给定的操作对象,返回结果。
  
*  注意,回调不能返回非空值,因为它被管道覆盖。 此方法将使用默认序列化器反序列化结果
  
*
  
* @param action callback object to execute
  
* @return list of objects returned by the pipeline
  
*/
  
List executePipelined(RedisCallback action);
  

  
/**
  
*在流水线连接上执行给定的操作对象,使用专用的序列化程序返回结果
  
*
  
* @param action callback object to execute
  
* @param resultSerializer The Serializer to use for individual values or Collections of values. If any returned
  
*          values are hashes, this serializer will be used to deserialize both the key and value
  
* @return list of objects returned by the pipeline
  
*/
  
List executePipelined(final RedisCallback action, final RedisSerializer resultSerializer);
  

  
/**
  
*在流水线连接上执行给定的Redis会话。 允许事务流水线化。
  
* 注意,回调不能返回非空值,因为它被管道覆盖。
  
* @param session Session callback
  
* @return list of objects returned by the pipeline
  
*/
  
List executePipelined(final SessionCallback session);
  

  
/**
  
* 在管道连接上执行给定的Redis会话,使用专用的序列化程序返回结果。 允许事务流水线化。
  
* 注意,回调不能返回非空值,因为它被管道覆盖。
  
*
  
* @param session Session callback
  
* @param resultSerializer
  
* @return list of objects returned by the pipeline
  
*/
  
List executePipelined(final SessionCallback session, final RedisSerializer resultSerializer);



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-630565-1-1.html 上篇帖子: redis基本使用 下篇帖子: 给redis-cluster设置密码
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表