设为首页 收藏本站
查看: 1064|回复: 0

[经验分享] redis数据库之VM(虚拟内存)

[复制链接]

尚未签到

发表于 2015-11-12 12:31:19 | 显示全部楼层 |阅读模式
  reids数据库是一种内存数据库,也提供了两种持久化的方式。作为内存数据库, 访问数据的速度肯定是杠杠的。但是随着数据的不断增加,消耗的内存也就越来越多直到内存消耗完。这种问题要么增加内存,要么就是将内存中的很少用到的数据替换到硬盘中。redis采用的就是第二种方法,也就是redis中的虚拟内存实现的功能。redis采用了阻塞式更替和非阻塞式更替,这个等会再讲。先来看下虚拟内存的初始化:
  void vmInit(void) {
......
if (server.vm_max_threads != 0)
zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
server.vm_fp = fopen(server.vm_swap_file,"w+b");
}
......
server.vm_next_page = 0;
server.vm_near_pages = 0;
server.vm_stats_used_pages = 0;
server.vm_stats_swapped_objects = 0;
server.vm_stats_swapouts = 0;
server.vm_stats_swapins = 0;
totsize = server.vm_pages*server.vm_page_size;
......
server.io_newjobs = listCreate();
server.io_processing = listCreate();
server.io_processed = listCreate();
server.io_ready_clients = listCreate();
......
if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
vmThreadedIOCompletedJob, NULL) == AE_ERR)
oom("creating file event");
}既然是初始化,肯定是给虚拟内存的属性值进行赋值。
  vm_max_threads就是虚拟化最大的线程数,这个也是在非阻塞更替时用到的。既然是多线程,肯定就需要线程间的同步。
  io_newjobs:redis在把数据置换出内存还是从硬盘中load到内存都是新建一个job,然后插入到io_newjobs中。而下面的io_processing,io_processed也是相同的概念。
  io_ready_clients:客户端因为操作某个key,导致客户端阻塞,当这个key被改变就会把阻塞的客户端插入到io_ready_clients队列中。
  最后创建一个可读事件,这个事件很重要,后面会讲到。
  redis会有两种方式将内存数据置换到虚拟内存或者把虚拟内存中的数据load到内存中。第一种:采用定时器方式实现,前面也讲过redis的定时器都是在serverCron中调用。第二种就是rdb文件load和aof文件load的时候会调用。主要不同是rdb,aof文件load采用的都是阻塞式更替。而serverCron就需要看配置文件中的配置来决定。下面来看着这三个地方的代码:
  aofLoad:
force_swapout = 0;
if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
force_swapout = 1;
if (server.vm_enabled && force_swapout) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
}
rdbLoad:
if (!swap_all_values && server.vm_enabled && force_swapout) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
if (zmalloc_used_memory() > server.vm_max_memory)
swap_all_values = 1; /* We are already using too much mem */
}
serverCron:
if (vmCanSwapOut()) {
while (server.vm_enabled && zmalloc_used_memory() >
server.vm_max_memory)
{
int retval = (server.vm_max_threads == 0) ?
vmSwapOneObjectBlocking() :
vmSwapOneObjectThreaded();
if (retval == REDIS_ERR && !(loops % 300) &&
zmalloc_used_memory() >
(server.vm_max_memory+server.vm_max_memory/10))
{
redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
}
/* Note that when using threade I/O we free just one object,
* because anyway when the I/O thread in charge to swap this
* object out will finish, the handler of completed jobs
* will try to swap more objects if we are still out of memory. */
if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
}
}对比这三处代码,虚拟内存需要在配置文件中启用,也就是vm_enabled对应的值,并且已经占用的内存空间大于配置文件中配置的虚拟内存的最大内存(vm_max_memory),只用这两个条件都满足的情况下才会进行虚拟化。而无论是阻塞式还是非阻塞式都是会调用vmSwapOneObject这个函数:
  int vmSwapOneObject(int usethreads) {
.......
if (usethreads) {
robj *keyobj = createStringObject(key,sdslen(key));
vmSwapObjectThreaded(keyobj,val,best_db);
decrRefCount(keyobj);
return REDIS_OK;
} else {
vmpointer *vp;
if ((vp = vmSwapObjectBlocking(val)) != NULL) {
dictGetEntryVal(best) = vp;
return REDIS_OK;
} else {
return REDIS_ERR;
}
}
}redis会从数据库中选择一个比较合适的key置换出来,而且在选取最合适的key时,是每个db只重复5次。阻塞式更换会调用vmSwapObjectBlocking,而非阻塞调用vmSwapObjectThreaded
  vmpointer *vmSwapObjectBlocking(robj *val) {
.......
off_t pages = rdbSavedObjectPages(val);
.......
if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL;
if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL;
.......
}首先会计算出这个value需要占用多少页内存,然后找到这个value需要放在虚拟内存中的哪个位置。最后写入到虚拟内存中。
  int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
iojob *j;
j = zmalloc(sizeof(*j));
j->type = REDIS_IOJOB_PREPARE_SWAP;
j->db = db;
j->key = key;
incrRefCount(key);
j->id = j->val = val;
incrRefCount(val);
j->canceled = 0;
j->thread = (pthread_t) -1;
val->storage = REDIS_VM_SWAPPING;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
return REDIS_OK;
}非阻塞主要是创建一个job,然后加入到队列中。从下面的代码可以看到也就是io_newjobs队列中。
  void queueIOJob(iojob *j) {
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
(void*)j, j->type, (char*)j->key->ptr);
listAddNodeTail(server.io_newjobs,j);
if (server.io_active_threads < server.vm_max_threads)
spawnIOThread();
}如果当前的线程少于vm_max_threads就会新建一个线程去进行操作。这个线程首先会屏蔽一些信号,然后才会正式进入线程操作:
  void *IOThreadEntryPoint(void *arg) {
......
while(1) {
/* Get a new job to process */
lockThreadedIO();
if (listLength(server.io_newjobs) == 0) {
/* No new jobs in queue, exit. */
redisLog(REDIS_DEBUG,&quot;Thread %ld exiting, nothing to do&quot;,
(long) pthread_self());
server.io_active_threads--;
unlockThreadedIO();
return NULL;
}
ln = listFirst(server.io_newjobs);
j = ln->value;
listDelNode(server.io_newjobs,ln);
/* Add the job in the processing queue */
j->thread = pthread_self();
listAddNodeTail(server.io_processing,j);
ln = listLast(server.io_processing); /* We use ln later to remove it */
unlockThreadedIO();
redisLog(REDIS_DEBUG,&quot;Thread %ld got a new job (type %d): %p about key '%s'&quot;,
(long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
/* Process the Job */
if (j->type == REDIS_IOJOB_LOAD) {
vmpointer *vp = (vmpointer*)j->id;
j->val = vmReadObjectFromSwap(j->page,vp->vtype);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
j->pages = rdbSavedObjectPages(j->val);
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
j->canceled = 1;
}
/* Done: insert the job into the processed queue */
redisLog(REDIS_DEBUG,&quot;Thread %ld completed the job: %p (key %s)&quot;,
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
lockThreadedIO();
listDelNode(server.io_processing,ln);
listAddNodeTail(server.io_processed,j);
unlockThreadedIO();
/* Signal the main thread there is new stuff to process */
redisAssert(write(server.io_ready_pipe_write,&quot;x&quot;,1) == 1);
}
......
}这个函数才是真正处理内存与虚拟内存的地方,主要是从io_newjobs队列中抽取一个job插入到io_processing中,再判断job的类型进行处理,逻辑很简单。大家应该看到最后一行也就是向管道写入一个字节的数据,而这行代码也就对应了vmInti中创建写事件,这个时候写事件就会被激活。
  void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask)
{
......
if (j->type == REDIS_IOJOB_LOAD) {
handleClientsBlockedOnSwappedKey(db,j->key);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
if (!vmCanSwapOut() ||
vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
{
/* Ooops... no space or we can't swap as there is
* a fork()ed Redis trying to save stuff on disk. */
j->val->storage = REDIS_VM_MEMORY; /* undo operation */
freeIOJob(j);
} else {
/* Note that we need to mark this pages as used now,
* if the job will be canceled, we'll mark them as freed
* again. */
vmMarkPagesUsed(j->page,j->pages);
j->type = REDIS_IOJOB_DO_SWAP;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
}
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
......
if (trytoswap && vmCanSwapOut() &&
zmalloc_used_memory() > server.vm_max_memory)
{
int more = 1;
while(more) {
lockThreadedIO();
more = listLength(server.io_newjobs) <
(unsigned) server.vm_max_threads;
unlockThreadedIO();
/* Don't waste CPU time if swappable objects are rare. */
if (vmSwapOneObjectThreaded() == REDIS_ERR) {
trytoswap = 0;
break;
}
}
}
}REDIS_IOJOB_PREPARE_SWAP,REDIS_IOJOB_DO_SWAP还是一样的处理,这里就不讲了。主要来看下REDIS_IOJOB_LOAD。这种类型的job主要是为了唤醒阻塞在某个键的客户端。
  void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
......
while (len--) {
ln = listFirst(l);
redisClient *c = ln->value;
if (dontWaitForSwappedKey(c,key)) {
/* Put the client in the list of clients ready to go as we
* loaded all the keys about it. */
listAddNodeTail(server.io_ready_clients,c);
}
}
}主要是将阻塞在某个key的客户端加入到io_ready_clients这个队列中。而这个io_ready_clients会在beforeSleep这个函数中去操作。






  








  




  



版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-138321-1-1.html 上篇帖子: redis数据结构之对象 下篇帖子: spring redis整合
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表