|
近来遇到一个需求,是要在两地机房间进行缓存的同步
实现方案如下:
两地机房的cache操作在本地执行成功后,插入本地维护的同步队列(两个同步方向各维护一个队列),再利用http将队列中的数据打包传送到异地,异地解包后执行相应操作,从而完成同步。
1. 为避免大流量的缓存同步请求造成队列拥堵,对缓存的写操作进行分类,在cache基类里封装是否同步的开关,避免不需要同步的cache进入队列造成数据异常。
2. 队列需要顺序执行,要求插入、弹出以及取长度(便于计算同步效率)的操作足够快,对排序和随机插入没有要求;但数据需要持久化,以便在同步脚本出问题或机器、网络故障时进行数据恢复。因此选择了redis的List作为队列存储。
3. 最初考虑的是利用Gearman进行同步。其特点是实现简单:gearman本身封装了任务队列,只需要在代理机器上运行gearmand服务端,在两地各个memcache集群机器上启动worker,即可通过分布式计算实现负载均衡。但是此方案对做代理的server端稳定性要求高:server端与worker端建立连接后如果server端crash,worker端需要能够自动切换;而且引入gearmand会增加运维成本,因此最终没有采用这种方案。
4. 后来看到agentzh大牛的openresty项目,实现了很多效率很高的nginx模块,利用Nginx事件模型来进行非阻塞I/O通信。可以利用HttpLuaModule进行nginx模块开发,直接利用httpMemc/srcacheModule进行memcache操作或lua-resty-memcached模块直接用Lua进行memcache操作。QPS大大提高。
5. 本地执行成功后的cache命令被压入队列;在本地服务器上启动一个类似守护进程的同步脚本,每1000个命令打成一个数据包,通过curl发送到异地nginx。队列本身使用redis的List;异地的处理进度(processing/processed时间戳)则记录在redis的Hash(哈希表)结构中。
6. 为了可维护性和可扩展性,最终方案选定由HttpLuaModule和Lua-resty-memcached模块实现memcache相关操作。用Lua实现set和delete操作时,开始采用的是subrequest方式,也就是ngx.location.capture_multi,好处是响应按照请求顺序返回,后续程序等待所有结果返回后才开始处理。后来考虑到对memcache集群各台机器的操作并不要求返回值完全一致,改用了Lua轻量级线程ngx.thread.spawn并发执行,从而实现“先到先服务”。例如,delete操作只要有一台机器返回OK即可认为操作成功,随即将该命令和key值压入异地同步队列,而无需等所有机器的响应全部返回。
7. 实际过程中发现,业务端使用的是PECL的memcache扩展,其一致性hash算法与Lua端不同(lua-resty-memcached只负责与单台memcache服务器通信,并不负责key的分布),因此需要实现Lua版本的一致性hash算法。这样在set的时候可以根据key的hash值指定机器进行set。为了简单,delete采取了对所有机器都执行删除的方式。
8. 当然,如果不想自己实现一致性hash,也可以通过fastcgi方式调用php脚本,利用PECL的memcache扩展执行set,以保证与业务端的一致性。这时需要获取curl POST过来的数据,即上文的1000个cache数据。但PHP会自动把POST数据解析到$_REQUEST中,而每条数据都使用相同的字段名(key、expire、arg),同名字段会相互覆盖,导致$_REQUEST数组中只剩最后一组数据。可以采用file_get_contents("php://input")获取原始的POST数据。
部分实现代码如下:
1> 本地处理cache成功后插入当前队列
local router = require "meilishuo.router"
local memcached = require "resty.memcached"
local redis = require "resty.redis"
local config = ngx.shared.config
local insert = table.insert
-- Read the request body before fetching it: ngx.req.get_body_data() returns
-- nil for an unread body unless `lua_need_request_body on` is configured.
-- If the body has already been read, read_body() returns immediately.
ngx.req.read_body()
-- Raw POST body: the cache command that is applied here and later forwarded.
local param = ngx.req.get_body_data()
-- Parsed command (project router); the code below reads its `method` and `key`.
local request_args = router:router_mem(param)
-- Light threads spawned for the delete fan-out.
local threads = {}
-- Number of memcached servers, configured as cache_<n>_host / cache_<n>_port.
local total = config:get("total")
-- Redis client for the sync queue; each handler below connects it on demand.
local fifo = redis:new()
--- Delete `key` from the no-th memcached server using resty.memcached.
-- Host and port come from the shared dict keys cache_<no>_host / cache_<no>_port.
-- Meant to run inside an ngx.thread; the caller collects the returned status.
-- A failure on one server is reported as 'ERROR' instead of aborting the whole
-- request, so the threads for the other servers can still finish.
-- @param no  server index, 1..total
-- @param key cache key to delete
-- @return 'OK' (deleted), 'NOT_FOUND' (key absent on this server) or 'ERROR'
local function query_memcache(no, key)
    local cacheHandle, err = memcached:new()
    if not cacheHandle then
        ngx.log(ngx.ERR, "failed to instantiate memcached: ", err)
        return 'ERROR'
    end
    local host, port = config:get("cache_" .. no .. "_host"), config:get("cache_" .. no .. "_port")
    local ok
    ok, err = cacheHandle:connect(host, port)
    if not ok then
        -- No connection: do not send the command on a closed socket.
        ngx.say("cache server " .. host .. ":" .. port .. " went away")
        return 'ERROR'
    end
    local response
    local res, derr = cacheHandle:delete(key)
    if res then
        ngx.say("found cache in server" .. host .. ":" .. port)
        response = 'OK'
    elseif derr == 'NOT_FOUND' then
        ngx.say("cache not found in server " .. host .. ":" .. port)
        response = 'NOT_FOUND'
    else
        ngx.say("cache server" .. host .. ":" .. port .. "write error")
        -- The connection state is unknown after an error: close it instead of
        -- returning it to the keepalive pool.
        cacheHandle:close()
        return 'ERROR'
    end
    -- Healthy connection: return it to the pool for reuse.
    cacheHandle:set_keepalive(200, 300 * config:get("total"))
    return response
end
--- Push a locally applied delete command onto the sync queue for the remote side.
-- The queue is the Redis list "FIFO:MEM" at fifo_host:fifo_port (shared dict).
-- @param config shared dict holding fifo_host, fifo_port and total
-- @param fifo   resty.redis client
-- @param param  raw command to enqueue
-- @return the LPUSH reply on success, or nil plus an error message
local function insert_redis(config, fifo, param)
    local host, port = config:get("fifo_host"), config:get("fifo_port")
    local ok, err = fifo:connect(host, port)
    if not ok then
        -- Nothing can be pushed without a connection.
        ngx.log(ngx.ERR, "fifo server " .. host .. ":" .. port .. " went away")
        return nil, err
    end
    local fifokey = "FIFO:MEM"
    local res
    res, err = fifo:lpush(fifokey, param)
    if not res then
        ngx.log(ngx.ERR, "failed to push to ", fifokey, ": ", err)
    end
    -- Return the connection to the pool for reuse.
    fifo:set_keepalive(200, 200 * config:get("total"))
    return res, err
end
-- Dispatch on the command type.
if request_args.method == 'delete' then
    -- Delete the key on every configured memcached server, one light thread each.
    for no = 1, total do
        insert(threads, ngx.thread.spawn(query_memcache, no, request_args.key))
    end
    -- Wait for each thread in spawn order. A single 'OK' is enough to treat
    -- the delete as successful, so the command is enqueued at most once.
    local queued = false
    for i = 1, #threads do
        local ok, res = ngx.thread.wait(threads[i])
        if not ok then
            ngx.say(i, ": failed to run: ", res)
        else
            ngx.say(i, ": status: ", res)
            if res == 'OK' and not queued then
                insert_redis(config, fifo, param)
                queued = true
            end
        end
    end
elseif request_args.method == 'set' then
    -- This branch only enqueues the raw command on "FIFO:MEM:SET".
    local host, port = config:get("fifo_host"), config:get("fifo_port")
    local ok, err = fifo:connect(host, port)
    if not ok then
        ngx.log(ngx.ERR, "fifo server " .. host .. ":" .. port .. " went away")
    else
        local fifokey = "FIFO:MEM:SET"
        local res
        res, err = fifo:lpush(fifokey, param)
        if not res then
            ngx.log(ngx.ERR, "failed to push to ", fifokey, ": ", err)
        end
        -- Return the connection to the pool for reuse.
        fifo:set_keepalive(200, 200 * config:get("total"))
    end
end
2> fastcgi setcache脚本
<?php
/**
 * set cache script
 *
 * Applies a batch of cache entries received from the remote data center.
 * The raw POST body holds the records (separated by '||' and parsed by
 * split_and_set()); the GET parameter `stamp` identifies the batch.
 * Progress is recorded in the Redis hash "FIFO:status":
 *   processing - stamp of the batch being applied
 *   processed  - stamp of the last batch that finished
 */
header('Content-type: text/html;charset=utf8');
// Raw POST body. $_POST/$_REQUEST cannot be used: every record repeats the
// same field names, so PHP's parser would keep only the last record.
$data = file_get_contents("php://input");
// Batch identifier from the query string.
$stamp = isset($_GET['stamp']) ? $_GET['stamp'] : null;
$cacheObj = Cache::instance();
$logHandle = new log('cache_content', 'normal');
if ($stamp === null || $stamp === '') {
    // Without a stamp the progress bookkeeping below cannot work.
    $logHandle->w_log(print_r("missing stamp, content is: $data\n", TRUE));
    exit;
}
// Connect to redis to read/update the queue processing status.
$redis = new Redis();
$redis_key = "FIFO:status";
$host = "192.168.60.4";
$port = 6579;
$timeout = 5;
$result = $redis->pconnect($host, $port, $timeout);
if (empty($result)) {
    $str = "redis server went away!";
    $logHandle->w_log(print_r($str, TRUE));
    exit;
}
// Progress before this batch; the fields may be missing (e.g. on the first run).
$result = $redis->hgetall($redis_key);
$processing = isset($result['processing']) ? $result['processing'] : null;
$processed = isset($result['processed']) ? $result['processed'] : null;
if ($processed !== null && $stamp <= $processed) {
    // Already applied: re-running it would re-write older values and move
    // both progress markers backwards.
    echo "$stamp has been processed!\n";
    exit;
}
if ($processing !== null && $stamp <= $processing) {
    // Started earlier but never marked processed (e.g. the script died
    // midway); it is applied again.
    // NOTE(review): if a newer batch has already started, re-applying this
    // one can overwrite newer values — confirm the sender never does that.
    echo "$stamp has been processing!\n";
}
$redis->hset($redis_key, 'processing', $stamp);
$response = split_and_set($data, '||', $cacheObj, $logHandle);
if (empty($response)) {
    $str = "stamp: $stamp error when operate, content is: $data\n";
    $logHandle->w_log(print_r($str, TRUE));
}
// Mark this batch as finished.
$redis->hset($redis_key, 'processed', $stamp);
/**
 * Parse a batch of cache records and write each one into the cache.
 *
 * Records are separated by $delimiter. Each record is split on '&'. Its
 * first field is dropped, and the next three are read as "key=...",
 * "expire=..." and "arg=...", where arg is base64(serialize(value)).
 * Records missing any of the three fields are skipped. Records whose value
 * cannot be unserialized are logged and skipped instead of being cached as
 * `false`.
 *
 * $delimiter no longer declares a default: a default placed before required
 * parameters can never apply, and PHP 8 deprecates it. Callers pass all four
 * arguments.
 *
 * SECURITY: unserialize() runs on data received over HTTP, which permits PHP
 * object injection if an untrusted client can reach this endpoint; only
 * accept batches from trusted peers.
 *
 * @param string $str       raw batch (the POST body)
 * @param string $delimiter record separator; the caller passes '||'
 * @param object $cacheObj  cache providing set($key, $value, $expire)
 * @param object $logHandle logger providing w_log($str)
 * @return bool FALSE if the batch could not be split, TRUE otherwise
 */
function split_and_set($str, $delimiter, $cacheObj, $logHandle) {
    $resp = array();
    $data = explode($delimiter, $str);
    if (empty($data)) {
        return FALSE;
    }
    foreach ($data as $key => $value) {
        $temp = explode('&', $value);
        // Drop the leading field of the record.
        array_shift($temp);
        if (empty($temp[0]) || empty($temp[1]) || empty($temp[2])) {
            continue;
        }
        $serialized = base64_decode(substr($temp[2], strlen('arg=')));
        $cacheValue = unserialize($serialized);
        // unserialize() returns false on failure; tell that apart from a
        // genuinely serialized false.
        if ($cacheValue === false && $serialized !== serialize(false)) {
            $logHandle->w_log(print_r("invalid value in record $key: $value", TRUE));
            continue;
        }
        $resp[$key]['key'] = substr($temp[0], strlen('key='));
        $resp[$key]['expire'] = substr($temp[1], strlen('expire='));
        $resp[$key]['value'] = $cacheValue;
        $logHandle->w_log(print_r($resp[$key], TRUE));
        $cacheObj->set($resp[$key]['key'], $resp[$key]['value'], $resp[$key]['expire']);
    }
    return TRUE;
}
3> Lua实现一致性hash:直接使用了GitHub上别人的实现,地址:https://github.com/alacner/flexihash
--Flexihash - A simple consistent hashing implementation for Lua.
--local nix = require "nix"
-- NOTE: module() is the Lua 5.1 module system (deprecated since Lua 5.2).
-- It creates the global table `Flexihash` and makes it the environment of
-- this chunk, so the non-local definitions below (the hasher tables and
-- New) become its fields. package.seeall lets the chunk still read globals
-- such as `ngx`, `table` and `string`.
module('Flexihash', package.seeall)
--- Default hasher: maps a string to a ring position with ngx.crc32_short.
Flexihash_Crc32Hasher = {}
function Flexihash_Crc32Hasher.hash(str)
    return ngx.crc32_short(str)
end
--- MD5 hasher: uses the first 8 hex digits (32 bits) of ngx.md5(str) as the
-- ring position. Positions are therefore strings and are compared as strings.
-- The parameter is no longer called `string`: that name shadowed the string
-- library, and `string.sub` only worked through the string metatable.
Flexihash_Md5Hasher = {
    hash = function(str)
        return ngx.md5(str):sub(1, 8) -- 8 hexits = 32bit
    end
}
--- Split a table into two parallel arrays: its keys and its values.
-- Entry order follows pairs() and is therefore unspecified.
-- @return keys, values
local function array_keys_values(tbl)
    local keys, values, n = {}, {}, 0
    for key, value in pairs(tbl) do
        n = n + 1
        keys[n], values[n] = key, value
    end
    return keys, values
end
-- Placeholder for a string representation of a ring object. The body is
-- empty (it returns nothing), and nothing in this module references it.
local function __toString(this)
end
--[[
-- Rebuild this._sortedPositions (all ring positions in ascending order) from
-- the keys of this._positionToTarget, unless the cache is already marked
-- up to date via this._positionToTargetSorted.
--]]
local function _sortPositionTargets(this)
    if this._positionToTargetSorted then
        return
    end
    local positions = {}
    for position in pairs(this._positionToTarget) do
        positions[#positions + 1] = position
    end
    table.sort(positions)
    this._sortedPositions = positions
    this._positionToTargetSorted = true
end
--[[
-- Add a target to the ring.
-- The target is hashed this._replicas times (hashing "<target><i>" for
-- i = 0 .. replicas-1); each resulting position maps back to the target.
-- @param string target
-- @return this on success; false and an error message if the target exists
--]]
local function addTarget(this, target)
    local existing = this._targetToPositions[target]
    if existing then
        return false, "Target '" .. target .."' already exists."
    end
    local positions = {}
    this._targetToPositions[target] = positions
    for replica = 0, this._replicas - 1 do
        local position = this._hasher(target .. replica)
        this._positionToTarget[position] = target
        positions[#positions + 1] = position
    end
    -- New positions invalidate the sorted-position cache.
    this._positionToTargetSorted = false
    this._targetCount = this._targetCount + 1
    return this
end
--[[
-- Add every value in `targets` via addTarget; per-target failures (such as
-- duplicates) are ignored.
-- @param table targets
-- @return this
--]]
local function addTargets(this, targets)
    for _, name in pairs(targets) do
        addTarget(this, name)
    end
    return this
end
--[[
-- Remove a target from the ring.
-- Only positions still mapped to this target are cleared. A position that
-- another target overwrote on a hash collision stays with that target.
-- The sorted-position cache is invalidated: otherwise lookupList() keeps
-- walking stale positions that no longer have a target, and fails with
-- "table index is nil".
-- @param string target
-- @return this on success; false and an error message if the target is unknown
--]]
local function removeTarget(this, target)
    local positions = this._targetToPositions[target]
    if not positions then
        return false, "Target '" .. target .. "' does not exist."
    end
    for _, position in pairs(positions) do
        if this._positionToTarget[position] == target then
            this._positionToTarget[position] = nil
        end
    end
    this._targetToPositions[target] = nil
    this._targetCount = this._targetCount - 1
    this._positionToTargetSorted = false
    return this
end
--[[
-- List every target currently on the ring (order unspecified).
-- @return array
--]]
local function getAllTargets(this)
    local list, count = {}, 0
    for name in pairs(this._targetToPositions) do
        count = count + 1
        list[count] = name
    end
    return list
end
--[[
-- Get a list of targets for the resource, in order of precedence.
-- Up to requestedCount targets are returned, fewer if the ring holds fewer.
-- The sorted ring is walked from the first position greater than the
-- resource's hash, wrapping around to the start; each target is listed once.
-- Positions whose target has been removed (a stale sort cache) are skipped.
--
-- @param string resource
-- @param int requestedCount The length of the list to return; a number or a
--        numeric string. nil means "every target".
-- @return table List of targets; {} plus 'Invalid count requested' for a count of 0
--]]
local function lookupList(this, resource, requestedCount)
    local wanted = tonumber(requestedCount)
    if wanted == 0 then
        return {}, 'Invalid count requested'
    end
    -- handle no targets
    if this._targetCount == 0 then
        return {}
    end
    -- optimize single target
    if this._targetCount == 1 then
        local _, values = array_keys_values(this._positionToTarget)
        return {values[1]}
    end
    -- hash resource to a position
    local resourcePosition = this._hasher(resource)
    local results, seen = {}, {}
    this._sortPositionTargets(this)

    -- Adds the target at `position` unless it is already listed or no target
    -- owns the position any more; returns true once enough targets are collected.
    local function collectAt(position)
        local target = this._positionToTarget[position]
        if target ~= nil and not seen[target] then
            results[#results + 1] = target
            seen[target] = true
        end
        return #results == wanted or #results == this._targetCount
    end

    -- search values above the resourcePosition
    local collect = false
    for _, position in ipairs(this._sortedPositions) do
        if not collect and position > resourcePosition then
            collect = true
        end
        if collect and collectAt(position) then
            return results
        end
    end
    -- loop to start - search values below the resourcePosition
    for _, position in ipairs(this._sortedPositions) do
        if collectAt(position) then
            return results
        end
    end
    -- return results after iterating through both "parts"
    return results
end
--[[
-- Look up the single target responsible for a resource.
-- @param string resource
-- @return string target, or false plus 'No targets exist' when none is found
--]]
local function lookup(this, resource)
    local candidates = this:lookupList(resource, 1)
    if #candidates > 0 then
        return candidates[1]
    end
    return false, 'No targets exist'
end
--[[
-- Create a consistent-hash ring.
-- @param hasher   (optional) a function string -> position, or a hasher
--                 table with a `hash` function such as Flexihash_Md5Hasher;
--                 defaults to Flexihash_Crc32Hasher.hash. Hasher tables used
--                 to be stored as-is, so the first hash call failed.
-- @param replicas (optional) number of ring positions per target, default 64
-- @return table ring object; its functions take the ring as their first
--         argument, e.g. ring:addTarget(t), ring:lookup(key)
--]]
function New(...)
    local hasher, replicas = ...
    if type(hasher) == 'table' and type(hasher.hash) == 'function' then
        hasher = hasher.hash
    elseif type(hasher) ~= 'function' then
        hasher = hasher or Flexihash_Crc32Hasher.hash
    end
    replicas = replicas or 64
    local this = {
        _replicas = replicas, --The number of positions to hash each target to.
        _hasher = hasher, --The hash function mapping a string to a ring position.
        _targetCount = 0, --Internal counter for current number of targets.
        _positionToTarget = {}, --Internal map of positions (hash outputs) to targets { position => target, ... }
        _targetToPositions = {}, --Internal map of targets to the positions they are hashed to { target => { position, ... }, ... }
        _sortedPositions = {}, --Cached ascending list of all positions, rebuilt by _sortPositionTargets.
        _positionToTargetSorted = false, --Whether _sortedPositions is up to date.
        _sortPositionTargets = _sortPositionTargets,
        addTarget = addTarget,
        addTargets = addTargets,
        removeTarget = removeTarget,
        getAllTargets = getAllTargets,
        lookupList = lookupList,
        lookup = lookup
    }
    return this
end
|
|