vike681 发表于 2015-7-20 10:36:02

基于redis的分布式锁

  上于公司业务上需要实现分布式锁,在网上找了一些实现的案例,最终打算采用基于redis的分布式锁方案,不多废话,上代码
  核心类



1 using System;
2 using System.Diagnostics;
3 using System.Text;
4 using System.Threading;
5 using BookSleeve;
6
7 namespace ViewAlloc.Threading
8 {
9   public class RedisBillLockHandler
10   {
11         private const int DEFAULT_SINGLE_EXPIRE_TIME = 10;
12         private static readonly DateTime DEFAULT_DATETIME = new DateTime(1970, 1, 1);
13         private const int DEFAULT_DB = 0;
14
15         private readonly RedisConnection client;
16
17         ///
18         /// 构造
19         ///
20         ///
21         public RedisBillLockHandler(RedisConnection client)
22         {
23             this.client = client;
24         }
25
26         ///
27         /// 获取锁
28         /// 如果锁可用立即返回true,否则返回false
29         ///
30         ///
31         ///
32         public bool TryLock(String key)
33         {
34             return TryLock(key, 0L);
35         }
36
37         ///
38         /// 锁在给定的等待时间内空闲,则获取锁成功 返回true, 否则返回false
39         ///
40         ///
41         ///
42         ///
43         public bool TryLock(String key, long timeout)
44         {
45             try
46             {
47               Stopwatch watch = Stopwatch.StartNew();
48               do
49               {
50                     long tt = (long)(DateTime.Now - DEFAULT_DATETIME).TotalSeconds;
51                     long timestamp = tt + DEFAULT_SINGLE_EXPIRE_TIME + 1;
52
53                     var tran = client.CreateTransaction();
54                     var taskSetIfNotExists = tran.Strings.SetIfNotExists(DEFAULT_DB, key, Encoding.UTF8.GetBytes(timestamp.ToString()));
55                     var taskGet = tran.Strings.Get(DEFAULT_DB, key);
56                     tran.Execute().Wait();
57                     tran.Dispose();
58                     if (taskSetIfNotExists.Result == true)
59                     {
60                         return true;
61                     }
62                     else
63                     {
64
65                         long ex = long.Parse(Encoding.UTF8.GetString(taskGet.Result));
66                         if (tt > ex)
67                         {
68                           var taskGetSet = client.Strings.GetSet(DEFAULT_DB, key, Encoding.UTF8.GetBytes(timestamp.ToString()));
69                           long old = long.Parse(Encoding.UTF8.GetString(taskGetSet.Result));
70
71                           if (ex == old)
72                           {
73                                 return true;
74                           }
75                         }
76                     }
77                     if (timeout == 0)
78                     {
79                         break;
80                     }
81                     Thread.Sleep(300);
82               } while (watch.ElapsedMilliseconds < timeout * 1000);
83               return false;
84             }
85             catch (Exception exc)
86             {
87               throw new RedisBillLockException(exc.Message, exc);
88             }
89         }
90
91         ///
92         /// 如果锁空闲立即返回
93         /// 获取失败一直等待
94         ///
95         ///
96         public void Lock(String key)
97         {
98             try
99             {
100               do
101               {
102                     long tt = (long)(DateTime.Now - DEFAULT_DATETIME).TotalSeconds;
103                     long timestamp = tt + DEFAULT_SINGLE_EXPIRE_TIME + 1;
104
105                     var tran = client.CreateTransaction();
106                     var taskSetIfNotExists = tran.Strings.SetIfNotExists(DEFAULT_DB, key, Encoding.UTF8.GetBytes(timestamp.ToString()));
107                     var taskGet = tran.Strings.Get(DEFAULT_DB, key);
108                     tran.Execute().Wait();
109                     tran.Dispose();
110                     if (taskSetIfNotExists.Result == true)
111                     {
112                         break;
113                     }
114                     else
115                     {
116
117                         long ex = long.Parse(Encoding.UTF8.GetString(taskGet.Result));
118                         if (tt > ex)
119                         {
120                           var taskGetSet = client.Strings.GetSet(DEFAULT_DB, key, Encoding.UTF8.GetBytes(timestamp.ToString()));
121                           long old = long.Parse(Encoding.UTF8.GetString(taskGetSet.Result));
122                           if (ex == old)
123                           {
124                                 break;
125                           }
126                         }
127                     }
128
129                     Thread.Sleep(300);
130               } while (true);
131             }
132             catch (Exception exc)
133             {
134               throw new RedisBillLockException(exc.Message, exc);
135             }
136         }
137
138         ///
139         /// 释放锁
140         ///
141         ///
142         public void UnLock(String key)
143         {
144             try
145             {
146               long tt = (long)(DateTime.Now - DEFAULT_DATETIME).TotalSeconds;
147               var taskGet = client.Strings.Get(DEFAULT_DB, key);
148               long ex = long.Parse(Encoding.UTF8.GetString(taskGet.Result));
149               if (tt < ex)
150               {
151                     var taskRemove = client.Keys.Remove(DEFAULT_DB, key);
152                     taskRemove.Wait();
153               }
154             }
155             catch (Exception exc)
156             {
157               throw new RedisBillLockException(exc.Message, exc);
158             }
159         }
160   }
161
162 }
  为了不破坏原有的代码逻辑我又加了下面两个类



using System;
namespace ViewAlloc.Threading
{
///
/// 分布式锁属性
///

public class RedisBillLockAttribute : Attribute
{
public string Scheme { set; get; }
public string Key { set; get; }
public RedisBillLockAttribute(string scheme, string key)
{
this.Scheme = scheme;
this.Key = key;
}
}
}


using System;
using System.Reflection;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Messaging;
namespace ViewAlloc.Threading
{
///
/// 装饰类,不需要对每一个类进行封装了,性能可能会有一细细的影响
///
public class RedisBillLockWrapper
{
public static T Wrap(T target) where T : MarshalByRefObject
{
return new MyProxy(typeof(T), target).GetTransparentProxy() as T;
}
private class MyProxy : System.Runtime.Remoting.Proxies.RealProxy
{
public MyProxy(Type t, MarshalByRefObject target)
: base(t)
{
this.target = target;
}
public override IMessage Invoke(IMessage msg)
{
MethodBase method = (msg as IMethodMessage).MethodBase;
object[] atts = method.GetCustomAttributes(typeof(RedisBillLockAttribute), false);
bool locking = atts.Length == 1;
IMessage result = null;
if (locking)
{
RedisBillLockAttribute redisBillLockAttribute = atts as RedisBillLockAttribute;
BookSleeve.RedisConnection client = new BookSleeve.RedisConnection(redisBillLockAttribute.Scheme);
client.Open();
try
{
RedisBillLockHandler lockHandler = new RedisBillLockHandler(client);
lockHandler.Lock(redisBillLockAttribute.Key);
try
{
result = RemotingServices.ExecuteMessage(target, msg as IMethodCallMessage);
}
finally
{
lockHandler.UnLock(redisBillLockAttribute.Key);
}
}
finally
{
client.Close(false);
}
}
else
{
result = RemotingServices.ExecuteMessage(target, msg as IMethodCallMessage);
}
return result;
}
private MarshalByRefObject target;
}
}
}
  
  原先的业务逻辑类



class TestLock
{
public void Run()
{
Console.WriteLine("{0:yyyyMMddHHmmssfff}获取了锁", DateTime.Now);
Thread.Sleep(1000);
}
}
  修改后的



class TestLock : MarshalByRefObject
{

public void Run()
{
Console.WriteLine("{0:yyyyMMddHHmmssfff}获取了锁", DateTime.Now);
Thread.Sleep(1000);
}
}
  调用



TestLock testLock = RedisBillLockWrapper.Wrap(new TestLock());
testLock.Run();
  
  
页: [1]
查看完整版本: 基于redis的分布式锁