设为首页 收藏本站
查看: 1164|回复: 0

[经验分享] memcached+net缓存

[复制链接]

尚未签到

发表于 2015-8-31 12:46:14 | 显示全部楼层 |阅读模式
  最近对线上项目进行了一次缓存方面的优化,本来6月就该写的,但是项目一直忙,就给自己理由偷懒了
  缓存方面的思考参考代震军的文章比较多,文章地址:
  http://www.cnblogs.com/daizhj/archive/2009/11/17/1604436.html
  http://www.cnblogs.com/daizhj/archive/2009/03/23/1386652.html
  
  我以前项目上一直都是,直接使用memcached或者直接使用net本地缓存,二者都是单独使用
  这次对项目调整修改了下结构。主要目的是结合二者优势更有效使用缓存,二来也是为了解决网站更新代码缓解数据库查询压力的问题。
  先大致说下它们的优点缺点:
  1.memcached 优点是,稳定,受外因素影响较小,安全性高。缺点是速度方面不如net缓存,序列化和传输产生的额外开销比较大
  2.net缓存,优点是高效,速度极快。缺点是稳定性,安全性差,受应用程序池回收设置影响。
  
  针对以上优缺点,决定使用二级缓存模式:客户端-》NET本地缓存-》memcached缓存-》数据库  (NET本地缓存默认5分钟失效,memcached缓存时间设置,一般30分钟以上,个人觉得最后把memcached缓
  存当成个永久缓存来设置,缺点就是缓存更新方面的策略需要考虑周全)
  这样,当客户端访问本地缓存:
有数据->直接返回
无数据->读取下级缓存(“memcached缓存”)->有数据->返回,同步写份net缓存(无数据则数据库查询,并建立memcached缓存)
  -------------------------------------------------------------------------------------------
  以上设计经过实际应用,服务器内网和memcached之间的通讯量大大减少,CPU占用资源方面也相对保持稳定状态,不如同之前浮动从10%-30%(现在是10%-15%)。该项目网站日PV大概在250W+
  网站缓存方面算是有了个初步的完成,以上还有个很大的优点就是在站点重启的时候,尤其是高并发访问的情况下,net缓存一失效,就可直接通过memcached,而不会直接造成高并发查询数据库,导致
  数据库压力上升,如果有一句sql语句查询稍慢,就有可能导致查询死锁的状况出现。哈,这个可是很实际的优点。现在的代码部署了,在网站有用户访问的情况下更新代码不再是问题了
  ----------------------------------------------------------------
  上面说了这么多,引入正题,C#网站应用程序中,使用memcached,在windos服务重启会有个情况,网站和memcached之间的socks连接失败,需要网站也同步重启下,服务才能重新连接。那么我刚上面说
  的高并发查询情况导致数据库堵了的情况又会出现,而且是net本地缓存,和memcached缓存都丢失了,二者加起来至少3个G的数据
需要解决上面的问题,重新启动网站,而重新启动网站无非也就重新建立了下同memcahced之间的socks连接池,所以解决这问题的关键点是重新构建网站和memcached之间的socks连接池.本着这样的思路
  ,我这对Memcached.ClientLibrary 进行了扩展
1.
首先扩展定义了接口 ICacheManageConfig.cs
  里面定义里个必须实现的方法 void Memcached_Restart(string LogMessage);  用来重建连接池
  
  2.在调用memcached客户端组件的数据层,增加了接口实现类CacheManageConfig.cs
  

DSC0000.gif DSC0001.gif 代码

   public class CacheManageConfig : Memcached.ClientLibrary.ICacheManageConfig
    {
        private static Memcached.ClientLibrary.SockIOPool pool = null;
        
        /// <summary>
        /// 实现接口方法,memcached客户端组件反射调用接口的实现方法
        /// </summary>
        /// <param name="message"></param>
        public void Memcached_Restart(string message)
        {
            _Restart(message);
        }
        /// <summary>
        /// 初始化连接池
        /// </summary>
        public static void Memcached_Init()
        {
            Memcached.ClientLibrary.MemcachedSocksMonitor.CacheManageConfigPath = "KM.Brand.DBLib.Cache.CacheManageConfig";
            //每次初始化都用新的连接池名
            Memcached.ClientLibrary.MemcachedSocksMonitor.CachePoolName = "pool" + Guid.NewGuid().ToString();
           string memcachedserver = System.Configuration.ConfigurationManager.AppSettings["memcachedserver"];
            string[] serverlist = {memcachedserver};
            pool = Memcached.ClientLibrary.SockIOPool.GetInstance(Memcached.ClientLibrary.MemcachedSocksMonitor.CachePoolName);
            pool.SetServers(serverlist);
            pool.InitConnections = 15;
            pool.MinConnections = 15;
            pool.MaxConnections = 200;
            pool.SocketTimeout = 3000;
            pool.SocketConnectTimeout = 1000;
            pool.MaintenanceSleep = 30;
            pool.Failover = true;
            pool.Nagle = false;
            //socket单次任务的最大时间,超过这个时间socket会被强行中断掉(当前任务失败)
            pool.MaxBusy = 1000 * 10;
            pool.Initialize();
            //由于有旧代码使用默认的程序池名字,所以同时初始化一个默认名字的连接池
             pool = Memcached.ClientLibrary.SockIOPool.GetInstance();
            pool.SetServers(serverlist);
            pool.InitConnections =10;
            pool.MinConnections = 10;
            pool.MaxConnections = 50;
            pool.SocketTimeout = 3000;
            pool.SocketConnectTimeout = 1000;
            pool.MaintenanceSleep = 30;
            pool.Failover = true;
            pool.Nagle = false;
            //socket单次任务的最大时间,超过这个时间socket会被强行中断掉(当前任务失败)
             pool.MaxBusy = 1000 * 10;
            pool.Initialize();
         }
        #region  连接池重建
        /// <summary>
        /// 连接池重建
        /// </summary>
        /// <param name="message"></param>
        private static void _Restart(string message)
        {
            if (pool != null)
            {//其实可以把缓存管理模块整合到这个项目一起的。哎!               
                pool.Shutdown(Memcached.ClientLibrary.MemcachedSocksMonitor.CachePoolName);
            }
            //重新初始化
              Memcached_Init();
            KM.Brand.DBLib.Cache.CacheManage.Dispose(message);
        }
        #endregion
    }  
  
  3.在网站应用global.asax中配置连接池启动
  


protected void Application_Start(object sender, EventArgs e)
{
           //初始化memcached连接池
            KM.Brand.DBLib.Cache.CacheManageConfig.Memcached_Init();
}  
  
  4。最关键点,修改memcached客户端源代码实现,回收连接池
  主要有新建立了个类,MemcachedSocksMonitor.cs 实现连接池基本操作
  

代码

public static class MemcachedSocksMonitor
    {
        private static string _CacheManageConfigPath;
        private static int _SocksMaxErrCount;
        private static string _CachePoolName;
        /// <summary>
        /// 连接池名字
        /// </summary>
        public static string CachePoolName
        {
            get { return _CachePoolName; }
            set { _CachePoolName = value;}
        }
        ///<summary>
        ///初始化一定要设置
        ///实现接口ICacheManageConfig的类完整命名空间路径(包含类名),重新建立缓存管理模块单件实例的方法实现名称
        /// </summary>
        public static string CacheManageConfigPath
        {
            get { return _CacheManageConfigPath; }
            set { _CacheManageConfigPath = value; }
        }
        /// <summary>
        /// 执行重建的条件,该数字累加到20就重建连接池,并清零,次数累计保留1分钟
        /// </summary>
        public static int SocksMaxErrCount
        {
            get { return _SocksMaxErrCount; }
            set { _SocksMaxErrCount = value; }
        }
        /// <summary>
        /// 重建连接池
        /// </summary>
        public  static void Restart()
        {//识别需要重启,看日志错误 log.Error(GetLocalizedString("set IOException"), exception5);
            ICacheManageConfig CacheDispose = (ICacheManageConfig)Assembly.Load(CacheManageConfigPath).CreateInstance("Memcached_Restart"); //(ICacheManageConfig)Create(DisposeMonthodName);
            CacheDispose.Memcached_Restart("memcached服务连接池传输数据失败,重新生成了连接池!\r\n 无法将数据写入传输连接: 远程主机强迫关闭了一个现有的连接!!一分钟内连续出错次数" + SocksMaxErrCount);
        }
        /// <summary>
        /// 记录传输错误的失败次数,并决定是否要重建连接池
        /// 这里用了本地缓存,一般情况下这缓存都是不存在的,该方法也很少会执行到
        /// </summary>
        public static void SockIOErrRestatr()
        {
            //执行一次就进行一次逻辑处理
            string key = "MemcachedClientLibrarySockIOErrRestatr";
            object value = System.Web.HttpRuntime.Cache.Get(key);
            if (value != null)
            {
                if (SocksMaxErrCount > 20)
                {
                    //执行重建memcached连接池
                    Restart();
                    SocksMaxErrCount = 0;
                }
                else
                {
                    SocksMaxErrCount = SocksMaxErrCount + 1;
                }
            }
            else
            {
                SocksMaxErrCount = 1;
                System.Web.HttpRuntime.Cache.Add(key,1, null,DateTime.Now.AddMinutes(1), System.Web.Caching.Cache.NoSlidingExpiration, System.Web.Caching.CacheItemPriority.High, null);
            }
        }
    }  
  
  
  5.SockIOPool.cs 修改Shutdown 函授,也忘记有没其他地方修改了
  


代码

   [MethodImpl(MethodImplOptions.Synchronized)]
        public void Shutdown(string poolName)
        {
            bool isDebugEnabled = Log.IsDebugEnabled;
            if ((this._maintenanceThread != null) && this._maintenanceThread.IsRunning)
            {
                this.StopMaintenanceThread();
            }
            bool flag2 = Log.IsDebugEnabled;
            ClosePool(this._availPool);
            ClosePool(this._busyPool);
            this._availPool = null;
            this._busyPool = null;
            this._buckets = null;
            this._hostDeadDuration = null;
            this._hostDead = null;
            this._initialized = false;
            bool flag3 = Log.IsDebugEnabled;
            //下面方法是增加的,当memcached重启后,导致当前连接池连接不上,执行停止命令后
            //移除失败的连接池,这样重新初始化就会重新建立sock连接池
            if (poolName == "")
            {
                poolName = GetLocalizedString("default instance");
            }
            Pools.Remove(GetLocalizedString(poolName));
        }  
  6 MemcachedClient.cs修改,具体忘记改了哪些,全部复制上来好了
  

代码

namespace Memcached.ClientLibrary
{
    using ICSharpCode.SharpZipLib.GZip;
    using log4net;
    using System;
    using System.Collections;
    using System.Globalization;
    using System.IO;
    using System.Resources;
    using System.Runtime.Serialization;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Text;
    using System.Text.RegularExpressions;
    public class MemcachedClient
    {
        private bool _compressEnable;
        private long _compressThreshold;
        private string _defaultEncoding;
        private string _poolName;
        private bool _primitiveAsString;//Memcached.ClientLibrary.StringMessages
        private static ResourceManager _resourceManager = new ResourceManager("Memcached.ClientLibrary.StringMessages", typeof(MemcachedClient).Assembly);
        private const string CLIENT_ERROR = "CLIENT_ERROR";
        private const int COMPRESS_THRESH = 0x7800;
        private const string DELETED = "DELETED";
        private const string END = "END";
        private const string ERROR = "ERROR";
        private const int F_COMPRESSED = 2;
        private const int F_SERIALIZED = 8;
        private static ILog log = LogManager.GetLogger(typeof(MemcachedClient));
        private const string NOTFOUND = "NOT_FOUND";
        private const string NOTSTORED = "NOT_STORED";
        private const string OK = "OK";
        private const string SERVER_ERROR = "SERVER_ERROR";
        private const string STATS = "STAT";
        private const string STORED = "STORED";
        private const string VALUE = "VALUE";
        public MemcachedClient(): this("default instance")
        {
         
        }
        public MemcachedClient(string name)
        {
            if (name != "")
            {
                this.Init(name);
            }
            else
            {
                this.Init();
            }
        }
        private void Init(string key)
        {
            this._primitiveAsString = false;
            this._compressEnable = true;
            this._compressThreshold = 0x7800L;
            this._defaultEncoding = "UTF-8";
            this._poolName = GetLocalizedString(key);
        }
        public bool Add(string key, object value)
        {
            return this.Set("add", key, value, DateTime.MaxValue, null, this._primitiveAsString);
        }
        public bool Add(string key, object value, DateTime expiry)
        {
            return this.Set("add", key, value, expiry, null, this._primitiveAsString);
        }
        public bool Add(string key, object value, int hashCode)
        {
            return this.Set("add", key, value, DateTime.MaxValue, hashCode, this._primitiveAsString);
        }
        public bool Add(string key, object value, DateTime expiry, int hashCode)
        {
            return this.Set("add", key, value, expiry, hashCode, this._primitiveAsString);
        }
        public long Decrement(string key)
        {
            return this.IncrementOrDecrement("decr", key, 1L, null);
        }
        public long Decrement(string key, long inc)
        {
            return this.IncrementOrDecrement("decr", key, inc, null);
        }
        public long Decrement(string key, long inc, int hashCode)
        {
            return this.IncrementOrDecrement("decr", key, inc, hashCode);
        }
        public bool Delete(string key)
        {
            return this.Delete(key, null, DateTime.MaxValue);
        }
        public bool Delete(string key, DateTime expiry)
        {
            return this.Delete(key, null, expiry);
        }
        public bool Delete(string key, object hashCode, DateTime expiry)
        {
            if (key == null)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("null key delete"));
                }
                return false;
            }
            SockIO sock = SockIOPool.GetInstance(this._poolName).GetSock(key, hashCode);
            if (sock != null)
            {
                StringBuilder builder = new StringBuilder("delete ").Append(key);
                if (expiry != DateTime.MaxValue)
                {
                    builder.Append(" " + (GetExpirationTime(expiry) / 0x3e8));
                }
                builder.Append("\r\n");
                try
                {
                    sock.Write(Encoding.UTF8.GetBytes(builder.ToString()));
                    sock.Flush();
                    string newValue = sock.ReadLine();
                    if ("DELETED" == newValue)
                    {
                        bool isInfoEnabled = log.IsInfoEnabled;
                        sock.Close();
                        sock = null;
                        return true;
                    }
                    if ("NOT_FOUND" == newValue)
                    {
                        if (!log.IsInfoEnabled)
                        {
                        }
                    }
                    else if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("delete key error").Replace("$$Key$$", key).Replace("$$Line$$", newValue));
                    }
                }
                catch (IOException exception)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("delete IOException"), exception);
                    }
                    try
                    {
                        sock.TrueClose();
                    }
                    catch (IOException exception2)
                    {
                        if (log.IsErrorEnabled)
                        {
                            log.Error(GetLocalizedString("failed to close some socket").Replace("$$Socket$$", sock.ToString()), exception2);
                        }
                    }
                    sock = null;
                }
                if (sock != null)
                {
                    sock.Close();
                }
            }
            return false;
        }
        public bool FlushAll()
        {
            return this.FlushAll(null);
        }
        public bool FlushAll(ArrayList servers)
        {
            SockIOPool instance = SockIOPool.GetInstance(this._poolName);
            if (instance == null)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("unable to get socket pool"));
                }
                return false;
            }
            if (servers == null)
            {
                servers = instance.Servers;
            }
            if ((servers == null) || (servers.Count <= 0))
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("flushall no servers"));
                }
                return false;
            }
            bool flag = true;
            for (int i = 0; i < servers.Count; i++)
            {
                SockIO connection = instance.GetConnection((string) servers);
                if (connection == null)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("unable to connect").Replace("$$Server$$", servers.ToString()));
                    }
                    flag = false;
                }
                else
                {
                    string s = "flush_all\r\n";
                    try
                    {
                        connection.Write(Encoding.UTF8.GetBytes(s));
                        connection.Flush();
                        string str2 = connection.ReadLine();
                        flag = ("OK" == str2) ? flag : false;
                    }
                    catch (IOException exception)
                    {
                        if (log.IsErrorEnabled)
                        {
                            log.Error(GetLocalizedString("flushall IOException"), exception);
                        }
                        try
                        {
                            connection.TrueClose();
                        }
                        catch (IOException exception2)
                        {
                            if (log.IsErrorEnabled)
                            {
                                log.Error(GetLocalizedString("failed to close some socket").Replace("$$Socket$$", connection.ToString()), exception2);
                            }
                        }
                        flag = false;
                        connection = null;
                    }
                    if (connection != null)
                    {
                        connection.Close();
                    }
                }
            }
            return flag;
        }
        public object Get(string key)
        {
            return this.Get(key, null, false);
        }
        public object Get(string key, int hashCode)
        {
            return this.Get(key, hashCode, false);
        }
        public object Get(string key, object hashCode, bool asString)
        {
            SockIO sock = SockIOPool.GetInstance(this._poolName).GetSock(key, hashCode);
            if (sock != null)
            {
                try
                {
                    string s = "get " + key + "\r\n";
                    bool isDebugEnabled = log.IsDebugEnabled;
                    sock.Write(Encoding.UTF8.GetBytes(s));
                    sock.Flush();
                    Hashtable hm = new Hashtable();
                    this.LoadItems(sock, hm, asString);
                    bool flag2 = log.IsDebugEnabled;
                    sock.Close();
                    return hm[key];
                }
                catch (IOException exception)
                {
                    //##监测##
                    //exception.Message=无法将数据写入传输连接: 远程主机强迫关闭了一个现有的连接。。
                    //原因:memcached重启类造成的连接池无法再通信的错误
                    //解决:判断上面的message出现次数几次,就考虑重新建立连接池
                    if (exception.Message.Contains("无法将数据写入传输连接: 远程主机强迫关闭了一个现有的连接"))
                    {
                       MemcachedSocksMonitor.SockIOErrRestatr();
                    }
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("get IOException").Replace("$$Key$$", key), exception);
                    }
                    try
                    {
                        sock.TrueClose();
                    }
                    catch (IOException exception2)
                    {
                        if (log.IsErrorEnabled)
                        {
                            log.Error(GetLocalizedString("failed to close some socket").Replace("$$Socket$$", sock.ToString()), exception2);
                        }
                    }
                    sock = null;
                }
                if (sock != null)
                {
                    sock.Close();
                }
            }
            return null;
        }
        public long GetCounter(string key)
        {
            return this.GetCounter(key, null);
        }
        public long GetCounter(string key, object hashCode)
        {
            if (key == null)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("getcounter null key"));
                }
                return -1L;
            }
            long num = -1L;
            try
            {
                num = long.Parse((string) this.Get(key, hashCode, true), new NumberFormatInfo());
            }
            catch (ArgumentException)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("getcounter counter not found").Replace("$$Key$$", key));
                }
            }
            return num;
        }
        private static int GetExpirationTime(DateTime expiration)
        {
            if (expiration <= DateTime.Now)
            {
                return 0;
            }
            TimeSpan span = new TimeSpan(0x1d, 0x17, 0x3b, 0x3b);
            if (expiration.Subtract(DateTime.Now) > span)
            {
                return (int) span.TotalSeconds;
            }
            return (int) expiration.Subtract(DateTime.Now).TotalSeconds;
        }
        private static string GetLocalizedString(string key)
        {
            if (key != "default instance")
            {
                return key;
            }
            else
            {
                return _resourceManager.GetString(key);
            }
        }
        public Hashtable GetMultiple(string[] keys)
        {
            return this.GetMultiple(keys, null, false);
        }
        public Hashtable GetMultiple(string[] keys, int[] hashCodes)
        {
            return this.GetMultiple(keys, hashCodes, false);
        }
        public Hashtable GetMultiple(string[] keys, int[] hashCodes, bool asString)
        {
            if (keys == null)
            {
                return new Hashtable();
            }
            Hashtable hashtable = new Hashtable();
            for (int i = 0; i < keys.Length; i++)
            {
                object hashCode = null;
                if ((hashCodes != null) && (hashCodes.Length > i))
                {
                    hashCode = hashCodes;
                }
                SockIO sock = SockIOPool.GetInstance(this._poolName).GetSock(keys, hashCode);
                if (sock != null)
                {
                    if (!hashtable.ContainsKey(sock.Host))
                    {
                        hashtable[sock.Host] = new StringBuilder();
                    }
                    ((StringBuilder) hashtable[sock.Host]).Append(" " + keys);
                    sock.Close();
                }
            }
            bool isInfoEnabled = log.IsInfoEnabled;
            Hashtable hm = new Hashtable();
            ArrayList list = new ArrayList();
            foreach (string str in hashtable.Keys)
            {
                SockIO connection = SockIOPool.GetInstance(this._poolName).GetConnection(str);
                try
                {
                    string s = "get" + ((StringBuilder) hashtable[str]) + "\r\n";
                    bool flag2 = log.IsDebugEnabled;
                    connection.Write(Encoding.UTF8.GetBytes(s));
                    connection.Flush();
                    this.LoadItems(connection, hm, asString);
                }
                catch (IOException exception)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("getmultiple IOException"), exception);
                    }
                    list.Add(str);
                    try
                    {
                        connection.TrueClose();
                    }
                    catch (IOException exception2)
                    {
                        if (log.IsErrorEnabled)
                        {
                            log.Error(GetLocalizedString("failed to close some socket").Replace("$$Socket$$", connection.ToString()), exception2);
                        }
                    }
                    connection = null;
                }
                if (connection != null)
                {
                    connection.Close();
                }
            }
            foreach (string str3 in list)
            {
                hashtable.Remove(str3);
            }
            bool isDebugEnabled = log.IsDebugEnabled;
            return hm;
        }
        public object[] GetMultipleArray(string[] keys)
        {
            return this.GetMultipleArray(keys, null, false);
        }
        public object[] GetMultipleArray(string[] keys, int[] hashCodes)
        {
            return this.GetMultipleArray(keys, hashCodes, false);
        }
        public object[] GetMultipleArray(string[] keys, int[] hashCodes, bool asString)
        {
            if (keys == null)
            {
                return new object[0];
            }
            Hashtable hashtable = this.GetMultiple(keys, hashCodes, asString);
            object[] objArray = new object[keys.Length];
            for (int i = 0; i < keys.Length; i++)
            {
                objArray = hashtable[keys];
            }
            return objArray;
        }
        public long Increment(string key)
        {
            return this.IncrementOrDecrement("incr", key, 1L, null);
        }
        public long Increment(string key, long inc)
        {
            return this.IncrementOrDecrement("incr", key, inc, null);
        }
        public long Increment(string key, long inc, int hashCode)
        {
            return this.IncrementOrDecrement("incr", key, inc, hashCode);
        }
        private long IncrementOrDecrement(string cmdname, string key, long inc, object hashCode)
        {
            SockIO sock = SockIOPool.GetInstance(this._poolName).GetSock(key, hashCode);
            if (sock != null)
            {
                try
                {
                    string s = string.Concat(new object[] { cmdname, " ", key, " ", inc, "\r\n" });
                    bool isDebugEnabled = log.IsDebugEnabled;
                    sock.Write(Encoding.UTF8.GetBytes(s));
                    sock.Flush();
                    string input = sock.ReadLine();
                    if (new Regex(@"\d+").Match(input).Success)
                    {
                        sock.Close();
                        return long.Parse(input, new NumberFormatInfo());
                    }
                    if ("NOT_FOUND" == input)
                    {
                        if (!log.IsInfoEnabled)
                        {
                        }
                    }
                    else if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("incr-decr key error").Replace("$$Key$$", key));
                    }
                }
                catch (IOException exception)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("incr-decr IOException"), exception);
                    }
                    try
                    {
                        sock.TrueClose();
                    }
                    catch (IOException)
                    {
                        if (log.IsErrorEnabled)
                        {
                            log.Error(GetLocalizedString("failed to close some socket").Replace("$$Socket$$", sock.ToString()));
                        }
                    }
                    sock = null;
                }
                if (sock != null)
                {
                    sock.Close();
                }
            }
            return -1L;
        }
        private void Init()
        {
            this._primitiveAsString = false;
            this._compressEnable = true;
            this._compressThreshold = 0x7800L;
            this._defaultEncoding = "UTF-8";
            this._poolName = GetLocalizedString("default instance");
        }
        public bool KeyExists(string key)
        {
            return (this.Get(key, null, true) != null);
        }
        private void LoadItems(SockIO sock, Hashtable hm, bool asString)
        {
            string str;
            object obj2;
        Label_0000:
            str = sock.ReadLine();
            bool isDebugEnabled = log.IsDebugEnabled;
            if (!str.StartsWith("VALUE"))
            {
                if ("END" == str)
                {
                    if (log.IsDebugEnabled)
                    {
                    }
                    return;
                }
                goto Label_0000;
            }
            string[] strArray = str.Split(new char[] { ' ' });
            string newValue = strArray[1];
            int num = int.Parse(strArray[2], new NumberFormatInfo());
            int num2 = int.Parse(strArray[3], new NumberFormatInfo());
            bool flag2 = log.IsDebugEnabled;
            byte[] bytes = new byte[num2];
            sock.Read(bytes);
            sock.ClearEndOfLine();
            if ((num & 2) != 0)
            {
                try
                {
                    int num3;
                    GZipInputStream stream = new GZipInputStream(new MemoryStream(bytes));
                    MemoryStream stream2 = new MemoryStream(bytes.Length);
                    byte[] buffer = new byte[0x800];
                    while ((num3 = stream.Read(buffer, 0, buffer.Length)) > 0)
                    {
                        stream2.Write(buffer, 0, num3);
                    }
                    bytes = stream2.ToArray();
                    stream.Close();
                }
                catch (IOException exception)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("loaditems uncompression IOException").Replace("$$Key$$", newValue), exception);
                    }
                    throw new IOException(GetLocalizedString("loaditems uncompression IOException").Replace("$$Key$$", newValue), exception);
                }
            }
            if ((num & 8) == 0)
            {
                if (this._primitiveAsString || asString)
                {
                    bool isInfoEnabled = log.IsInfoEnabled;
                    obj2 = Encoding.GetEncoding(this._defaultEncoding).GetString(bytes);
                    goto Label_022A;
                }
                try
                {
                    obj2 = NativeHandler.Decode(bytes);
                    goto Label_022A;
                }
                catch (Exception exception2)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("loaditems deserialize error").Replace("$$Key$$", newValue), exception2);
                    }
                    throw new IOException(GetLocalizedString("loaditems deserialize error").Replace("$$Key$$", newValue), exception2);
                }
            }
            try
            {
                MemoryStream serializationStream = new MemoryStream(bytes);
                obj2 = new BinaryFormatter().Deserialize(serializationStream);
                bool flag4 = log.IsInfoEnabled;
            }
            catch (SerializationException exception3)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("loaditems SerializationException").Replace("$$Key$$", newValue), exception3);
                }
                throw new IOException(GetLocalizedString("loaditems SerializationException").Replace("$$Key$$", newValue), exception3);
            }
        Label_022A:
            hm[newValue] = obj2;
            goto Label_0000;
        }
        public bool Replace(string key, object value)
        {
            return this.Set("replace", key, value, DateTime.MaxValue, null, this._primitiveAsString);
        }
        public bool Replace(string key, object value, DateTime expiry)
        {
            return this.Set("replace", key, value, expiry, null, this._primitiveAsString);
        }
        public bool Replace(string key, object value, int hashCode)
        {
            return this.Set("replace", key, value, DateTime.MaxValue, hashCode, this._primitiveAsString);
        }
        public bool Replace(string key, object value, DateTime expiry, int hashCode)
        {
            return this.Set("replace", key, value, expiry, hashCode, this._primitiveAsString);
        }
        public bool Set(string key, object value)
        {
            return this.Set("set", key, value, DateTime.MaxValue, null, this._primitiveAsString);
        }
        public bool Set(string key, object value, DateTime expiry)
        {
            return this.Set("set", key, value, expiry, null, this._primitiveAsString);
        }
        public bool Set(string key, object value, int hashCode)
        {
            return this.Set("set", key, value, DateTime.MaxValue, hashCode, this._primitiveAsString);
        }
        public bool Set(string key, object value, DateTime expiry, int hashCode)
        {
            return this.Set("set", key, value, expiry, hashCode, this._primitiveAsString);
        }
        private bool Set(string cmdname, string key, object obj, DateTime expiry, object hashCode, bool asString)
        {
            byte[] bytes;
            if (expiry < DateTime.Now)
            {
                return true;
            }
            if (((cmdname == null) || (cmdname.Trim().Length == 0)) || ((key == null) || (key.Length == 0)))
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("set key null"));
                }
                return false;
            }
            SockIO sock = SockIOPool.GetInstance(this._poolName).GetSock(key, hashCode);
            if (sock == null)
            {
                return false;
            }
            if (expiry == DateTime.MaxValue)
            {
                expiry = new DateTime(0L);
            }
            int num = 0;
            int count = 0;
            if (NativeHandler.IsHandled(obj))
            {
                if (asString)
                {
                    if (obj != null)
                    {
                        bool flag1 = log.IsInfoEnabled;
                        try
                        {
                            bytes = Encoding.UTF8.GetBytes(obj.ToString());
                            count = bytes.Length;
                            goto Label_01DF;
                        }
                        catch (ArgumentException exception)
                        {
                            if (log.IsErrorEnabled)
                            {
                                log.Error(GetLocalizedString("set invalid encoding").Replace("$$Encoding$$", this._defaultEncoding), exception);
                            }
                            sock.Close();
                            sock = null;
                            return false;
                        }
                    }
                    bytes = new byte[0];
                    count = 0;
                    goto Label_01DF;
                }
                bool isInfoEnabled = log.IsInfoEnabled;
                try
                {
                    bytes = NativeHandler.Encode(obj);
                    count = bytes.Length;
                    goto Label_01DF;
                }
                catch (ArgumentException exception2)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("set failed to native handle object"), exception2);
                    }
                    sock.Close();
                    sock = null;
                    return false;
                }
            }
            if (obj != null)
            {
                bool flag3 = log.IsInfoEnabled;
                try
                {
                    MemoryStream serializationStream = new MemoryStream();
                    new BinaryFormatter().Serialize(serializationStream, obj);
                    bytes = serializationStream.GetBuffer();
                    count = (int) serializationStream.Length;
                    num |= 8;
                    goto Label_01DF;
                }
                catch (IOException exception3)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("set failed to serialize").Replace("$$Object$$", obj.ToString()), exception3);
                    }
                    sock.Close();
                    sock = null;
                    return false;
                }
            }
            bytes = new byte[0];
            count = 0;
        Label_01DF:
            if (this._compressEnable && (count > this._compressThreshold))
            {
                bool flag4 = log.IsInfoEnabled;
                try
                {
                    MemoryStream baseOutputStream = new MemoryStream();
                    GZipOutputStream stream3 = new GZipOutputStream(baseOutputStream);
                    stream3.Write(bytes, 0, count);
                    stream3.Finish();
                    bytes = baseOutputStream.GetBuffer();
                    count = (int) baseOutputStream.Length;
                    num |= 2;
                    bool flag5 = log.IsInfoEnabled;
                }
                catch (IOException exception4)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("set compression failure"), exception4);
                    }
                }
            }
            try
            {
                string s = string.Concat(new object[] { cmdname, " ", key, " ", num, " ", GetExpirationTime(expiry), " ", count, "\r\n" });
                sock.Write(Encoding.UTF8.GetBytes(s));
                sock.Write(bytes, 0, count);
                sock.Write(Encoding.UTF8.GetBytes("\r\n"));
                sock.Flush();
                string newValue = sock.ReadLine();
                bool flag6 = log.IsInfoEnabled;
                if ("STORED" == newValue)
                {
                    bool flag7 = log.IsInfoEnabled;
                    sock.Close();
                    sock = null;
                    return true;
                }
                if ("NOT_STORED" == newValue)
                {
                    if (!log.IsInfoEnabled)
                    {
                    }
                }
                else if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("set error").Replace("$$Key$$", key).Replace("$$Size$$", count.ToString(new NumberFormatInfo())).Replace("$$Line$$", newValue));
                }
            }
            catch (IOException exception5)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("set IOException"), exception5);
                }
                //##监测##
                //exception.Message=无法将数据写入传输连接: 远程主机强迫关闭了一个现有的连接。。
                //原因:memcached重启类造成的连接池无法再通信的错误
                //解决:判断上面的message出现次数几次,就考虑重新建立连接池
                if (exception5.Message.Contains("无法将数据写入传输连接: 远程主机强迫关闭了一个现有的连接"))
                {
                    MemcachedSocksMonitor.SockIOErrRestatr();
                }
                try
                {
                    sock.TrueClose();
                }
                catch (IOException exception6)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("failed to close some socket").Replace("$$Socket$$", sock.ToString()), exception6);
                    }
                }
                sock = null;
            }
            if (sock != null)
            {
                sock.Close();
            }
            return false;
        }
        public Hashtable Stats()
        {
            return this.Stats(null);
        }
        public Hashtable Stats(ArrayList servers)
        {
            SockIOPool instance = SockIOPool.GetInstance(this._poolName);
            if (instance == null)
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("unable to get socket pool"));
                }
                return null;
            }
            if (servers == null)
            {
                servers = instance.Servers;
            }
            if ((servers == null) || (servers.Count <= 0))
            {
                if (log.IsErrorEnabled)
                {
                    log.Error(GetLocalizedString("stats no servers"));
                }
                return null;
            }
            Hashtable hashtable = new Hashtable();
            for (int i = 0; i < servers.Count; i++)
            {
                SockIO connection = instance.GetConnection((string) servers);
                if (connection == null)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("unable to connect").Replace("$$Server$$", servers.ToString()));
                    }
                    continue;
                }
                string s = "stats\r\n";
                try
                {
                    connection.Write(Encoding.UTF8.GetBytes(s));
                    connection.Flush();
                    Hashtable hashtable2 = new Hashtable();
                    while (true)
                    {
                        string str2 = connection.ReadLine();
                        bool isDebugEnabled = log.IsDebugEnabled;
                        if (str2.StartsWith("STAT"))
                        {
                            string[] strArray = str2.Split(new char[] { ' ' });
                            string str3 = strArray[1];
                            string str4 = strArray[2];
                            bool flag2 = log.IsDebugEnabled;
                            hashtable2[str3] = str4;
                        }
                        else if ("END" == str2)
                        {
                            if (!log.IsDebugEnabled)
                            {
                            }
                            goto Label_01E4;
                        }
                        hashtable[servers] = hashtable2;
                    }
                }
                catch (IOException exception)
                {
                    if (log.IsErrorEnabled)
                    {
                        log.Error(GetLocalizedString("stats IOException"), exception);
                    }
                    try
                    {
                        connection.TrueClose();
                    }
                    catch (IOException)
                    {
                        if (log.IsErrorEnabled)
                        {
                            log.Error(GetLocalizedString("failed to close some socket").Replace("$$Socket$$", connection.ToString()));
                        }
                    }
                    connection = null;
                }
            Label_01E4:
                if (connection != null)
                {
                    connection.Close();
                }
            }
            return hashtable;
        }
        public bool StoreCounter(string key, long counter)
        {
            return this.Set("set", key, counter, DateTime.MaxValue, null, true);
        }
        public bool StoreCounter(string key, long counter, int hashCode)
        {
            return this.Set("set", key, counter, DateTime.MaxValue, hashCode, true);
        }
        public long CompressionThreshold
        {
            get
            {
                return this._compressThreshold;
            }
            set
            {
                this._compressThreshold = value;
            }
        }
        public string DefaultEncoding
        {
            get
            {
                return this._defaultEncoding;
            }
            set
            {
                this._defaultEncoding = value;
            }
        }
        public bool EnableCompression
        {
            get
            {
                return this._compressEnable;
            }
            set
            {
                this._compressEnable = value;
            }
        }
        public string PoolName
        {
            get
            {
                return this._poolName;
            }
            set
            {
                this._poolName = value;
            }
        }
        public bool PrimitiveAsString
        {
            get
            {
                return this._primitiveAsString;
            }
            set
            {
                this._primitiveAsString = value;
            }
        }
    }
}
  
  
  
  大致就以上内容,修改的地方也不算多,总结发现自己写博文很吃力,平时的缺点很明显,不知道如何去描述一个问题及解决方法,虽然自己都知道,但是就是表达不出来
  
  这文章算是靠代码说话了,很惭愧,有兴趣的话就给点意见以及错误的地方

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-106817-1-1.html 上篇帖子: Memcached通用类(基于Memcached Client Library) 下篇帖子: simple-spring-memcached
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表