|
网上关于redis高可用基本都是用redis-sentinel 哨兵 或者 redis cluster 集群来实现, 但是有没有更简单的方式,比如我现在就只有2个redis实例。我试验的结果是我们可以采用主备的方式来实现(我们的实际需求很简单,有2个redis实例分布在不同的计算机,在一个实例down掉后我们的应用程序还能继续读写redis,主从配置可以手动修改)。需求很简单, 实现也就很简单。首先下载 https://github.com/StackExchange/StackExchange.Redis 源码。启动StackExchange.Redis-master\Redis Configs里面的主从2个实例,我最终demo的code如下:
// NOTE(review): the class header and the `database` field were garbled in this excerpt;
// they are reconstructed from how the members are used below — confirm against the original demo.
class Program
{
    // Database handle obtained from the multiplexer in Main and used for every read/write.
    static IDatabase database;

    // Shared connection to both configured Redis endpoints.
    static ConnectionMultiplexer conn;

    /// <summary>
    /// Connects to the two local Redis instances and then writes and reads a random
    /// value every 100 ms, forever.
    /// </summary>
    static void Main(string[] args)
    {
        ConfigurationOptions option = new ConfigurationOptions()
        {
            EndPoints =
            {
                { "127.0.0.1", 6379 },
                { "127.0.0.1", 6380 }
            },
            // The failover path below issues MakeMaster / SlaveOf, which are admin commands.
            AllowAdmin = true,
        };
        conn = ConnectionMultiplexer.Connect(option);
        database = conn.GetDatabase();

        Random rand = new Random();
        while (true)
        {
            string val = "gavin_" + rand.Next(1, 999999).ToString();
            TestWriteRead(val);
            Thread.Sleep(100);
        }
    }

    /// <summary>
    /// Writes <paramref name="value"/> under a fixed key and reads it back.
    /// If the write fails, a connected server is promoted to master and the write is retried once.
    /// </summary>
    /// <param name="value">The string value to store.</param>
    static void TestWriteRead(string value)
    {
        string key = "gavinteststring";
        try
        {
            database.StringSet(key, value);
            Console.WriteLine($"写入{key}={value}成功");
        }
        catch (Exception)
        {
            // Nothing reachable to promote: surface the original failure instead of retrying blindly.
            if (!TryPromoteConnectedServer())
            {
                throw;
            }

            database.StringSet(key, value);
            Console.WriteLine($"写入{key}={value}成功");
        }

        string temp = string.Empty;
        try
        {
            temp = database.StringGet(key);
            Console.WriteLine($"读取{key}={temp}成功");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"读取{key}失败:" + ex.ToString());
        }
    }

    /// <summary>
    /// Promotes the first connected endpoint to master and asks every other endpoint to
    /// replicate from it.
    /// </summary>
    /// <returns><c>true</c> if a connected endpoint was found and promoted; otherwise <c>false</c>.</returns>
    static bool TryPromoteConnectedServer()
    {
        var points = conn.GetEndPoints();

        // Pick the new master by connectivity rather than by a fixed index, so the
        // switch also works after the roles have already been swapped once.
        int masterIndex = -1;
        for (int i = 0; i < points.Length; i++)
        {
            if (conn.GetServer(points[i]).IsConnected)
            {
                masterIndex = i;
                break;
            }
        }

        if (masterIndex < 0)
        {
            return false;
        }

        conn.GetServer(points[masterIndex]).MakeMaster(ReplicationChangeOptions.All);
        for (int i = 0; i < points.Length; i++)
        {
            if (i == masterIndex)
            {
                continue;
            }

            // Fire-and-forget: the target may be down, so no reply is awaited.
            conn.GetServer(points[i]).SlaveOf(points[masterIndex], CommandFlags.FireAndForget);
        }

        return true;
    }
}
大家请先忽略我catch里面的code。当我把redis的master关闭后,程序报错:
No connection is available to service this operation: SET gavinteststring; 远程主机强迫关闭了一个现有的连接。; IOCP: (Busy=0,Free=1000,Min=4,Max=1000), WORKER: (Busy=0,Free=1023,Min=4,Max=1023), Local-CPU: 100%
找到源码后发现在ConnectionMultiplexer的ExecuteSyncImpl方法里面有这么一段:
// Excerpt: when TryPushMessageToBridge reports failure, the call is aborted with the
// exception built by ExceptionFactory.NoConnectionAvailable.
if (!TryPushMessageToBridge(message, processor, source, ref server))
{
throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, message.Command, message, server, GetServerSnapshot());
}
也就是说StackExchange没有找到redis的服务器实例,继续跟踪code发现具体查找server的code在ConnectionMultiplexer的AnyConnected方法里面:
/// <summary>
/// Scans the server snapshot, starting at <paramref name="startOffset"/> and wrapping around,
/// for a server of the requested type that can run <paramref name="command"/> and whose
/// master/slave role satisfies <paramref name="flags"/>.
/// </summary>
/// <returns>
/// The first server whose role is demanded or preferred; otherwise the last server seen
/// that is acceptable under a "prefer" flag; otherwise <c>null</c>.
/// </returns>
internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
{
    var snapshot = serverSnapshot;
    int count = snapshot.Length;
    ServerEndPoint fallback = null;

    for (int i = 0; i < count; i++)
    {
        var candidate = snapshot[(int)(((uint)i + startOffset) % count)];
        if (candidate == null || candidate.ServerType != serverType || !candidate.IsSelectable(command))
        {
            continue;
        }

        if (candidate.IsSlave)
        {
            if (flags == CommandFlags.DemandSlave || flags == CommandFlags.PreferSlave)
            {
                return candidate;
            }
            if (flags == CommandFlags.PreferMaster)
            {
                // A slave is acceptable only if no master turns up later in the scan.
                fallback = candidate;
            }
        }
        else
        {
            if (flags == CommandFlags.DemandMaster || flags == CommandFlags.PreferMaster)
            {
                return candidate;
            }
            if (flags == CommandFlags.PreferSlave)
            {
                // A master is acceptable only if no slave turns up later in the scan.
                fallback = candidate;
            }
        }
    }

    return fallback;
}
因为主的server已经down掉了,所以可以访问的server就是Slave,但是这里的flags默认是CommandFlags.DemandMaster,所以找不到server。那么我们把现在的从的server改为主的server,如: server.MakeMaster(ReplicationChangeOptions.All); 我以为这样就可以了,但是还是不行。后来我想,如果把原来的主也改为从是否就可以了: server.SlaveOf(points[1],CommandFlags.FireAndForget);(我测试的时候还用过quit方法,调试的时候有,但是release的时候说没有该方法)。运行效果如下
后来把上面的code简单封装为一个方法:
/// <summary>
/// Promotes the first connected Redis endpoint to master and makes every other endpoint
/// (connected or not) a replica of it. Does nothing when fewer than two endpoints exist.
/// </summary>
/// <param name="database">Database whose multiplexer supplies the endpoints and servers.</param>
/// <exception cref="InvalidOperationException">No endpoint is currently connected.</exception>
void ChangeMaster(IDatabase database)
{
    var mex = database.Multiplexer;
    var endpoints = mex.GetEndPoints();

    // Only switch master/replica roles when more than one endpoint is configured.
    if (endpoints.Length < 2)
    {
        return;
    }

    List<EndPoint> connectedPoints = new List<EndPoint>();
    List<EndPoint> disconnetedPoints = new List<EndPoint>();
    foreach (var item in endpoints)
    {
        // Sort the endpoints by whether their server is currently reachable.
        var server = mex.GetServer(item);
        if (server.IsConnected)
        {
            connectedPoints.Add(item);
        }
        else
        {
            disconnetedPoints.Add(item);
        }
    }

    if (connectedPoints.Count == 0)
    {
        throw new InvalidOperationException("没有可用的redis服务器");
    }

    var connectedPoint = connectedPoints[0];
    mex.GetServer(connectedPoint).MakeMaster(ReplicationChangeOptions.All);

    // The remaining connected servers follow the new master (index 0 is the master itself).
    for (int i = 1; i < connectedPoints.Count; i++)
    {
        mex.GetServer(connectedPoints[i]).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
    }

    // Unreachable servers get the same instruction, sent fire-and-forget so no reply is awaited.
    foreach (var item in disconnetedPoints)
    {
        mex.GetServer(item).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
    }
}
View Code -----------------------------------------2017-4-14--------------------------------------------------------
我们知道读写redis的时候都是Message包
/// <summary>
/// Initializes a message for <paramref name="command"/>, validating that a database index
/// is supplied exactly when the command requires one, and forcing master-only routing
/// for commands that IsMasterOnly reports as such.
/// </summary>
protected Message(int db, CommandFlags flags, RedisCommand command)
{
    bool needsDatabase = RequiresDatabase(command);
    bool hasDatabase = db >= 0;

    if (!hasDatabase && needsDatabase)
    {
        throw ExceptionFactory.DatabaseRequired(false, command);
    }
    if (hasDatabase && !needsDatabase)
    {
        throw ExceptionFactory.DatabaseNotRequired(false, command);
    }

    bool masterOnly = IsMasterOnly(command);

    Db = db;
    this.command = command;
    // Keep only the flag bits covered by UserSelectableFlags.
    this.flags = flags & UserSelectableFlags;
    if (masterOnly)
    {
        SetMasterOnly();
    }

    createdDateTime = DateTime.UtcNow;
    createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
}
/// <summary>
/// Forces this message to be routed to a master. Throws if the caller explicitly
/// demanded a slave; otherwise rewrites the master/slave flags to DemandMaster.
/// </summary>
internal void SetMasterOnly()
{
    switch (GetMasterSlaveFlags(flags))
    {
        case CommandFlags.DemandSlave:
            throw ExceptionFactory.MasterOnly(false, command, null, null);
        case CommandFlags.DemandMaster:
            // already fine as-is
            break;
        case CommandFlags.PreferMaster:
        case CommandFlags.PreferSlave:
        default:
            // we will run this on the master, then
            flags = SetMasterSlaveFlags(flags, CommandFlags.DemandMaster);
            break;
    }
}
/// <summary>
/// Replaces the master/slave routing bits of <paramref name="everything"/> with
/// <paramref name="masterSlave"/>, leaving all other flag bits untouched.
/// </summary>
/// <param name="everything">The complete flag set to modify.</param>
/// <param name="masterSlave">The routing flag(s) to apply.</param>
/// <returns>The combined flags.</returns>
internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, CommandFlags masterSlave)
{
    // take away the flags we don't want, and add back the ones we care about
    return (everything & ~(CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave))
        | masterSlave;
}
这里根据我们的Command来判断是否必须是Master主库,如果是,就调用SetMasterOnly来设置flags。那么哪些指令需要Master呢:
/// <summary>
/// Reports whether <paramref name="command"/> is one of the commands listed below,
/// which this code treats as runnable on a master only.
/// </summary>
/// <returns><c>true</c> for a listed command; <c>false</c> for every other command.</returns>
public static bool IsMasterOnly(RedisCommand command)
{
    switch (command)
    {
        case RedisCommand.APPEND:
        case RedisCommand.BITOP:
        case RedisCommand.BLPOP:
        case RedisCommand.BRPOP:
        case RedisCommand.BRPOPLPUSH:
        case RedisCommand.DECR:
        case RedisCommand.DECRBY:
        case RedisCommand.DEL:
        case RedisCommand.EXPIRE:
        case RedisCommand.EXPIREAT:
        case RedisCommand.FLUSHALL:
        case RedisCommand.FLUSHDB:
        case RedisCommand.GETSET:
        case RedisCommand.HDEL:
        case RedisCommand.HINCRBY:
        case RedisCommand.HINCRBYFLOAT:
        case RedisCommand.HMSET:
        case RedisCommand.HSET:
        case RedisCommand.HSETNX:
        case RedisCommand.INCR:
        case RedisCommand.INCRBY:
        case RedisCommand.INCRBYFLOAT:
        case RedisCommand.LINSERT:
        case RedisCommand.LPOP:
        case RedisCommand.LPUSH:
        case RedisCommand.LPUSHX:
        case RedisCommand.LREM:
        case RedisCommand.LSET:
        case RedisCommand.LTRIM:
        case RedisCommand.MIGRATE:
        case RedisCommand.MOVE:
        case RedisCommand.MSET:
        case RedisCommand.MSETNX:
        case RedisCommand.PERSIST:
        case RedisCommand.PEXPIRE:
        case RedisCommand.PEXPIREAT:
        case RedisCommand.PFADD:
        case RedisCommand.PFMERGE:
        case RedisCommand.PSETEX:
        case RedisCommand.RENAME:
        case RedisCommand.RENAMENX:
        case RedisCommand.RESTORE:
        case RedisCommand.RPOP:
        case RedisCommand.RPOPLPUSH:
        case RedisCommand.RPUSH:
        case RedisCommand.RPUSHX:
        case RedisCommand.SADD:
        case RedisCommand.SDIFFSTORE:
        case RedisCommand.SET:
        case RedisCommand.SETBIT:
        case RedisCommand.SETEX:
        case RedisCommand.SETNX:
        case RedisCommand.SETRANGE:
        case RedisCommand.SINTERSTORE:
        case RedisCommand.SMOVE:
        case RedisCommand.SPOP:
        case RedisCommand.SREM:
        case RedisCommand.SUNIONSTORE:
        case RedisCommand.ZADD:
        case RedisCommand.ZINTERSTORE:
        case RedisCommand.ZINCRBY:
        case RedisCommand.ZREM:
        case RedisCommand.ZREMRANGEBYLEX:
        case RedisCommand.ZREMRANGEBYRANK:
        case RedisCommand.ZREMRANGEBYSCORE:
        case RedisCommand.ZUNIONSTORE:
            return true;
        default:
            return false;
    }
}
如果我们执行脚本则是用的ScriptEvalMessage类,其构造函数:
// NOTE(review): the class header was garbled in this excerpt ("private sealed>{").
// The name comes from the constructors; the base type is inferred from the
// base(db, flags, command) call further down — confirm against the library source.
private sealed class ScriptEvalMessage : Message
{
    private readonly RedisKey[] keys;
    private readonly string script;
    private readonly RedisValue[] values;
    private byte[] asciiHash, hexHash;

    /// <summary>
    /// Creates a script message from script text. EVALSHA is chosen when
    /// ResultProcessor.ScriptLoadProcessor.IsSHA1 accepts the text, otherwise EVAL.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="script"/> is null.</exception>
    public ScriptEvalMessage(int db, CommandFlags flags, string script, RedisKey[] keys, RedisValue[] values)
        : this(db, flags, ResultProcessor.ScriptLoadProcessor.IsSHA1(script) ? RedisCommand.EVALSHA : RedisCommand.EVAL, script, null, keys, values)
    {
        if (script == null) throw new ArgumentNullException(nameof(script));
    }
/// <summary>
/// Creates a script message from a script hash; the script text is left null and the
/// command passed to the private constructor is EVAL.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="hash"/> is null.</exception>
public ScriptEvalMessage(int db, CommandFlags flags, byte[] hash, RedisKey[] keys, RedisValue[] values)
    : this(db, flags, RedisCommand.EVAL, null, hash, keys, values)
{
    if (hash == null)
    {
        throw new ArgumentNullException(nameof(hash));
    }
}
/// <summary>
/// Shared constructor: stores the script text or hash, substitutes empty arrays for
/// null <paramref name="keys"/> / <paramref name="values"/>, and asserts that every
/// individual key and value is non-null.
/// </summary>
private ScriptEvalMessage(int db, CommandFlags flags, RedisCommand command, string script, byte[] hexHash, RedisKey[] keys, RedisValue[] values)
    : base(db, flags, command)
{
    this.script = script;
    this.hexHash = hexHash;

    if (keys == null) keys = RedisKey.EmptyArray;
    if (values == null) values = RedisValue.EmptyArray;

    // Check each element, not the array itself.
    for (int i = 0; i < keys.Length; i++)
    {
        keys[i].AssertNotNull();
    }
    this.keys = keys;

    for (int i = 0; i < values.Length; i++)
    {
        values[i].AssertNotNull();
    }
    this.values = values;
}
也就是说 执行脚本可以在从库上执行,很多查询语句也可以在从库上执行。 |
|