我是条汉子 发表于 2015-9-1 02:14:14

memcached client

memcached client - Enyim 1.2.0.2

EnyimMemcached - 1.2.0.2
Project Home: http://enyimmemcached.codeplex.com/
Examples
下载binary测试时发生错误,下载源码编译后没有再遇到
Configurations:


<configSections>
<sectionGroup name="enyim.com">
<section name="memcached" type="Enyim.Caching.Configuration.MemcachedClientSection, Enyim.Caching" />
</sectionGroup>
<section name="memcached" type="Enyim.Caching.Configuration.MemcachedClientSection, Enyim.Caching" />
</configSections>
<enyim.com>
<memcached>
<servers>
    <!-- put your own server(s) here-->
    <add address="127.0.0.1" port="11211" />
</servers>
<socketPool minPoolSize="10" maxPoolSize="100" connectionTimeout="00:00:10" deadTimeout="00:02:00" />
</memcached>
</enyim.com>
<memcached keyTransformer="Enyim.Caching.TigerHashTransformer, Enyim.Caching">
<servers>
<add address="127.0.0.1" port="11211" />
</servers>
<socketPool minPoolSize="2" maxPoolSize="100" connectionTimeout="00:00:10" deadTimeout="00:02:00" />
</memcached>

Basic examples: get, set, expiration


MemcachedClient mc = new MemcachedClient();
mc.Store(StoreMode.Set, "key_1", "A".PadRight(20, 'A')); //no expiration time
DateTime expireAt = DateTime.Now.AddMinutes(0.5);
mc.Store(StoreMode.Set, "key_2", "B".PadRight(20, 'B'), expireAt); //expired after 30s
mc.Store(StoreMode.Set, "key_3", "C".PadRight(20, 'C'), new TimeSpan(0, 0, 15)); //expired after 15s
Console.WriteLine("{0}:", DateTime.Now.ToString("HH:mm:ss fff"));
Console.WriteLine("\tkey_1: {0}\tno expiration time", mc.Get<string>("key_1"));
Console.WriteLine("\tkey_2: {0}\texpired at {1}", mc.Get<string>("key_2"), expireAt.ToString("HH:mm:ss fff"));
Console.WriteLine("\tkey_3: {0}\texpired after 15s", mc.Get<string>("key_3"));
Thread.Sleep(18 * 1000); //make the thread sleep for 18s, key_3 should expired
Console.WriteLine("{0}: sleep 18s", DateTime.Now.ToString("HH:mm:ss fff"));
Console.WriteLine("\tkey_1: {0}", mc.Get<string>("key_1"));
Console.WriteLine("\tkey_2: {0}", mc.Get<string>("key_2"));
Console.WriteLine("\tkey_3: {0}", mc.Get<string>("key_3"));
mc.Store(StoreMode.Add, "key_1", "X".PadRight(20, 'X'));
mc.Store(StoreMode.Add, "key_2", "Y".PadRight(20, 'Y'));
mc.Store(StoreMode.Add, "key_3", "Z".PadRight(20, 'Z'));
Console.WriteLine("{0}: try to change values by using StoreMode.Add", DateTime.Now.ToString("HH:mm:ss fff"));
//make the thread sleep 15s, key_2 should expired and key_3 should be set a new value
Thread.Sleep(15 * 1000);
Console.WriteLine("{0}: sleep 15s", DateTime.Now.ToString("HH:mm:ss fff"));
Console.WriteLine("\tkey_1: {0}", mc.Get<string>("key_1"));
Console.WriteLine("\tkey_2: {0}", mc.Get<string>("key_2"));
Console.WriteLine("\tkey_3: {0}", mc.Get<string>("key_3"));

object get and set


public enum UserGender
{
    Male = 1,
    Female = 2,
    Unspecified = 0,
}

public class User
{
    public int ID { get; set; }
    public string Name { get; set; }
    public DateTime Birthday { get; set; }
    public UserGender Gender { get; set; }
    public override string ToString()
    {
      return new StringBuilder()
            .Append("User{")
            .Append("ID:").Append(this.ID).Append(", Name:\"").Append(this.Name).Append("\"")
            .Append(", Birthday:\"").Append(this.Birthday.ToString("yyyy-MM-dd")).Append("\"")
            .Append(", Gender:").Append(this.Gender)
            .Append("}").ToString();
    }
}
//object get and set
User user = new User()
{
    ID = 601981,
    Name = "riccc.cnblogs.com",
    Birthday = new DateTime(1943, 2, 3),
    Gender = UserGender.Male
};
mc.Store(StoreMode.Set, "user", user);
user = mc.Get<User>("user");
Console.WriteLine(user);
Test output:
   
18秒后key_3过期;随后的add命令,因为key_1和key_2仍有效,所以操作失败,而key_3已经过期,操作成功;再过15秒key_2过期
Multiple gets test
Attention: gets commands only supported by memcached 1.2.5 or higher versions, so the flowing code needs memcached 1.2.5 at least

IDictionary<string, object> multiResults = mc.Get(new string[] { "key_1", "key_2", "key_3" });
Console.WriteLine("gets command test");
foreach (KeyValuePair<string, object> kvp in multiResults)
    Console.WriteLine("\t{0}: {1}", kvp.Key, kvp.Value);
cas tests


MemcachedClient mc = new MemcachedClient();
mc.Store(StoreMode.Set, "key_1", "A".PadRight(20, 'A'));
mc.Store(StoreMode.Set, "key_2", "B".PadRight(20, 'B'));
IDictionary<string, ulong> casValues = null;
IDictionary<string, object> values = mc.Get(new string[] { "key_1", "key_2" }, out casValues);
Console.WriteLine("key_1: {0}", values["key_1"]);
Console.WriteLine("key_2: {0}", values["key_2"]);
mc.Store(StoreMode.Set, "key_1", "A".PadRight(20, 'X'));
mc.Get<string>("key_2");
Console.WriteLine("cas key_1: {0}", mc.CheckAndSet("key_1", "M".PadRight(20, 'M'), casValues["key_1"]));
Console.WriteLine("cas key_2: {0}", mc.CheckAndSet("key_2", "N".PadRight(20, 'N'), casValues["key_2"]));
Console.WriteLine("key_1 after cas: {0}", mc.Get<string>("key_1"));
Console.WriteLine("key_2 after cas: {0}", mc.Get<string>("key_2"));
Test output:
   
key_1因为在读取之后使用set命令更新了,因此cas操作失败,而key_2的cas操作成功
Consistent Hashing test
It's a bit difficult to test consistent hashing and load balance directly using Enyim, the following code token from Enyim will simplify this task


public sealed class DefaultNodeLocator
{
    private const int ServerAddressMutations = 100;
    private uint[] keys;
    private Dictionary<uint, string> servers = new Dictionary<uint, string>();
    public void Initialize(IList<string> nodes)
    {
      this.keys = new uint;
      int nodeIdx = 0;
      foreach (string node in nodes)
      {
            List<uint> tmpKeys = DefaultNodeLocator.GenerateKeys(node, DefaultNodeLocator.ServerAddressMutations);
            tmpKeys.ForEach(delegate(uint k) { this.servers = node; });
            tmpKeys.CopyTo(this.keys, nodeIdx);
            nodeIdx += DefaultNodeLocator.ServerAddressMutations;
      }
      Array.Sort<uint>(this.keys);
    }
    public string Locate(string key)
    {
      if (this.keys.Length == 0) return null;
      uint itemKeyHash = BitConverter.ToUInt32(new FNV1a().ComputeHash(Encoding.Unicode.GetBytes(key)), 0);
      int foundIndex = Array.BinarySearch<uint>(this.keys, itemKeyHash);
      if (foundIndex < 0)
      {
            foundIndex = ~foundIndex;
            if (foundIndex == 0) foundIndex = this.keys.Length - 1;
            else if (foundIndex >= this.keys.Length) foundIndex = 0;
      }
      if (foundIndex < 0 || foundIndex > this.keys.Length) return null;
      return this.servers];
    }
    private static List<uint> GenerateKeys(string node, int numberOfKeys)
    {
      const int KeyLength = 4;
      const int PartCount = 1;
      List<uint> k = new List<uint>(PartCount * numberOfKeys);
      for (int i = 0; i < numberOfKeys; i++)
      {
            byte[] data = new FNV1a().ComputeHash(Encoding.ASCII.GetBytes(String.Concat(node, "-", i)));
            for (int h = 0; h < PartCount; h++)
                k.Add(BitConverter.ToUInt32(data, h * KeyLength));
      }
      return k;
    }
}
public class FNV1a : HashAlgorithm
{
    private const uint Prime = 16777619;
    private const uint Offset = 2166136261;
    protected uint CurrentHashValue;
    public FNV1a()
    {
      this.HashSizeValue = 32;
      this.Initialize();
    }
    public override void Initialize()
    {
      this.CurrentHashValue = Offset;
    }
    protected override void HashCore(byte[] array, int ibStart, int cbSize)
    {
      int end = ibStart + cbSize;
      for (int i = ibStart; i < end; i++)
            this.CurrentHashValue = (this.CurrentHashValue ^ array) * FNV1a.Prime;
    }
    protected override byte[] HashFinal()
    {
      return BitConverter.GetBytes(this.CurrentHashValue);
    }
}

测试过程:使用4个mamcached server配置,测试的几个key会分布在这些server上;删除一个server配置,测试key值仍会分配在剩余可用server上,可以维持原映射不变化;添加新的server,同样已有key值仍映射到相同server,不会变化
备注:删除server(或者server发生故障),添加新的server,Enyim都会重新构建映射索引(ServerPool.RebuildIndexes()),映射的一致性完全由完成映射的哈希算法保证
Test code:


IList<string> servers = new List<string>(new string[] {
    "192.168.1.100:800", "192.168.1.201:800",
    "192.168.1.151:800", "192.168.1.400:800" });
DefaultNodeLocator locator = new DefaultNodeLocator();
locator.Initialize(servers); //will rebuild the mapping indexes
string key1 = "42123", key2 = Guid.NewGuid().ToString();
Console.WriteLine("key: {0}, server: {1}", key1, locator.Locate(key1));
Console.WriteLine("key: {0}, server: {1}", key2, locator.Locate(key2));
//delete a server
for (int i = 0; i < servers.Count; i++)
{
    if (servers != locator.Locate(key1) && servers != locator.Locate(key2))
    {
      servers.RemoveAt(i);
      break;
    }
}
locator = new DefaultNodeLocator();
locator.Initialize(servers); //will rebuild the mapping indexes
Console.WriteLine("mappings after 1 server was removed");
Console.WriteLine("key: {0}, server: {1}", key1, locator.Locate(key1));
Console.WriteLine("key: {0}, server: {1}", key2, locator.Locate(key2));
//add a new server
servers.Add("rdroad.com:11211");
locator = new DefaultNodeLocator();
locator.Initialize(servers); //will rebuild the mapping indexes
Console.WriteLine("mappings after new server was added");
Console.WriteLine("key: {0}, server: {1}", key1, locator.Locate(key1));
Console.WriteLine("key: {0}, server: {1}", key2, locator.Locate(key2));
Conclusion: consistent hashing is well supported
Load balance test
Test code:


IList<string> servers = new List<string>(new string[] {
   "192.168.1.100:800", "192.168.1.201:800",
   "192.168.1.151:800", "192.168.1.400:800" });
DefaultNodeLocator locator = new DefaultNodeLocator();
locator.Initialize(servers);
int[] mappings = new int; //to save the total count of mapped keys in the servers
Random rd = new Random();
for (int i = 0; i < 1000000; i++)
{
    int index = servers.IndexOf(locator.Locate(i % 3 == 0 ? i.ToString() : i % 3 == 1 ? rd.Next().ToString()
         : Guid.NewGuid().ToString()));
    mappings++;
}
for (int i = 0; i < servers.Count; i++)
    Console.WriteLine("server: {0}, keys: {1}", servers, mappings);
Test results:
   
Enyim不像Memcached.Clientlibrary那样对服务器的负载提供可配置的支持,不过这一点可以尝试通过配置server list实现,例如两台server A和B,需要A承受75%的负载,可以尝试在servers的配置中,配置3个A和1个B(这个方法需要测试,并且注意server A会存在3个MemcachedNode节点的实例,每个实例均会应用pool设置,主要是minPoolSize、maxPoolSize)
从测试效果来看,负载的分布在各种情况下均保持固定比例,负载分布很不平衡。按道理,如果无法实现负载的配置型,应当实现平均的分布负载,这是算法本身的缺陷
Conclusion: doe's not support configurable load balance, loads not balanced among servers
代码结构、处理方式
主要结构:
   
PooledSocket: 负责socket通讯,例如发送命令、读取回应消、数据。为了提高高并发情况下的吞吐量,socket一直保持连接状态,这也是memcached官方推荐的处理方式
MemcachedNode: 表示一个Memcached服务器节点,他主要负责维护当前节点的活动(可用)状态,维护与该节点通讯用的socket pool。MemcachedNode.Acquire()方法就是请求一个空闲的PooledSocket对象用于通讯作业
   socket pool的维护通过内嵌类InternalPoolImpl实现。他使用一个空闲队列freeItems保存空闲的socket连接对象,Acquire()方法请求空闲socket时,从空闲队列中取出一个,socket对象使用完毕时通过ReleaseSocket方法放回队列中。仅在请求或者释放socket回队列时加锁。使用者不用显示调用ReleaseSocket方法将socket释放回pool中,创建PooledSocket时ReleaseSocket方法作为委托传给PooledSocket,PooledSocket Dispose时自动通过委托进行调用
ServerPool: 负责对所有server节点的维护,包括
   1. key与节点的映射:LocateNode方法确定某个key值映射到哪个节点,SplitKeys确定一组key值分别映射到哪些节点(例如使用gets命令批量读取时使用),RebuildIndexes用于任何节点状态发生变化时,重建映射索引
   2. 节点状态的管理,具体处理过程如下:
       a). workingServers列表存放可用的工作节点,deadServers列表存放死节点(出现故障的)
       b). MemcachedNode创建新的socket或者从pool中请求socket时,如果发生socket异常,则将该MemcachedNode节点标记为不可用
       c). PooledSocket进行通讯时如果发生socket异常,会将该PooledSocket对象标记为不可用。PooledSocket使用完毕后都会执行Dispose方法,该方法中通过委托cleanupCallback(MemcachedNode在创建PooledSocket时传进来的)调用InternalPoolImpl的ReleaseSocket方法,将MemcachedNode标记为不可用
       d). 下一次请求中,MemcachedClient通过ServerPool调用LocateNode或者SplitKeys方法,将key映射到MemcachedNode时,如果节点被标记为不可用,则将节点从workingServers列表移入到deadServers列表中,重建映射索引,这样后续的请求就不会再将key值映射到这个不可用的节点上了
       e). ServerPool定期的对deadServers列表中的死节点检查状态(callback_isAliveTimer方法,调用MemcachedNode的Ping方法对节点状态进行检查),如果节点可用,则将节点从deadServers列表移入到workingServers列表中,并重建映射索引,这样这些状态已经恢复的节点将重新加入到工作节点中
       这个状态管理机制存在一个数据一致性问题,如果某个节点仅仅因为网络故障而中断,之前映射到他上面的数据将被映射到其他节点上。网络恢复时该节点重新加入工作组中,因为一致性哈希算法的缘故,之前映射到他上面的key又将重新映射到这个节点。问题之一,如果该节点上的数据没有清除掉,这些数据可能已经是老版本、过时的数据了;问题之二,造成其他节点上存在另外版本的脏数据
       Enyim并没有充分考虑failover、failback机制,这是一个缺陷
Operations
   
Operations目录下的类,是对Memcached各种命令(命令名称、通讯协议)的封装,构造时传入ServerPool对象,通过他可以得到与各个key对应的MemcachedNode,再取得PooledSocket,完成各个命令的通讯,以及对结果的处理
KeyTransformers: 主要是与memcached server通讯时,对key值进行处理(是否需要哈希等,使用什么哈希算法等),默认使用的DefaultKeyTransformer并不进行哈希处理(仅仅检查key值不能包含特殊字符),这在memcached server跨应用使用时比较简单
Transcoders: 引用类型对象序列化的处理
页: [1]
查看完整版本: memcached client