设为首页 收藏本站
查看: 1006|回复: 0

[经验分享] memcached源码阅读----使用libevent和多线程模型

[复制链接]

尚未签到

发表于 2015-11-18 13:09:04 | 显示全部楼层 |阅读模式
  本篇文章主要是我今天阅读memcached源码关于进程启动,在网络这块做了哪些事情。
  


  一、libevent的使用
  首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型。因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调。
  
  可以减掉了解一下 libevent基本api调用
  

struct event_base *base;
base = event_base_new();//初始化libevent
  
  

event_base_new对比epoll,可以理解为epoll里的epoll_create。


event_base内部有一个循环,循环阻塞在epoll调用上,当有一个事件发生的时候,才会去处理这个事件。其中,这个事件是被绑定在event_base上面的,每一个事件就会对应一个struct event,可以是监听的fd。


其中struct event 使用event_new 来创建和绑定,使用event_add来启用,例如:




struct event *listener_event;
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);


  
  
  

参数说明:


base:event_base类型,event_base_new的返回值


listener:监听的fd,listen的fd


EV_READ|EV_PERSIST事件的类型及属性


do_accept:绑定的回调函数


(void*)base:给回调函数的参数


event_add(listener_event, NULL);


对比epoll:


event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。


event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加事件。


注:libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)

EV_TIMEOUT: 超时

EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发

EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发

EV_SIGNAL: POSIX信号量

EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除

EV_ET: Edge-Trigger边缘触发,相当于EPOLL的ET模式


事件创建添加之后,就可以处理发生的事件了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直到不再有需要关注的事件。



  
  

有了上面的分析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:


1. 创建socketbindlisten,设置为非阻塞模式


2. 创建一个event_base,即


[cpp] view
plaincopy



  • struct event_base *  event_base_new(void)  


3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)


[cpp] view
plaincopy



  • struct event *  event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)  


4. 启用该事件,即


[cpp] view
plaincopy



  • int  event_add(struct event *ev, const struct timeval *tv)  


5.  进入事件循环,即


[cpp] view
plaincopy



  • int  event_base_dispatch(struct event_base *event_base)



  






有了上边的基础东西,可以进入memcached的阅读了。


二、memcached源码分析


main函数启动,首先会初始化很多数据,这里我们只涉及大网络这块,其他以后分析,先忽略。
1.首先初始化 主工作线程的的iblievet对象
  /* initialize main thread libevent instance */
main_base = event_init();

最后会调用
   /* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}

在该对象内部循环。不退出。


2.初始化连接的对象


static void conn_init(void) {
freetotal = 200;
freecurr = 0;
if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
fprintf(stderr, "Failed to allocate connection structures\n");
}
return;
}



这里是先预先分配200个conn*的内存。等有连接上来,会从freeconns  取。
如下代码:
/*
* Returns a connection from the freelist, if any.
*/
conn *conn_from_freelist() {
conn *c;
pthread_mutex_lock(&conn_lock);
if (freecurr > 0) {
c = freeconns[--freecurr];
} else {
c = NULL;
}
pthread_mutex_unlock(&conn_lock);
return c;
}




3.那么conn的结构体内部长什么样子呢?
typedef struct conn conn;
struct conn {
int    sfd;
sasl_conn_t *sasl_conn;
enum conn_states  state;
enum bin_substates substate;
struct event event;
short  ev_flags;
short  which;   /** which events were just triggered */
char   *rbuf;   /** buffer to read commands into */
char   *rcurr;  /** but if we parsed some already, this is where we stopped */
int    rsize;   /** total allocated size of rbuf */
int    rbytes;  /** how much data, starting from rcur, do we have unparsed */
char   *wbuf;
char   *wcurr;
int    wsize;
int    wbytes;
/** which state to go into after finishing current write */
enum conn_states  write_and_go;
void   *write_and_free; /** free this memory after finishing writing */
char   *ritem;  /** when we read in an item's value, it goes here */
int    rlbytes;
/* data for the nread state */
/**
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
*/
void   *item;     /* for commands set/add/replace  */
/* data for the swallow state */
int    sbytes;    /* how many bytes to swallow */
/* data for the mwrite state */
struct iovec *iov;
int    iovsize;   /* number of elements allocated in iov[] */
int    iovused;   /* number of elements used in iov[] */
struct msghdr *msglist;
int    msgsize;   /* number of elements allocated in msglist[] */
int    msgused;   /* number of elements used in msglist[] */
int    msgcurr;   /* element in msglist[] being transmitted now */
int    msgbytes;  /* number of bytes in current msg */
item   **ilist;   /* list of items to write out */
int    isize;
item   **icurr;
int    ileft;
char   **suffixlist;
int    suffixsize;
char   **suffixcurr;
int    suffixleft;
enum protocol protocol;   /* which protocol this con<pre name=&quot;code&quot; class=&quot;cpp&quot;>  if (sigignore(SIGPIPE) == -1) {
perror(&quot;failed to ignore SIGPIPE; sigaction&quot;);
exit(EX_OSERR);
}

nection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP &quot;connection&quot; */ struct sockaddr request_addr; /* Who sent the most recent
request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer;
size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int
keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */};

这里的所有字段就是在处理数据需要用到的。这里不详细描述。以后会慢慢分解。

因为是memcached是多线程模型,因此在从freeconn取出一个对象的时候,是要加解锁使用。
忽略SIGIPIE信号,防止rst时的程序退出
  if (sigignore(SIGPIPE) == -1) {
perror(&quot;failed to ignore SIGPIPE; sigaction&quot;);
exit(EX_OSERR);
}

初始化多线程模型,并且每个线程一个iblievent的事件模型就是调用event_init函数。
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);

内部实现不详细。主要是调用pthread_create函数。


4、然后开始通过端口号启动网络监听事件



代码如下:


   if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror(&quot;failed to listen on TCP port %d&quot;, settings.port);
exit(EX_OSERR);
}


然后调用下面的函数:


static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file)

因为,一个主机可能会有多个网卡,比如双线机房,联通或者电信,因此内部实现会出现以下代码:




for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) {
/* getaddrinfo can return &quot;junk&quot; addresses,
* we make sure at least one works before erroring.
*/
if (errno == EMFILE) {
/* ...unless we're out of fds */
perror(&quot;server_socket&quot;);
exit(EX_OSERR);
}
continue;
}


而static int new_socket(struct addrinfo *ai)

该函数就是调用socket函数,设置为非阻塞。




5、然后生成一个监听的conn对象


代码如下
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, &quot;failed to create listening connection\n&quot;);
exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;

static conn *listen_conn = NULL;作为全局的静态的变量。无头结点的单链表


我们继续深入conn_new 函数内部


conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c = conn_from_freelist();



该函数主要是做了哪些动作呢?


第一,从刚才的free_cnn_list取出一个conn* 来,然互分配内存,根据相关配置信息,进行相关的字段初始化工作。




第二,加入到iblievent事件库中
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
if (conn_add_to_freelist(c)) {
conn_free(c);
}
perror(&quot;event_add&quot;);
return NULL;
}

这一步就是,讲sfd上的事件绑定event_handler 函数,就是当有该连接上来的时候有数据进行可读的时候绑定,回调。


7、状态机的解读


<span style=&quot;font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);&quot;>最终event_handler函数会调用</span>
static void drive_machine(conn *c)函数。那么这个函数做了哪些工作呢?


当然是等待连接了,那就是accept函数了。
因此,入股市conn_listening状态,


  while (!stop) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1)



当然同样是 讲sfd设置成非阻塞的。


这个时候是有数据上来了。


因此就要设置读命令状态了,调用以下函数:


<pre name=&quot;code&quot; class=&quot;cpp&quot;>/*
* Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because
* of an incoming connection.
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
char buf[1];
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror(&quot;Writing to thread notify pipe&quot;);
}
}




通过注释可以知道,该函数是讲一个新连接分配各其他线程,
通过代码我们可以看出
首先,分配一个item块,讲连接的socket的fd 赋&#20540;给item,同时有当前状态,标志位,读buff大小等,然后分配一个线程,讲item推送到该thread的处理队列里了。
然互,通过往管道里写入C字符,通知到管道的另一端,进行处理该操作符的事件。因此,完成了对对该连接的 分配工作。
那么我接下来看一看 线程是如果处理的。
在初始化线程的时候,已经把管道的两个操作符放入到了iblievent里了。如下代码:
  /* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, &quot;Can't monitor libevent notify pipe\n&quot;);
exit(1);
}





绑定了回调函数:
static void thread_libevent_process(int fd, short which, void *arg)



当读到字符'c'的时候,就从其中队列中取出一个item*,掉用一下函数




conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base)

同样,调用
conn *c = conn_from_freelist();

取出一个conn* ,然后进行初始化,这个时候和上文讲到的一样了,知识状态不同了,
因此这里使用了一个状态机的模式了。


有如下状态:


enum conn_states {
conn_listening,  /**< the socket which listens for connections */
conn_new_cmd,    /**< Prepare connection for next command */
conn_waiting,    /**< waiting for a readable socket */
conn_read,       /**< reading in a command line */
conn_parse_cmd,  /**< try to parse a command from the input buffer */
conn_write,      /**< writing out a simple response */
conn_nread,      /**< reading in a fixed number of bytes */
conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
conn_closing,    /**< closing this connection */
conn_mwrite,     /**< writing out many items sequentially */
conn_max_state   /**< Max state value (used for assertion) */
};

也就是
static void drive_machine(conn *c)的核心逻辑了。通过设置状态,然后调用不同的代码,


因此在一个状态结束之后,总是会看大如下代码调用:


/*
* Sets a connection's current state in the state machine. Any special
* processing that needs to happen on certain state transitions can
* happen here.
*/
static void conn_set_state(conn *c, enum conn_states state) {
assert(c != NULL);
assert(state >= conn_listening && state < conn_max_state);
if (state != c->state) {
if (settings.verbose > 2) {
fprintf(stderr, &quot;%d: going from %s to %s\n&quot;,
c->sfd, state_text(c->state),
state_text(state));
}
if (state == conn_write || state == conn_mwrite) {
MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
}
c->state = state;
}
}







到此,网络框架部分已经基本处理完成。起始这个框架是非常简单而且实用的。
redis也是基本的思想模型,只不过是单线程的,而memcached是多线程的模型。在开发模式上可以有效的借鉴。




该文章为原创文章,更多文章,欢迎访问 http://blog.iyunv.com/wallwind
























版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-140729-1-1.html 上篇帖子: 通过log4j关闭memcached的日志 下篇帖子: memcached 常用命令及使用说明
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表