设为首页 收藏本站
查看: 1174|回复: 0

[经验分享] Memcached源码阅读之资源初始化

[复制链接]

尚未签到

发表于 2015-11-18 15:39:39 | 显示全部楼层 |阅读模式
  Memcached内部有hash表,各种统计信息,工作线程,网络,连接,内存结构等,在memcached启动时(执行main函数),会对这些资源进行初始化的,网络和内存的初始化操作放到后续分析,这次分析hash表,统计信息,工作线程,网络连接的初始化过程。
  1 hash表的初始化


  

//hash表的初始化,传入的参数是启动时传入的
assoc_init(settings.hashpower_init);
//hashsize的实现
#define hashsize(n) ((ub4)1<<(n))
//主hash表结构定义,在hash表扩容时,会有次hash表,所以有主次hash表区分,该结构是指针的指针,也即相当于数组指针
static item** primary_hashtable = 0;
void assoc_init(const int hashtable_init) {
if (hashtable_init) {//如果设置了初始化参数,则按设置的参数进行初始化
hashpower = hashtable_init;
}
    //hashpower的默认值为16,如果未设置新值,则按默认值进行初始化
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
if (! primary_hashtable) {
fprintf(stderr, &quot;Failed to init hashtable.\n&quot;);
exit(EXIT_FAILURE);
}
STATS_LOCK();//全局统计信息加锁,保证数据同步
stats.hash_power_level = hashpower;
stats.hash_bytes = hashsize(hashpower) * sizeof(void *);
STATS_UNLOCK();
}
  2 统计信息的初始化
  Memcached内部有很多全局的统计信息,用于实时获取各个资源的使用情况,后面将会看到,所有对统计信息的更新都需要加锁,而这些信息的更新是和Memcached的操作次数同数量级的,所以,在一定程度来说,这些统计信息对性能有影响。
  stats结构是对统计信息的一个抽象,各个字段都比较好理解,不做解释。


  

struct stats {
pthread_mutex_t mutex;
unsigned int  curr_items;
unsigned int  total_items;
uint64_t      curr_bytes;
unsigned int  curr_conns;
unsigned int  total_conns;
uint64_t      rejected_conns;
unsigned int  reserved_fds;
unsigned int  conn_structs;
uint64_t      get_cmds;
uint64_t      set_cmds;
uint64_t      touch_cmds;
uint64_t      get_hits;
uint64_t      get_misses;
uint64_t      touch_hits;
uint64_t      touch_misses;
uint64_t      evictions;
uint64_t      reclaimed;
time_t        started;          /* when the process was started */
bool          accepting_conns;  /* whether we are currently accepting */
uint64_t      listen_disabled_num;
unsigned int  hash_power_level; /* Better hope it's not over 9000 */
uint64_t      hash_bytes;       /* size used for hash tables */
bool          hash_is_expanding; /* If the hash table is being expanded */
uint64_t      expired_unfetched; /* items reclaimed but never touched */
uint64_t      evicted_unfetched; /* items evicted but never touched */
bool          slab_reassign_running; /* slab reassign in progress */
uint64_t      slabs_moved;       /* times slabs were moved around */
};

统计信息的初始化也就是对stats变量的一个初始化。  
  

//全局对象的定义
struct stats stats;
//全局变量的初始化,该全局变量在memcached启动之后,一直使用
static void stats_init(void)
{
stats.curr_items = stats.total_items = stats.curr_conns =
stats.total_conns = stats.conn_structs = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses =
stats.evictions = stats.reclaimed = 0;
stats.touch_cmds = stats.touch_misses = stats.touch_hits =
stats.rejected_conns = 0;
stats.curr_bytes = stats.listen_disabled_num = 0;
stats.hash_power_level = stats.hash_bytes = stats.hash_is_expanding = 0;
stats.expired_unfetched = stats.evicted_unfetched = 0;
stats.slabs_moved = 0;
stats.accepting_conns = true; /* assuming we start in this state. */
stats.slab_reassign_running = false;
/* make the time we started always be 2 seconds before we really
did, so time(0) - time.started is never zero.  if so, things
like 'settings.oldest_live' which act as booleans as well as
values are now false in boolean context... */
process_started = time(0) - 2;
stats_prefix_init();
}
  
  3 工作线程的初始化
  Memcached采用了典型的Master-Worker的线程模式,Master就是由main线程来充当,而Worker线程则是通过Pthread创建的。
  

//传入线程个数和libevent的main_base实例
thread_init(settings.num_threads, main_base);//工作线程初始化
void thread_init(int nthreads, struct event_base *main_base) {
int         i;
int         power;
    //初始化各种锁和条件变量
pthread_mutex_init(&cache_lock, NULL);
pthread_mutex_init(&stats_lock, NULL);
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);
pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
    //Memcached对hash桶的锁采用分段锁,按线程个数来分段,默认总共是1<<16个hash桶,而锁的数目是1<<power个
    /* Want a wide lock table, but don't waste memory */
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else {
/* 8192 buckets, and central locks don't scale much past 5 threads */
power = 13;
}
item_lock_count = hashsize(power);
    //申请1<<power个pthread_mutex_t锁,保存在item_locks数组。
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
perror(&quot;Can't allocate item locks&quot;);
exit(1);
}
    //对这些锁进行初始化,这部分可参考APUE的线程部分
    for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks, NULL);
}
    /*创建线程的局部变量,该局部变量的名称为item_lock_type_key,用于保存主hash表所持有的锁的类型
    主hash表在进行扩容时,该锁类型会变为全局的锁,否则(不在扩容过程中),则是局部锁*/
    pthread_key_create(&item_lock_type_key, NULL);
pthread_mutex_init(&item_global_lock, NULL);
    //申请nthreds个工作线程,LIBEVENT_THREAD是Memcached内部对工作线程的一个封装
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
perror(&quot;Can't allocate thread descriptors&quot;);
exit(1);
}
    /*分发线程的初始化,分发线程的base为main_base
    线程id为main线程的线程id*/
    dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();
//工作线程的初始化,工作线程和主线程(main线程)是通过pipe管道进行通信的
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {//初始化pipe管道
perror(&quot;Can't create notify pipe&quot;);
exit(1);
}
threads.notify_receive_fd = fds[0];//读管道绑定到工作线程的接收消息的描述符
threads.notify_send_fd = fds[1];//写管道绑定到工作线程的发送消息的描述符
setup_thread(&threads);//添加工作线程到libevent中
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;//统计信息更新
}
//创建工作线程
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads);
}
//等待所有工作线程创建完毕
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}
//Memcached内部工作线程的封装
typedef struct {
pthread_t thread_id;       //线程ID
struct event_base *base;   //libevent的不是线程安全的,每个工作线程持有一个libevent实例,用于pipe管道通信和socket通信
struct event notify_event; //用于监听pipe管道的libevent事件
int notify_receive_fd;      //接收pipe管道消息描述符
int notify_send_fd;         //发送pipe管道消息描述符
struct thread_stats stats;  //每个线程对应的统计信息
struct conn_queue *new_conn_queue; //每个线程都有一个工作队列,主线程接受的连接,挂载到该消息队列中
cache_t *suffix_cache;      //后缀cache
uint8_t item_lock_type;     //线程操作hash表持有的锁类型,有局部锁和全局锁
} LIBEVENT_THREAD;
//分发线程的封装
typedef struct {
    pthread_t thread_id;        //线程id
    struct event_base *base;    //libevent实例
} LIBEVENT_DISPATCHER_THREAD;
//工作线程绑定到libevent实例
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();//创建libevent实例
    if (! me->base) {
        fprintf(stderr, &quot;Can't allocate event base\n&quot;);
        exit(1);
    }
    //创建管道读的libevent事件,事件的回调函数处理具体的业务信息,关于回调函数的处理,后续分析
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);//设置libevent实例
    //添加事件到libevent中
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, &quot;Can't monitor libevent notify pipe\n&quot;);
        exit(1);
    }
    //创建消息队列,用于接受主线程连接
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror(&quot;Failed to allocate memory for connection queue&quot;);
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);//消息队列初始化
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror(&quot;Failed to initialize mutex&quot;);
        exit(EXIT_FAILURE);
    }
    //创建线程的后缀cache,没搞懂这个cache有什么作用。
    me->suffix_cache = cache_create(&quot;suffix&quot;, SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, &quot;Failed to create suffix cache\n&quot;);
        exit(EXIT_FAILURE);
    }
}
//创建工作线程
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;
    pthread_attr_init(&attr);//Posix线程部分,线程属性初始化
    //通过pthread_create创建线程,线程处理函数是通过外部传入的处理函数为worker_libevent
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, &quot;Can't create thread: %s\n&quot;,
                strerror(ret));
        exit(1);
    }
}
//线程处理函数
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;
    //默认的hash表的锁为局部锁
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);//设定线程的属性
    //用于控制工作线程初始化,通过条件变量来控制
    register_thread_initialized();
    //工作线程的libevent实例启动
    event_base_loop(me->base, 0);
    return NULL;
}
//阻塞工作线程
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);//在条件变量init_cond上面阻塞,阻塞个数为nthreads-init_count
    }
}
//唤醒工作线程
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
}
//每个线程持有的统计信息
struct thread_stats {
    pthread_mutex_t   mutex;
    uint64_t          get_cmds;
    uint64_t          get_misses;
    uint64_t          touch_cmds;
    uint64_t          touch_misses;
    uint64_t          delete_misses;
    uint64_t          incr_misses;
    uint64_t          decr_misses;
    uint64_t          cas_misses;
    uint64_t          bytes_read;
    uint64_t          bytes_written;
    uint64_t          flush_cmds;
    uint64_t          conn_yields; /* # of yields for connections (-R option)*/
    uint64_t          auth_cmds;
    uint64_t          auth_errors;
    struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
};
//每个slab的统计信息
struct slab_stats {
    uint64_t  set_cmds;
    uint64_t  get_hits;
    uint64_t  touch_hits;
    uint64_t  delete_hits;
    uint64_t  cas_hits;
    uint64_t  cas_badval;
    uint64_t  incr_hits;
    uint64_t  decr_hits;
};

  
  4 连接的初始化
  

static conn **freeconns;//空闲连接列表
//连接初始化
static void conn_init(void)
{
freetotal = 200;//空闲连接总数
freecurr = 0;//当前空闲的索引
        //申请200个空间
        if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL)
{
fprintf(stderr, &quot;Failed to allocate connection structures\n&quot;);
}
return;
}
  

版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-140832-1-1.html 上篇帖子: NoSQL的存储架构——键值存储Memcached和Redis 下篇帖子: Memcached源码分析之内存池
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表