|
1 local not_empty = function(x)
2 return (type(x) == "table") and (not x.err) and (#x ~= 0)
3 end
4
5 local qName = ARGV[1] --队列名称
6 local currentTime = ARGV[2] --当前时间,这个需要从外部传入,不能使用redis自身时间,如果使用自身时间可能导致redis本身的backup在重放请求时出现不一致性
7 local considerAsFailMaxTimeSpan = ARGV[3] --超时时间设定,当消息超过一定时间还没有ack则认为此消息需要再次入队
8
9 local zsetName= qName ..'BACKUP'
10 local hashName= qName ..'CONTEXT'
11
12 local tmp = redis.call('ZRANGEBYSCORE',zsetName , '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1)
13 if (not_empty(tmp)) then
14 redis.call('ZREM', zsetName, tmp[1]) --此处拿出的为消息的唯一id
15 redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1]))
16 end
17 tmp = redis.call('RPOP', qName)
18 if (tmp) then
19 local msg = cjson.decode(tmp)
20 local>
21 redis.call('ZADD', zsetName, tonumber(currentTime),>
22 redis.call('HSET',hashName ,>
23 end
24 return tmp |
|
|