设为首页 收藏本站
查看: 713|回复: 0

[经验分享] redis发布和订阅(pub/sub)

[复制链接]

尚未签到

发表于 2017-12-21 07:42:47 | 显示全部楼层 |阅读模式
--[[  
cosocket即coroutine+socket
  
顺序执行,但它是非阻塞执行方式
  
因为nginx core是非阻塞执行;
  
redis中subscribe是阻塞方式,
  
因此在nginx_lua平台中使用redis
  
中sub特性无法保持阻塞连接状态;
  
流程模型:http://www.cnblogs.com/foundwant/p/6382083.html
  

]]  
local args = ngx.req.get_uri_args()
  
local ttype = args.type  -- pub/sub
  

  
local function newRedis(timeout, ip, port, section)
  
local red = redis.new()
  
red:set_timeout(timeout)
  

  
local ok, err = red:connect(ip, port)
  
if not ok then
  
nlog.dinfo("connect:" .. err)
  
end
  

  
red:select(section)
  

  
return red
  
end
  

  
local red = newRedis(10000, "127.0.0.1", "6379", 0)
  
local bak = newRedis(10000, "127.0.0.1", "6379", 0)
  

  
local function subscribe(channel)
  
local res, err = red:subscribe(channel)
  
if not res then
  
nlog.dinfo("subscribe error.")
  
return nil, err
  
end
  

  
--这里以函数返回,不然sub会在这里断连失去可操作性
  
--这就是提到的特殊之一
  
local function read_func(do_read)
  
if nil == do_read or true == do_read then
  
res, err = red:read_reply()
  
if not res then
  
return nil, err
  
end
  

  
return res
  
end
  

  
red:unsubscribe(channel)
  
red:set_keepalive(60000, 100)
  

  
--连接回收
  
        bak:close()
  
bak:set_keepalive(60000, 100)
  

  
--断连后重启等待
  
red = newRedis(10000, "127.0.0.1", "6379", 0)
  
red:subscribe(channel)
  

  
bak = newRedis(10000, "127.0.0.1", "6379", 0)
  
return
  
end
  

  
return read_func
  
end
  

  
local subset = "subset"  --set
  
local channel = "test"   --list
  

  
consume = function(length)
  
--若订阅者消息队列有残余,先消费,再订阅
  
for i=1, llength do
  
local recv, err = red:lpop(channel)  --头部开始消费
  
nlog.dinfo("recv:" .. cjson.encode(recv))
  
end
  
redis_util.coroutine_count = 1
  
coroutine.yield()
  
end
  

  
--订阅者
  
if "sub" == ttype then
  
--向set集合增加"订阅者id"
  
    red:sadd(subset, channel)
  

  
--为每个"订阅者id"建立list
  
local llength = red:llen(channel)
  
if 0 == llength then
  
red:rpush(channel, "hello")
  
else
  
--若订阅者消息队列有残余,先消费,再订阅
  
for i=1, llength do
  
local recv, err = red:lpop(channel)  --头部开始消费
  
nlog.dinfo("recv:" .. cjson.encode(recv))
  
end
  
end
  
nlog.dinfo("run coroutine after...")
  

  
--开始订阅
  
local func, err = subscribe(channel)
  
while true do
  
local res, err = func()   --res:["message","test","world"]
  
if err then
  
func(false)
  
end
  
--在redis的订阅模式中,
  
--单例模式下只能使用固定几个命令[ (P)SUBSCRIBE,(P)UNSUBSCRIBE,QUIT,PING,... ],
  
--无法使用其它命令,比如lpop, rpush等命令,
  
--所以这里无法使用red:lpop()来执行出队删除操作,
  
--只能另起一个客户端对象来进行删除操作;
  
local oo, ooerr = bak:lpop(channel)
  
nlog.dinfo("bak lpop:" .. cjson.encode(oo))
  
nlog.dinfo("res:" .. cjson.encode(res))
  
ngx.sleep(1)
  
end
  
end
  

  
--发布者,测试用,实际调用是在业务层
  
if "pub" == ttype then
  
--先发布,再追加队列
  
--local subchannel, err = red:spop(subset)
  
--nlog.dinfo("subchannel:" .. type(subchannel))
  
--if "userdata" ~= type(subchannel) then
  
for i=1, 1000 do
  
local str = "world_" .. i
  
red:publish(channel, str)
  
red:rpush(channel, str)  --尾部追加
  
ngx.sleep(0.1)
  
end
  
--end
  
end
  

  
--监听器,crontab定时运行
  
if "spy" == ttype then
  
while true do
  
red:publish(channel, "0")
  
ngx.sleep(60)
  
end
  
end
  

  
ok, err = red:set_keepalive(60000, 100)
  
if not ok then
  
ngx.say("set_keepalive:", err)
  
end
  

  
ngx.print("rpush done.")
  
ngx.exit(200)

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-426313-1-1.html 上篇帖子: 如何用redis来生成唯一Id 下篇帖子: redis 3.2 报错 Redis protected-mode 配置文件没有真正启动
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表