|
reids数据库是一种内存数据库,也提供了两种持久化的方式。作为内存数据库, 访问数据的速度肯定是杠杠的。但是随着数据的不断增加,消耗的内存也就越来越多直到内存消耗完。这种问题要么增加内存,要么就是将内存中的很少用到的数据替换到硬盘中。redis采用的就是第二种方法,也就是redis中的虚拟内存实现的功能。redis采用了阻塞式更替和非阻塞式更替,这个等会再讲。先来看下虚拟内存的初始化:
void vmInit(void) {
......
if (server.vm_max_threads != 0)
zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
server.vm_fp = fopen(server.vm_swap_file,"w+b");
}
......
server.vm_next_page = 0;
server.vm_near_pages = 0;
server.vm_stats_used_pages = 0;
server.vm_stats_swapped_objects = 0;
server.vm_stats_swapouts = 0;
server.vm_stats_swapins = 0;
totsize = server.vm_pages*server.vm_page_size;
......
server.io_newjobs = listCreate();
server.io_processing = listCreate();
server.io_processed = listCreate();
server.io_ready_clients = listCreate();
......
if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
vmThreadedIOCompletedJob, NULL) == AE_ERR)
oom("creating file event");
}既然是初始化,肯定是给虚拟内存的属性值进行赋值。
vm_max_threads就是虚拟化最大的线程数,这个也是在非阻塞更替时用到的。既然是多线程,肯定就需要线程间的同步。
io_newjobs:redis在把数据置换出内存还是从硬盘中load到内存都是新建一个job,然后插入到io_newjobs中。而下面的io_processing,io_processed也是相同的概念。
io_ready_clients:客户端因为操作某个key,导致客户端阻塞,当这个key被改变就会把阻塞的客户端插入到io_ready_clients队列中。
最后创建一个可读事件,这个事件很重要,后面会讲到。
redis会有两种方式将内存数据置换到虚拟内存或者把虚拟内存中的数据load到内存中。第一种:采用定时器方式实现,前面也讲过redis的定时器都是在serverCron中调用。第二种就是rdb文件load和aof文件load的时候会调用。主要不同是rdb,aof文件load采用的都是阻塞式更替。而serverCron就需要看配置文件中的配置来决定。下面来看着这三个地方的代码:
aofLoad:
force_swapout = 0;
if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
force_swapout = 1;
if (server.vm_enabled && force_swapout) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
}
rdbLoad:
if (!swap_all_values && server.vm_enabled && force_swapout) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
if (zmalloc_used_memory() > server.vm_max_memory)
swap_all_values = 1; /* We are already using too much mem */
}
serverCron:
if (vmCanSwapOut()) {
while (server.vm_enabled && zmalloc_used_memory() >
server.vm_max_memory)
{
int retval = (server.vm_max_threads == 0) ?
vmSwapOneObjectBlocking() :
vmSwapOneObjectThreaded();
if (retval == REDIS_ERR && !(loops % 300) &&
zmalloc_used_memory() >
(server.vm_max_memory+server.vm_max_memory/10))
{
redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
}
/* Note that when using threade I/O we free just one object,
* because anyway when the I/O thread in charge to swap this
* object out will finish, the handler of completed jobs
* will try to swap more objects if we are still out of memory. */
if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
}
}对比这三处代码,虚拟内存需要在配置文件中启用,也就是vm_enabled对应的值,并且已经占用的内存空间大于配置文件中配置的虚拟内存的最大内存(vm_max_memory),只用这两个条件都满足的情况下才会进行虚拟化。而无论是阻塞式还是非阻塞式都是会调用vmSwapOneObject这个函数:
int vmSwapOneObject(int usethreads) {
.......
if (usethreads) {
robj *keyobj = createStringObject(key,sdslen(key));
vmSwapObjectThreaded(keyobj,val,best_db);
decrRefCount(keyobj);
return REDIS_OK;
} else {
vmpointer *vp;
if ((vp = vmSwapObjectBlocking(val)) != NULL) {
dictGetEntryVal(best) = vp;
return REDIS_OK;
} else {
return REDIS_ERR;
}
}
}redis会从数据库中选择一个比较合适的key置换出来,而且在选取最合适的key时,是每个db只重复5次。阻塞式更换会调用vmSwapObjectBlocking,而非阻塞调用vmSwapObjectThreaded
vmpointer *vmSwapObjectBlocking(robj *val) {
.......
off_t pages = rdbSavedObjectPages(val);
.......
if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL;
if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL;
.......
}首先会计算出这个value需要占用多少页内存,然后找到这个value需要放在虚拟内存中的哪个位置。最后写入到虚拟内存中。
int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
iojob *j;
j = zmalloc(sizeof(*j));
j->type = REDIS_IOJOB_PREPARE_SWAP;
j->db = db;
j->key = key;
incrRefCount(key);
j->id = j->val = val;
incrRefCount(val);
j->canceled = 0;
j->thread = (pthread_t) -1;
val->storage = REDIS_VM_SWAPPING;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
return REDIS_OK;
}非阻塞主要是创建一个job,然后加入到队列中。从下面的代码可以看到也就是io_newjobs队列中。
void queueIOJob(iojob *j) {
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
(void*)j, j->type, (char*)j->key->ptr);
listAddNodeTail(server.io_newjobs,j);
if (server.io_active_threads < server.vm_max_threads)
spawnIOThread();
}如果当前的线程少于vm_max_threads就会新建一个线程去进行操作。这个线程首先会屏蔽一些信号,然后才会正式进入线程操作:
void *IOThreadEntryPoint(void *arg) {
......
while(1) {
/* Get a new job to process */
lockThreadedIO();
if (listLength(server.io_newjobs) == 0) {
/* No new jobs in queue, exit. */
redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
(long) pthread_self());
server.io_active_threads--;
unlockThreadedIO();
return NULL;
}
ln = listFirst(server.io_newjobs);
j = ln->value;
listDelNode(server.io_newjobs,ln);
/* Add the job in the processing queue */
j->thread = pthread_self();
listAddNodeTail(server.io_processing,j);
ln = listLast(server.io_processing); /* We use ln later to remove it */
unlockThreadedIO();
redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
(long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
/* Process the Job */
if (j->type == REDIS_IOJOB_LOAD) {
vmpointer *vp = (vmpointer*)j->id;
j->val = vmReadObjectFromSwap(j->page,vp->vtype);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
j->pages = rdbSavedObjectPages(j->val);
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
j->canceled = 1;
}
/* Done: insert the job into the processed queue */
redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
lockThreadedIO();
listDelNode(server.io_processing,ln);
listAddNodeTail(server.io_processed,j);
unlockThreadedIO();
/* Signal the main thread there is new stuff to process */
redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
}
......
}这个函数才是真正处理内存与虚拟内存的地方,主要是从io_newjobs队列中抽取一个job插入到io_processing中,再判断job的类型进行处理,逻辑很简单。大家应该看到最后一行也就是向管道写入一个字节的数据,而这行代码也就对应了vmInti中创建写事件,这个时候写事件就会被激活。
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask)
{
......
if (j->type == REDIS_IOJOB_LOAD) {
handleClientsBlockedOnSwappedKey(db,j->key);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
if (!vmCanSwapOut() ||
vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
{
/* Ooops... no space or we can't swap as there is
* a fork()ed Redis trying to save stuff on disk. */
j->val->storage = REDIS_VM_MEMORY; /* undo operation */
freeIOJob(j);
} else {
/* Note that we need to mark this pages as used now,
* if the job will be canceled, we'll mark them as freed
* again. */
vmMarkPagesUsed(j->page,j->pages);
j->type = REDIS_IOJOB_DO_SWAP;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
}
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
......
if (trytoswap && vmCanSwapOut() &&
zmalloc_used_memory() > server.vm_max_memory)
{
int more = 1;
while(more) {
lockThreadedIO();
more = listLength(server.io_newjobs) <
(unsigned) server.vm_max_threads;
unlockThreadedIO();
/* Don't waste CPU time if swappable objects are rare. */
if (vmSwapOneObjectThreaded() == REDIS_ERR) {
trytoswap = 0;
break;
}
}
}
}REDIS_IOJOB_PREPARE_SWAP,REDIS_IOJOB_DO_SWAP还是一样的处理,这里就不讲了。主要来看下REDIS_IOJOB_LOAD。这种类型的job主要是为了唤醒阻塞在某个键的客户端。
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
......
while (len--) {
ln = listFirst(l);
redisClient *c = ln->value;
if (dontWaitForSwappedKey(c,key)) {
/* Put the client in the list of clients ready to go as we
* loaded all the keys about it. */
listAddNodeTail(server.io_ready_clients,c);
}
}
}主要是将阻塞在某个key的客户端加入到io_ready_clients这个队列中。而这个io_ready_clients会在beforeSleep这个函数中去操作。
版权声明:本文为博主原创文章,未经博主允许不得转载。 |
|