Redis is single-threaded. How can I exploit multiple CPUs / cores?
Simply start multiple instances of Redis on different ports on the same box and treat them as different servers! Since Redis is meant to be used as a distributed database anyway, scaling it means thinking in terms of multiple computational units. At some point a single box may not be enough anyway.
In general, key-value databases scale very well because different keys can live on different servers independently of each other.
Redis has client libraries, such as Redis-rb (the Ruby client), that handle multiple servers automatically using consistent hashing. We are going to implement consistent hashing in all the other major client libraries. If you use a different language you can implement it yourself, or otherwise simply hash the key before you SET or GET it in order to pick a server.
For example, imagine you have N Redis servers: server-0, server-1, ..., server-(N-1). You want to store the key "foo". Which server should hold "foo" so that keys are distributed evenly among the servers? Compute crc = CRC32("foo"), then servernum = crc % N (the remainder of dividing crc by N). This gives a number between 0 and N-1 for every key. Connect to that server and store the key. Do the same for GETs.
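The modulo scheme described above can be sketched in C# roughly as follows. This is only an illustration: the class and method names (ModuloPartitioner, ServerIndexFor) and the server names are made up for the example, and the CRC-32 routine is a plain bitwise implementation of the standard algorithm written out here so the sketch has no external dependencies.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of any Redis client library.
static class ModuloPartitioner
{
    // Standard CRC-32 (reflected polynomial 0xEDB88320), computed bit by bit.
    public static uint Crc32(byte[] data)
    {
        uint crc = 0xFFFFFFFFu;
        foreach (byte b in data)
        {
            crc ^= b;
            for (int bit = 0; bit < 8; bit++)
            {
                crc = (crc & 1) != 0 ? (crc >> 1) ^ 0xEDB88320u : crc >> 1;
            }
        }
        return ~crc;
    }

    // Maps a key to a server index in the range 0 .. serverCount - 1.
    public static int ServerIndexFor(string key, int serverCount)
    {
        if (serverCount <= 0) throw new ArgumentOutOfRangeException(nameof(serverCount));
        uint crc = Crc32(Encoding.UTF8.GetBytes(key));
        return (int)(crc % (uint)serverCount);
    }
}

static class ModuloDemo
{
    static void Main()
    {
        // Assumed server names, following the server-0 .. server-(N-1) convention.
        string[] servers = { "server-0", "server-1", "server-2", "server-3" };
        foreach (string key in new[] { "foo", "bar", "user:42" })
        {
            int index = ModuloPartitioner.ServerIndexFor(key, servers.Length);
            Console.WriteLine($"{key} -> {servers[index]}");
        }
    }
}
```

The same ServerIndexFor call has to be used for both writes and reads, otherwise a key stored on one server would be looked up on another.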
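This is a basic way of partitioning keys. Consistent hashing is much better, because with the modulo scheme changing the number of servers sends most keys to a different server, whereas consistent hashing moves only a small fraction of them. This is why, once Redis 1.0 is released, we'll try to implement it in every widely used client library, starting with Python and PHP (Ruby already supports it).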
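To make the difference concrete, here is a minimal hash ring sketch in C#. It is not how Redis-rb or any other client implements consistent hashing. The HashRing class, the use of MD5 to place points on the ring, and the default of 100 virtual points per server are all assumptions chosen for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical minimal hash ring: each server is placed at several points
// ("virtual nodes"); a key belongs to the first point at or after its own hash.
sealed class HashRing
{
    private readonly SortedDictionary<uint, string> ring = new SortedDictionary<uint, string>();
    private readonly int replicas;
    private uint[] sortedPoints = new uint[0];

    public HashRing(IEnumerable<string> servers, int replicas = 100)
    {
        this.replicas = replicas;
        foreach (string server in servers) Add(server);
    }

    // First four bytes of the MD5 digest, used only to spread points over the ring.
    private static uint Hash(string text)
    {
        using (MD5 md5 = MD5.Create())
        {
            return BitConverter.ToUInt32(md5.ComputeHash(Encoding.UTF8.GetBytes(text)), 0);
        }
    }

    public void Add(string server)
    {
        for (int i = 0; i < replicas; i++)
        {
            uint point = Hash(server + "#" + i);
            if (!ring.ContainsKey(point)) ring[point] = server; // skip rare collisions
        }
        Rebuild();
    }

    public void Remove(string server)
    {
        for (int i = 0; i < replicas; i++)
        {
            uint point = Hash(server + "#" + i);
            if (ring.TryGetValue(point, out string owner) && owner == server) ring.Remove(point);
        }
        Rebuild();
    }

    private void Rebuild()
    {
        sortedPoints = new uint[ring.Count];
        ring.Keys.CopyTo(sortedPoints, 0); // keys come out in ascending order
    }

    public string ServerFor(string key)
    {
        if (sortedPoints.Length == 0) throw new InvalidOperationException("No servers in the ring.");
        int pos = Array.BinarySearch(sortedPoints, Hash(key));
        if (pos < 0) pos = ~pos;                  // first point greater than the key's hash
        if (pos == sortedPoints.Length) pos = 0;  // wrap around to the start of the ring
        return ring[sortedPoints[pos]];
    }
}

static class RingDemo
{
    static void Main()
    {
        var ring = new HashRing(new[] { "server-0", "server-1", "server-2", "server-3" });
        var before = new Dictionary<string, string>();
        for (int i = 0; i < 10000; i++) before["key:" + i] = ring.ServerFor("key:" + i);

        ring.Add("server-4");
        int moved = 0;
        foreach (var pair in before)
        {
            if (ring.ServerFor(pair.Key) != pair.Value) moved++;
        }
        Console.WriteLine($"{moved} of {before.Count} keys moved after adding a server");
    }
}
```

When a server is added, the only keys that change owner are the ones that now fall on the new server's points. With servernum = crc % N, going from 4 to 5 servers would instead reassign most keys.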
Design of the test code.
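// Note: the generic type arguments were lost in the source listing.
// IQueue and string below are placeholders, not the original types.
static void MultiThreadEnqueue(List<IQueue> queues, List<string> messages, string queueType)
{
    // One task per queue; each task enqueues the full message list into its queue.
    var queueTaskDic = new Dictionary<IQueue, Task>();
    var sw = Stopwatch.StartNew();
    foreach (var q in queues)
    {
        queueTaskDic[q] = new Task(Enqueue, new QueueTask { Queue = q, Messages = messages });
        queueTaskDic[q].Start();
    }
    Task.WaitAll(queueTaskDic.Values.ToArray());
    sw.Stop();
    // Prints: queue type, total messages, elapsed ms, queue count, total messages, messages per second.
    int total = messages.Count * queues.Count;
    Console.WriteLine("{2}, Enqueue {0} messages cost {1} ms, {4}, {0}, {3:0.0000}", total, sw.ElapsedMilliseconds, queueType, total * 1000.0 / sw.ElapsedMilliseconds, queues.Count);
}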
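// Note: the generic type arguments were lost in the source listing.
// IQueue below is a placeholder, not the original type.
static void MultiThreadDequeue(List<IQueue> queues, int msgCount, string queueType)
{
    // One task per queue; each task dequeues msgCount messages from its queue.
    var queueTaskDic = new Dictionary<IQueue, Task>();
    var sw = Stopwatch.StartNew();
    foreach (var q in queues)
    {
        queueTaskDic[q] = new Task(Dequeue, new QueueTask { Queue = q, DequeueCount = msgCount });
        queueTaskDic[q].Start();
    }
    // The source listing breaks off here; the rest of the method is not shown.
}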
AQAccessor.cs
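// Note: the generic type arguments were lost in the source listing.
// IQueue below is a placeholder, not the original type.

// Shared table: every persistent queue is created with the same argument (1).
public static List<IQueue> CreateSharedTablePersistentQueues(int queueCount)
{
    var ret = new List<IQueue>();
    for (int i = 0; i < queueCount; i++)
    {
        ret.Add(CreatePersistentQueue(1));
    }
    return ret;
}

// Shared table: every buffered queue is created with the same argument (1).
public static List<IQueue> CreateSharedTableBufferedQueues(int queueCount)
{
    var ret = new List<IQueue>();
    for (int i = 0; i < queueCount; i++)
    {
        ret.Add(CreateBufferedQueue(1));
    }
    return ret;
}

// Dedicated tables: buffered queues cycle through arguments 1 to 8.
public static List<IQueue> CreateDedicatedTableBufferedQueues(int queueCount)
{
    var ret = new List<IQueue>();
    for (int i = 0; i < queueCount; i++)
    {
        ret.Add(CreateBufferedQueue(i % 8 + 1));
    }
    return ret;
}

// Dedicated tables: persistent queues cycle through arguments 1 to 8.
public static List<IQueue> CreateDedicatedTablePersistentQueues(int queueCount)
{
    var ret = new List<IQueue>();
    for (int i = 0; i < queueCount; i++)
    {
        ret.Add(CreatePersistentQueue(i % 8 + 1));
    }
    return ret;
}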
RedisAccessor.cs
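// Note: the generic type arguments were lost in the source listing.
// IQueue below is a placeholder, not the original type.

// Shared server: every queue is created with the same argument (1).
public static List<IQueue> CreateSharedServerQueues(int queueCount)
{
    var ret = new List<IQueue>();
    for (int i = 0; i < queueCount; i++)
    {
        ret.Add(CreateSharedServerQueue(1));
    }
    return ret;
}

// Dedicated servers: queues cycle through arguments 1 to 8.
public static List<IQueue> CreateDedicatedServerQueues(int queueCount)
{
    var ret = new List<IQueue>();
    for (int i = 0; i < queueCount; i++)
    {
        ret.Add(CreateDedicatedServerQueue(i % 8 + 1));
    }
    return ret;
}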