设为首页 收藏本站
查看: 1200|回复: 0

[经验分享] 根据redis的pub/sub机制,写一个即时在线聊天应用

[复制链接]

尚未签到

发表于 2017-12-22 07:46:11 | 显示全部楼层 |阅读模式
  在Redis中,有个Pub/Sub,他的主要的工作流程如:  
  redis订阅一个模式频道如:chat_*,然后由小a想找人聊天了,就发送一个消息“现在有人聊天吗?chat_a”,末尾的chat_a为标识,表示你要在chat_* 这个圈子里面说。这个时候,chat_*这个圈子的管理员,就会对所有加入这个圈子的人发送一条消息。消息内容就是小a说的话。说白了,就是有个大喇叭,你说话声音不够大,但是你想让所有人都听到你的消息,那么你就要先对喇叭说话,然后喇叭把你的话扩散。。。。
  还是根据代码说,直接描述比抽象函数还要抽象。
  首先我们先在配置文件里面配置下订阅的频道对应的监听:
  

   <!--chat-->  
     <bean/>
  

  
     <bean>
  
         <property name="connectionFactory" ref="jedisConnFactory"/>
  
         <property name="messageListeners">
  
             <map>
  
                 <entry key-ref="msgListener" value-ref="patternTopic"/>
  
             </map>
  
         </property>
  
     </bean>
  

  
     <bean>
  
         <constructor-arg value="chat_*"/>
  
     </bean>
  

  2行是根据监听消息的接口写的监听类,当监听到有消息的时候,就会调用onMessage类
  

public>@Autowired  SubService subService;
  @Override
public void onMessage(Message message, byte[] bytes) {  subService.isCall(message);
  

//        System.out.println(SerializeUtil.unserialize(bytes).toString());  System.out.println("当前的Message值为:" + message.toString());
  }
  
}
  

  4-11行 是pub/sub监听配置,redis的配置文件是  jedisConnFactory,监听的频道模式为 patternTopic,监听到的消息处理类为 msgListener
  13-15行 是监听频道的设置
  消息的entity为:
  

public>
private String user;private String content;  

//省略get set  
}
  

  Controller层代码,主要有三个方法,sendMessage 是在页面中发送发送消息,(前台有一个ajax方法一直在请求)callMsg 当监听到新的消息的时候,返回监听到的消息,addChatUser 当有新的用户加入的时候,做记录,1是方便以后根据用户返回数据,2是防止重复的用户名。
  

@Controller  
@RequestMapping(
"/back")  

public>
  @Autowired
  PubService pubService;
  

  @Autowired
  SubService subService;
  

  @Autowired
  ChatService chatService;
  

  @ResponseBody
  @RequestMapping(
"/send")public ResultMsg sendMessage(MessageEntity messageEntity) {  ResultMsg resultMsg
= new ResultMsg();if (messageEntity != null && !messageEntity.getUser().equals("") && !messageEntity.getContent().equals("")) {  pubService.sendMessage(messageEntity);
  resultMsg.setMsg(
"发送成功!");  resultMsg.setCode(ResultCode.SUCCESS);
return resultMsg;  }
  resultMsg.setMsg(
"输入信息有误!");  resultMsg.setCode(ResultCode.FAIL);
return resultMsg;  }
  

  @ResponseBody
  @RequestMapping(
"/callBack")public ResultMsg callMsg(String user) throws InterruptedException {  ResultMsg resultMsg
= new ResultMsg();  Logger logger
= LogManager.getLogger(ChatController.class);  MessageEntity message;
  message
= subService.callBack(user);if (message != null) {  resultMsg.setCode(ResultCode.SUCCESS);
  resultMsg.setContent(message);
return resultMsg;  }
else {  resultMsg.setCode(ResultCode.FAIL);
return resultMsg;  }
  }
  

  @ResponseBody
  @RequestMapping(
"/join")public ResultMsg addChatUser(String user) {  ResultMsg resultMsg
= new ResultMsg();if (chatService.addUser(user) > 0) {  resultMsg.setCode(ResultCode.SUCCESS);
  }
else {  resultMsg.setCode(ResultCode.FAIL);
  resultMsg.setMsg(
"昵称已经存在,请重新输入!");  }
return resultMsg;  }
  
}
  

  Service 有三个 接口类为:
  

ChatService  

int addUser(String user);  

  
PubService
  

void sendMessage(MessageEntity messageEntity);  

  
SubService
  

void isCall(Message message);  
MessageEntity callBack(String user)
throws InterruptedException;  

  ChatService接口的具体方法代码
  

@Service  

public>
  @Autowired
  StringRedisTemplate stringRedisTemplate;
  

  @Override
public int addUser(String user) {//判断userList 已经其中的用户是否已经存在  if (stringRedisTemplate.hasKey("userList") && stringRedisTemplate.opsForZSet().score("userList", user) != null) {
  return 0;
  } else {
  //增加新的用户,但是要判断下,是否是第一次刚启动的时候
  int currentInidex;
  if (stringRedisTemplate.hasKey("msgList")) {
  currentInidex = (int) (-1 - stringRedisTemplate.opsForList().size("msgList"));
  } else {
  currentInidex = -1;
  }
  stringRedisTemplate.opsForZSet().add("userList", user, currentInidex);
  return 1;
  }
  }
  
}
  

  PubService接口的具体方法代码:
  

@Service  

public>
  @Autowired
  StringRedisTemplate stringRedisTemplate;
  

  @Override
public void sendMessage(MessageEntity messageEntity) {//消息的频道为chat_*  String channel = "chat_";
  String content = messageEntity.getContent();
  //使得发送消息的 频道为chat_用户名  例如chat_jack 为了后面能根据这个得到 jack用户
  stringRedisTemplate.convertAndSend(channel + messageEntity.getUser(), content);
  }
  
}
  

  SubService接口的具体方法代码:
  

@Service  

public>
  @Autowired
  StringRedisTemplate stringRedisTemplate;
  

  @Autowired
  JedisConnectionFactory jedisConnFactory;
  

  @Override
public void isCall(Message message) {  MessageEntity messageEntity
= new MessageEntity();//请参考配置文件,本例中key,value的序列化方式均为string。//其中key必须为stringSerializer。和redisTemplate.convertAndSend对应  messageEntity.setUser(stringRedisTemplate.getStringSerializer().deserialize(message.getChannel()).split("_")[1]);
  messageEntity.setContent(stringRedisTemplate.getValueSerializer().deserialize(message.getBody()).toString());
  stringRedisTemplate.opsForList().leftPush("msgList", JSON.toJSONString(messageEntity));
  

  
//        Jedis jedis = (Jedis) jedisConnFactory.getConnection().getNativeConnection();
  
//        stringRedisTemplate.opsForValue().set("broadcast",jedis.pubsubNumPat().toString() );
  
//        System.out.println(jedis.pubsubNumPat().toString());
  
//        jedis.close();
  

  }
  

  @Override
  public MessageEntity callBack(String user) throws InterruptedException {
  

  //模拟1s 查看一次 不至于一直在连接redis 低于1s的频率连接redis会报错
  Thread.sleep(1000);
  
//            String msgTxt = stringRedisTemplate.opsForList().rightPop("msgList");
  //获取当前user 对应的消息 坐标值
  Double index = stringRedisTemplate.opsForZSet().score("userList", user);
  

  long l = new Double(index).longValue();
  if (stringRedisTemplate.hasKey("msgList")) {
  String msgTxt = stringRedisTemplate.opsForList().index("msgList", l);
  

  //只有当msgList 有新的消息的时候,才会获取消息
  if (msgTxt != null && msgTxt != "") {
  
//                list.remove(user);
  MessageEntity messageEntity = JSON.parseObject(msgTxt, MessageEntity.class);
  

  //消息坐标加-1
  stringRedisTemplate.opsForZSet().incrementScore("userList", user, -1);
  return messageEntity;
  }
  }
  return null;
  

  }
  
}
  

  service层用到的redis主要是 list和zset,当有用户发送消息的时候,就把消息放到list中,获取第一条可以是: opsForList().index("msgList", -1) ,第二条为:opsForList().index("msgList", -2),第三台为opsForList().index("msgList", -3)……以此类推,又因为我们前端有个ajax一直发送请求,按道理是只要我们list中有消息,我们就把他拿出来,在页面展示。但是这里又不能实时的判断当前是不是所有的用户都获取过一次,而且仅仅只能为一次。这个时候就根据list的长度以及zset的score来判断了。过程为:当有用户加入的时候, 如果是第一个用户,那么就把他的zset的score设为-1,此时list中的消息为空,只有当我们发送一条消息的时候,onMessage做出响应,再把发送的消息存到list中,这个时候,一直发送请求的ajax发现,此时消息的长度为了1(可以通过 opsForList().index("msgList", -1)得到),而且当前用户的score标志为-1,正好他们一致。那么就把这个消息取出来,在前台页面展示,然后把score自自增-1,等待list里面再次有消息放入(长度为2,可以通过opsForList().index("msgList", -2)获取)的时候才满足取出消息的条件。。以此循环;如果不是第一个加入的,就把现在消息的长度放到score中,只有当接受下一条数据的时候才展示。
  前台代码。js部分:
  

$(function () {  $(
"#user").focus();  
});
  

  

function loading(user) {  eoooxy.ajax(
"post", "/back/callBack", {"user":user}, function (r) {if (!eoooxy.isEmpty(r) && r.code == '100') {var o = r.content;var h = "<div style='margin: 10px 20px 10px 20px;'><label>" + o.user + "</label><br><label>" + o.content + "</label></div>";  $(
"#chatSpace").append(h);  $(
"#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight;//$("#content").focus();  
            loading(user);
  } else {
  console.log("当前没有消息,继续请求……");
  loading(user);
  }
  }, "json"/*, function (XMLHttpRequest, status) {
  if (status == 'timeout') {//超时,status还有success,error等值的情况
  loading();
  }
  }, 3000*/)
  
}
  

  
function chatting() {
  if (eoooxy.isEmpty($("#user").val())) {
  alert("必须先输入昵称,然后点击开始聊天!");
  return false;
  }
  var data = {"user": $("#user").val()};
  eoooxy.ajax("post", "/back/join", data, function (r) {
  if (!eoooxy.isEmpty(r) && r.code == '100') {
  $("#user").attr("disabled", "disabled");
  loading($("#user").val());
  } else {
  alert(r.msg);
  }
  }, "json")
  
}
  

  
function sendInfo() {
  var message = $("#content");
  var data = {"user": $("#user").val(), "content": $("#content").val()};
  // var data = {"user": "jack", "content": $("#content").val()};
  eoooxy.ajax("post", "/back/send", data, function (r) {
  if (!eoooxy.isEmpty(r) && r.code == '100') {
  message.val('');
  $("#chatSpace")[0].scrollTop = $("#chatSpace")[0].scrollHeight;
  message.focus();
  } else {
  alert(r.msg);
  }
  

  })
  
}
  

  jsp部分:
  

<%@ page language="java" contentType="text/html; "  pageEncoding
="UTF-8" %>  
<html>
  
<head>
  <%@include file="/WEB-INF/view/common/meta.jsp" %>
  <title>聊天室</title>
  ${f:addJs("resources/js/back/chat.js")}
  
</head>
  
<body>
  
<div align="center">
  <h1>多人聊天室</h1>
  <span>
  <input placeholder="昵称,必须输入">

  <button>  </span>
  <div
  style="width: 600px;height: 500px; border: solid 1px #CCCCCC; overflow-y: auto;text-align: left">
  

  </div>
  <div>
  <textarea
  style="width: 590px;height: 60px; resize: none;border: 0px;outline: none;margin: 5px;"></textarea>

  <button>  </div>
  
</div>
  
</body>
  
</html>
  

  以上就是根据pub/sub 以及ajax长连接写的一个在线实时聊天系统(实际上延迟1s),如果有错,请指出,谢谢!
  因为这边采用的是ajax长连接(就是一直问:有没有消息啊,有没有消息啊,有的话拿走然后继续问。。),所以会占用资源。如果我们能更好的优化他,我们可以使用H5的新的特性WebSocket来构建实时的聊天系统,具体这边我就不介绍了,因为我也还没搞透彻,没有调查没有发言权。。。
  示例代码git连接:https://github.com/eoooxy/anhoo

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-426703-1-1.html 上篇帖子: Redis的EXPIRE过期机制介绍 下篇帖子: redis集群设置密码详解
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表