设为首页 收藏本站
查看: 791|回复: 0

[经验分享] memcached 源代码阅读笔记(4)- SET 操作分析

[复制链接]

尚未签到

发表于 2015-9-1 11:29:22 | 显示全部楼层 |阅读模式
  memcached 源代码阅读笔记- SET 操作分析
输入
set yzn 32 0 5
hell1
  进入static void process_command(conn *c, char *command) 函数
进入如下分支
if (ntokens == 6 &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
  process_update_command(c, tokens, ntokens, comm, false);
  }
可以看到调用 process_update_command函数
  /*
   * Handles the header line of a storage command
   * ("<cmd> <key> <flags> <exptime> <bytes> [<cas>]").
   *
   * On success an item sized for the value plus its trailing "\r\n" is
   * allocated, attached to the connection, and the connection is switched to
   * conn_nread so the data block is read straight into the item.
   *
   *   c          - connection the command arrived on; must not be NULL
   *   tokens     - tokenized command line: tokens[KEY_TOKEN] is the key,
   *                tokens[2], tokens[3], tokens[4] are flags, exptime, length
   *   ntokens    - number of tokens (not used in this function)
   *   comm       - NREAD_* code, stored in c->item_comm for the completion step
   *   handle_cas - when true, tokens[5] is parsed as the requested CAS id
   *
   * All failures are reported to the client through out_string().
   */
  static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    char *key;
    size_t nkey;
    int flags;
    time_t exptime;
    int vlen;
    uint64_t req_cas_id = 0;
    item *it;

    assert(c != NULL);

    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    /* The strto*() family only writes errno on failure; clear it first so a
     * stale value left by an earlier call cannot reject a valid command. */
    errno = 0;
    flags = strtoul(tokens[2].value, NULL, 10);
    exptime = strtol(tokens[3].value, NULL, 10);
    vlen = strtol(tokens[4].value, NULL, 10);

    /* does cas value exist? */
    if (handle_cas) {
        req_cas_id = strtoull(tokens[5].value, NULL, 10);
    }

    /* A negative length is never valid and must not reach item_alloc() or
     * c->sbytes below. */
    if (errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)
        || vlen < 0) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key);
    }

    if (settings.managed) {
        int bucket = c->bucket;
        if (bucket == -1) {
            out_string(c, "CLIENT_ERROR no BG data in managed mode");
            return;
        }
        c->bucket = -1;
        if (buckets[bucket] != c->gen) {
            out_string(c, "ERROR_NOT_OWNER");
            return;
        }
    }

    /* +2 leaves room for the "\r\n" that terminates the data block */
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen + 2);

    if (it == 0) {
        if (! item_size_ok(nkey, flags, vlen + 2))
            out_string(c, "SERVER_ERROR object too large for cache");
        else
            out_string(c, "SERVER_ERROR out of memory");
        /* swallow the data line */
        c->write_and_go = conn_swallow;
        c->sbytes = vlen + 2;
        return;
    }
    if (handle_cas)
        it->cas_id = req_cas_id;

    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = it->nbytes;
    c->item_comm = comm;
    conn_set_state(c, conn_nread);
  }
可以看到命令格式就是
cmdname key flags exptime vlen
    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;
  flags = strtoul(tokens[2].value, NULL, 10);
    exptime = strtol(tokens[3].value, NULL, 10);
    vlen = strtol(tokens[4].value, NULL, 10);
  
it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
为item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
    uint8_t nsuffix;
    item *it;
    char suffix[40];
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
  unsigned int id = slabs_clsid(ntotal);
    if (id == 0)
        return 0;
  it = slabs_alloc(ntotal);
    if (it == 0) {
        int tries = 50;
        item *search;
  /* If requested to not push old items out of cache when memory runs out,
         * we're out of luck at this point...
         */
  if (settings.evict_to_free == 0) return NULL;
  /*
         * try to get one off the right LRU
         * don't necessariuly unlink the tail because it may be locked: refcount>0
         * search up from tail an item with refcount==0 and unlink it; give up after 50
         * tries
         */
  if (id > LARGEST_ID) return NULL;
        if (tails[id] == 0) return NULL;
  for (search = tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
            if (search->refcount == 0) {
               if (search->exptime == 0 || search->exptime > current_time) {
                       STATS_LOCK();
                       stats.evictions++;
                       STATS_UNLOCK();
                }
                do_item_unlink(search);
                break;
            }
        }
        it = slabs_alloc(ntotal);
        if (it == 0) return NULL;
    }
  assert(it->slabs_clsid == 0);
  it->slabs_clsid = id;
  assert(it != heads[it->slabs_clsid]);
  it->next = it->prev = it->h_next = 0;
    it->refcount = 1;     /* the caller will have a reference */
    DEBUG_REFCNT(it, '*');
    it->it_flags = 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    strcpy(ITEM_key(it), key);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;
    return it;
}
/*
 * Renders the " <flags> <length>\r\n" suffix for an item and computes the
 * total number of bytes the item will occupy.
 *
 *   nkey    - bytes reserved for the key
 *   flags   - client flags to print into the suffix
 *   nbytes  - data size including the trailing "\r\n" (printed minus 2)
 *   suffix  - output buffer; at most 40 bytes are written
 *   nsuffix - receives the suffix length reported by snprintf()
 *
 * Returns sizeof(item) + key bytes + suffix bytes + data bytes.
 */
static size_t item_make_header(const uint8_t nkey, const int flags, const int nbytes,
                     char *suffix, uint8_t *nsuffix) {
    /* the suffix buffer is 40 chars wherever it is declared */
    const int printed = snprintf(suffix, 40, " %d %d\r\n", flags, nbytes - 2);

    *nsuffix = (uint8_t) printed;
    return sizeof(item) + nkey + *nsuffix + nbytes;
}
  选择一个合适的 slab class(slabs_clsid 按 item 的总大小返回能容纳它的最小 slab class 的编号,找不到则返回 0)
/*
 * Finds the smallest slab class whose chunk size can hold `size` bytes.
 *
 * Classes are scanned upward from POWER_SMALLEST. Returns the class id, or
 * 0 when size is 0 or the object does not fit even in class power_largest.
 */
unsigned int slabs_clsid(const size_t size) {
    int cls;

    if (size == 0)
        return 0;

    for (cls = POWER_SMALLEST; size > slabclass[cls].size; cls++) {
        if (cls == power_largest) {
            /* won't fit in the biggest slab */
            return 0;
        }
    }
    return cls;
}
  it = slabs_alloc(ntotal);
void *do_slabs_alloc(const size_t size) {
    slabclass_t *p;
  unsigned int id = slabs_clsid(size);
    if (id < POWER_SMALLEST || id > power_largest)
        return NULL;
  p = &slabclass[id];
    assert(p->sl_curr == 0 || ((item *)p->slots[p->sl_curr - 1])->slabs_clsid == 0);
  #ifdef USE_SYSTEM_MALLOC
    if (mem_limit && mem_malloced + size > mem_limit)
        return 0;
    mem_malloced += size;
    return malloc(size);
#endif
  /* fail unless we have space at the end of a recently allocated page,
       we have something on our freelist, or we could allocate a new page */
    if (! (p->end_page_ptr != 0 || p->sl_curr != 0 || do_slabs_newslab(id) != 0))
        return 0;
  /* return off our freelist, if we have one */
    if (p->sl_curr != 0)
        return p->slots[--p->sl_curr];
  /* if we recently allocated a whole page, return from that */
    if (p->end_page_ptr) {
        void *ptr = p->end_page_ptr;
        if (--p->end_page_free != 0) {
            (char*)(p->end_page_ptr) += p->size;
        } else {
            p->end_page_ptr = 0;
        }
        return ptr;
    }
  return NULL;  /* shouldn't ever get here */
}
  
/* Table of slab classes, indexed by class id (ids are produced by slabs_clsid()). */
static slabclass_t slabclass[POWER_LARGEST + 1];
/* First class id that slabs_clsid() considers; id 0 is its "no class" result. */
#define POWER_SMALLEST 1
/* Sizes the slabclass[] table (POWER_LARGEST + 1 entries). */
#define POWER_LARGEST  200
/* 1048576 = 1024 * 1024 bytes; used as the page length by do_slabs_newslab()
 * when ALLOW_SLABS_REASSIGN is defined. */
#define POWER_BLOCK 1048576
/* One pointer width. NOTE(review): presumably the chunk alignment; its use is
 * not shown in this excerpt. */
#define CHUNK_ALIGN_BYTES (sizeof(void *))
  /*
   * Bookkeeping for one slab class: every chunk handed out by the class has
   * the same size. Chunks come either from the free-slot array or from the
   * unused tail of the most recently allocated page.
   */
  typedef struct {
    unsigned int size;      /* sizes of items */
    unsigned int perslab;   /* how many items per slab */

    void **slots;           /* list of item ptrs */
    unsigned int sl_total;  /* size of previous array */
    unsigned int sl_curr;   /* first free slot */

    void *end_page_ptr;         /* pointer to next free item at end of page, or 0 */
    unsigned int end_page_free; /* number of items remaining at end of last alloced page */

    unsigned int slabs;     /* how many slabs were allocated for this class */

    void **slab_list;       /* array of slab pointers */
    unsigned int list_size; /* size of prev array */

    unsigned int killing;  /* index+1 of dying slab, or zero if none */
  } slabclass_t;
  跟踪代码可知,此时 p(即 &slabclass[id])的内容如下:
-  p 0x0046ebac {size=88 perslab=11915 slots=0x02231e70 ...} slabclass_t *
  size 88 unsigned int
  perslab 11915 unsigned int
  slots 0x02231e70 void * *
  sl_total 16 unsigned int
  sl_curr 1 unsigned int
  end_page_ptr 0x022f00d0 void *
  end_page_free 11913 unsigned int
  slabs 1 unsigned int
  slab_list 0x02231e18 void * *
  list_size 16 unsigned int
  killing 0 unsigned int
  /*
   * Allocates one new zero-filled page for slab class `id` and makes it the
   * class's current "end page".
   *
   * The page is POWER_BLOCK bytes when ALLOW_SLABS_REASSIGN is defined,
   * otherwise size * perslab bytes. Returns 1 on success and 0 when the
   * memory limit would be exceeded (and the class already owns a slab), the
   * slab list could not grow, or malloc() failed.
   */
  static int do_slabs_newslab(const unsigned int id) {
    slabclass_t *cls = &slabclass[id];
#ifdef ALLOW_SLABS_REASSIGN
    int len = POWER_BLOCK;
#else
    int len = cls->size * cls->perslab;
#endif
    char *page;

    /* the limit is only enforced once the class has at least one slab */
    if (mem_limit && mem_malloced + len > mem_limit && cls->slabs > 0) {
        return 0;
    }

    if (grow_slab_list(id) == 0) {
        return 0;
    }

    page = malloc((size_t)len);
    if (page == 0) {
        return 0;
    }
    memset(page, 0, (size_t)len);

    cls->end_page_ptr = page;
    cls->end_page_free = cls->perslab;

    cls->slab_list[cls->slabs++] = page;
    mem_malloced += len;
    return 1;
  }
  /*
   * Makes sure slab class `id` has room in slab_list for one more slab
   * pointer. A full list is doubled (or started at 16 entries).
   *
   * Returns 1 when there is room, 0 when realloc() failed; on failure the
   * existing list is left untouched.
   */
  static int grow_slab_list (const unsigned int id) {
    slabclass_t *cls = &slabclass[id];
    size_t capacity;
    void *grown;

    if (cls->slabs != cls->list_size) {
        return 1;   /* still a free entry */
    }

    capacity = (cls->list_size == 0) ? 16 : cls->list_size * 2;
    grown = realloc(cls->slab_list, capacity * sizeof(void *));
    if (grown == 0) {
        return 0;
    }

    cls->list_size = capacity;
    cls->slab_list = grown;
    return 1;
  }
  至此完成一个item的内存分配
item的结构如下:
/*
 * Header of a cached item. The fixed fields below are followed in the same
 * allocation by variable-length data starting at end[]: the key, then the
 * " flags length\r\n" suffix, then the value (see the trailing comments and
 * the ITEM_key / ITEM_suffix / ITEM_data macros).
 */
typedef struct _stritem {
    struct _stritem *next;      /* next item in the per-class LRU list */
    struct _stritem *prev;      /* previous item in the per-class LRU list */
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;   /* number of references held; 1 right after allocation */
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    uint64_t        cas_id;     /* the CAS identifier */
    void * end[];               /* start of the variable-length part */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
  strcpy(ITEM_key(it), key);
#define ITEM_key(item) ((char*)&((item)->end[0]))
  memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1)
  
  
   c->item = it;
    c->ritem = ITEM_data(it);
#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix)
    c->rlbytes = it->nbytes;
  
/*
 * Runs once the whole data block of a storage command has been read into
 * c->item: validates the trailing "\r\n", stores the item and sends the
 * one-line reply to the client.
 *
 * Reply by store_item() result: 1 -> STORED, 2 -> EXISTS, 3 -> NOT_FOUND,
 * anything else -> NOT_STORED. The connection's reference on the item is
 * released in every case.
 */
static void complete_nread(conn *c) {
    item *pending;
    int cmd;

    assert(c != NULL);

    cmd = c->item_comm;
    pending = c->item;

    STATS_LOCK();
    stats.set_cmds++;
    STATS_UNLOCK();

    if (strncmp(ITEM_data(pending) + pending->nbytes - 2, "\r\n", 2) == 0) {
        switch (store_item(pending, cmd)) {
        case 1:
            out_string(c, "STORED");
            break;
        case 2:
            out_string(c, "EXISTS");
            break;
        case 3:
            out_string(c, "NOT_FOUND");
            break;
        default:
            out_string(c, "NOT_STORED");
            break;
        }
    } else {
        /* the data block was not terminated by CRLF */
        out_string(c, "CLIENT_ERROR bad data chunk");
    }

    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}
  ret = store_item(it, comm);
/*
 * Stores `it` according to the semantics of the storage command `comm`
 * (one of the NREAD_* codes).
 *
 * Returns:
 *   0 - not stored (add on an existing key, replace/append/prepend on a
 *       missing key, a delete-locked key for anything but set/cas, or out
 *       of memory while building an append/prepend result)
 *   1 - stored
 *   2 - cas: the item exists but its cas_id differs
 *   3 - cas: the item does not exist
 *
 * The reference on the previously stored item obtained from the lookup is
 * released on every path, including the out-of-memory return.
 */
int do_store_item(item *it, int comm) {
    char *key = ITEM_key(it);
    bool delete_locked = false;
    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
    int stored = 0;

    item *new_it = NULL;
    int flags;

    if (old_it != NULL && comm == NREAD_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
        do_item_update(old_it);
    } else if (!old_it && (comm == NREAD_REPLACE
        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
    {
        /* replace only replaces an existing value; don't store */
    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
    {
        /* replace and add can't override delete locks; don't store */
    } else if (comm == NREAD_CAS) {
        /* validate cas operation */
        if (delete_locked)
            old_it = do_item_get_nocheck(key, it->nkey);

        if (old_it == NULL) {
            /* LRU expired */
            stored = 3;
        } else if (it->cas_id == old_it->cas_id) {
            /* cas validates */
            do_item_replace(old_it, it);
            stored = 1;
        } else {
            stored = 2;
        }
    } else {
        /*
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         */

        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {

            /* we have it and old_it here - alloc memory to hold both */
            /* flags was already lost - so recover them from ITEM_suffix(it) */

            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);

            if (new_it == NULL) {
                /* SERVER_ERROR out of memory */
                /* drop the reference taken by the lookup before bailing out,
                 * otherwise old_it's refcount never returns to zero */
                if (old_it != NULL)
                    do_item_remove(old_it);
                return 0;
            }

            /* copy data from it and old_it to new_it */

            if (comm == NREAD_APPEND) {
                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
            } else {
                /* NREAD_PREPEND */
                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
            }

            it = new_it;
        }

        /* "set" commands can override the delete lock
           window... in which case we have to find the old hidden item
           that's in the namespace/LRU but wasn't returned by
           item_get.... because we need to replace it */
        if (delete_locked)
            old_it = do_item_get_nocheck(key, it->nkey);

        if (old_it != NULL)
            do_item_replace(old_it, it);
        else
            do_item_link(it);

        stored = 1;
    }

    if (old_it != NULL)
        do_item_remove(old_it);         /* release our reference */
    if (new_it != NULL)
        do_item_remove(new_it);

    return stored;
}
  
if (old_it != NULL)
            do_item_replace(old_it, it);
  /*
   * Replaces a stored item: unlinks `it`, then links `new_it` in its place.
   * `it` must not carry the ITEM_SLABBED flag. Returns whatever
   * do_item_link() returns for new_it.
   */
  int do_item_replace(item *it, item *new_it) {
    int link_result;

    assert((it->it_flags & ITEM_SLABBED) == 0);

    do_item_unlink(it);
    link_result = do_item_link(new_it);
    return link_result;
  }
  
/*
 * Publishes an item: marks it ITEM_LINKED, stamps its access time, inserts
 * it into the hash table, updates the global counters, assigns a fresh CAS
 * id and puts it at the head of its LRU list.
 *
 * The item must be neither linked nor slabbed and its data must be smaller
 * than 1MB. Always returns 1.
 */
int do_item_link(item *it) {
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024));  /* 1MB max size */

    it->time = current_time;
    it->it_flags |= ITEM_LINKED;
    assoc_insert(it);

    STATS_LOCK();
    stats.curr_bytes += ITEM_ntotal(it);
    stats.curr_items++;
    stats.total_items++;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    it->cas_id = get_cas_id();

    item_link_q(it);

    return 1;
}
  
/*
 * Inserts an item at the front of its hash chain. This is not an update:
 * the key must not already be present in the table.
 *
 * While an expansion is in progress, items whose old-table bucket has not
 * been migrated yet (bucket >= expand_bucket) go into the old table;
 * everything else goes into the primary table. After the insert, an
 * expansion is started once the item count exceeds 1.5x the bucket count.
 * Always returns 1.
 */
int assoc_insert(item *it) {
    uint32_t hv;
    unsigned int oldbucket;
    bool use_old_table = false;

    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

    hv = hash(ITEM_key(it), it->nkey, 0);

    if (expanding) {
        oldbucket = (hv & hashmask(hashpower - 1));
        use_old_table = (oldbucket >= expand_bucket);
    }

    if (use_old_table) {
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    hash_items++;
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_expand();
    }

    return 1;
}
  看到这个函数可以看出内部的hash表采用开链存储法。
  当hash_items > 桶的个数的1.5倍的时候,就扩张hash表。
  /*
   * Starts growing the hash table to the next power of two.
   *
   * The current table becomes old_hashtable and a zeroed table of twice the
   * size becomes primary; the first bucket is migrated right away. If the
   * new table cannot be allocated, the primary table is restored and the
   * server keeps running with the existing one.
   */
static void assoc_expand(void) {
    old_hashtable = primary_hashtable;
    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));

    if (!primary_hashtable) {
        /* Bad news, but we can keep running. */
        primary_hashtable = old_hashtable;
        return;
    }

    if (settings.verbose > 1) {
        fprintf(stderr, "Hash table expansion starting\n");
    }
    hashpower++;
    expanding = true;
    expand_bucket = 0;
    do_assoc_move_next_bucket();
}
  /* migrates the next bucket to the primary hashtable if we're expanding. */
void do_assoc_move_next_bucket(void) {
    item *it, *next;
    int bucket;
  if (expanding) {
        for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
            next = it->h_next;
  bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
            it->h_next = primary_hashtable[bucket];
            primary_hashtable[bucket] = it;
        }
  old_hashtable[expand_bucket] = NULL;
  expand_bucket++;
        if (expand_bucket == hashsize(hashpower - 1)) {
            expanding = false;
            free(old_hashtable);
            if (settings.verbose > 1)
                fprintf(stderr, "Hash table expansion done\n");
        }
    }
}
  
  /*
   * Pushes an item onto the head of the LRU list of its slab class and bumps
   * that class's item count. The item must not be slabbed and must not
   * already be the head of the list.
   */
  static void item_link_q(item *it) { /* item is the new head */
    item **lru_head;
    item **lru_tail;

    /* always true, warns: assert(it->slabs_clsid <= LARGEST_ID); */
    assert((it->it_flags & ITEM_SLABBED) == 0);

    lru_head = &heads[it->slabs_clsid];
    lru_tail = &tails[it->slabs_clsid];
    assert(it != *lru_head);
    /* head and tail are either both set or both empty */
    assert((*lru_head && *lru_tail) || (*lru_head == 0 && *lru_tail == 0));

    it->prev = 0;
    it->next = *lru_head;
    if (it->next) {
        it->next->prev = it;
    }
    *lru_head = it;

    /* first item in an empty list is also the tail */
    if (*lru_tail == 0) {
        *lru_tail = it;
    }

    sizes[it->slabs_clsid]++;
    return;
  }
  放到如下的LRU列表的头位置
/* Per-slab-class LRU bookkeeping, indexed by item->slabs_clsid. */
/* Head of each class's list; item_link_q() inserts new items here. */
static item *heads[LARGEST_ID];
/* Tail of each class's list; do_item_alloc() searches from here for an item to evict. */
static item *tails[LARGEST_ID];
/* Per-class counter, incremented by item_link_q() for every item it links. */
static unsigned int sizes[LARGEST_ID];

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-108315-1-1.html 上篇帖子: memcached server LRU 深入分析[转载] 下篇帖子: Memcached探索(系列三)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表