xiaowei8782088 发表于 2015-9-1 11:59:38

memcached源代码阅读(2)-main函数

  Memcached源代码阅读笔记
采用单步跟踪的方式对源代码进行阅读
  调试参数 start
  
/* Request Winsock version 2.0; give up if start-up fails. */
if (WSAStartup(MAKEWORD(2, 0), &wsaData) != 0) {
    /* message previously read "Programaborted" (missing space) */
    fprintf(stderr, "Socket Initialization Error. Program aborted\n");
    return;
}
  
  /* init settings: load the built-in defaults into the global `settings`
     before the command-line options are parsed */
    settings_init();
  初始化设置,这里主要是设置一些默认的启动参数
  /*
   * Reset the global `settings` structure to its built-in defaults.
   * Takes no input and returns nothing; every field listed below is
   * overwritten with a constant.
   */
  static void settings_init(void) {
    /* listening endpoints */
    settings.port = 11211;
    settings.udpport = 0;
    settings.interf.s_addr = htonl(INADDR_ANY);
    settings.socketpath = NULL;           /* by default, not using a unix socket */
    settings.access = 0700;               /* NOTE(review): presumably a permission mask -- confirm where it is consumed */

    /* capacity limits */
    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
    settings.maxconns = 1024;             /* to limit connections-related memory to about 5MB */

    /* cache behaviour */
    settings.oldest_live = 0;
    settings.evict_to_free = 1;           /* push old items out of cache when memory runs out */
    settings.managed = false;
    settings.factor = 1.25;               /* handed to slabs_init() as the chunk growth factor */
    settings.chunk_size = 48;             /* space for a modest key and value */

    /* worker threads: 4 when built with USE_THREADS, otherwise 1 */
#ifdef USE_THREADS
    settings.num_threads = 4;
#else
    settings.num_threads = 1;
#endif

    /* diagnostics */
    settings.verbose = 0;
    settings.prefix_delimiter = ':';
    settings.detail_enabled = 0;
}
  
  setbuf(stderr, NULL); // give stderr a NULL buffer, i.e. unbuffered, so errors are shown as soon as they occur
  
获取选项,非主要流程,暂时略过
  /* Walk the command-line options; the per-option handling is elided in this excerpt. */
  while ((c = getopt(argc, argv, "a:bp:s:U:m:Mc:khirvd:l:u:P:f:s:n:t:D:")) != -1) {
  。。。
  }
/* create the listening socket and bind it */
    /* Only done when no unix socket path is configured.  The second argument
       of server_socket() is is_udp, so 0 requests a TCP (stream) socket. */
    if (settings.socketpath == NULL) {
      l_socket = server_socket(settings.port, 0);
      if (l_socket == -1) {
            fprintf(stderr, "failed to listen\n");
            exit(EXIT_FAILURE);
      }
    }
/*
 * Create a non-blocking IPv4 socket through new_socket(), apply the socket
 * options, bind it to settings.interf on the given port and, for TCP,
 * start listening with a backlog of 1024.
 *
 * port    port number in host byte order (converted with htons here)
 * is_udp  true for a datagram socket, false for a stream socket
 *
 * Returns the socket descriptor, or -1 when new_socket(), bind() or
 * listen() fails (the descriptor is closed before returning).
 * Return values of the setsockopt() calls are not checked.
 */
static int server_socket(const int port, const bool is_udp) {
    struct sockaddr_in bind_addr;
    struct linger no_linger = {0, 0};
    int enable = 1;
    int sfd = new_socket(is_udp);

    if (sfd == -1) {
        return -1;
    }

    /* SO_REUSEADDR: allow binding to an address that is already in use (see bind()) */
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&enable, sizeof(enable));

    if (!is_udp) {
        /* SO_KEEPALIVE: send keep-alive packets */
        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&enable, sizeof(enable));
        /* SO_LINGER with l_onoff == 0: do not block close() because of unsent data */
        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&no_linger, sizeof(no_linger));
        /* TCP_NODELAY: disable Nagle's algorithm (no coalescing of small sends) */
        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&enable, sizeof(enable));
    } else {
        maximize_sndbuf(sfd);
    }

    /*
     * the memset call clears nonstandard fields in some implementations
     * that otherwise mess things up.
     */
    memset(&bind_addr, 0, sizeof(bind_addr));
    bind_addr.sin_family = AF_INET;
    bind_addr.sin_port = htons(port);
    bind_addr.sin_addr = settings.interf;

    /* bind the port */
    if (bind(sfd, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) == -1) {
        perror("bind()");
        close(sfd);
        return -1;
    }

    /* datagram sockets are ready once bound; only stream sockets listen */
    if (is_udp) {
        return sfd;
    }

    if (listen(sfd, 1024) == -1) {
        perror("listen()");
        close(sfd);
        return -1;
    }

    return sfd;
}
/*
 * Open an IPv4 socket (datagram when is_udp is true, stream otherwise)
 * and switch it to non-blocking mode.
 *
 * Returns the descriptor on success.  On failure the error is reported
 * with perror(), any descriptor already opened is closed, and -1 is
 * returned.
 */
static int new_socket(const bool is_udp) {
    const int sock_type = is_udp ? SOCK_DGRAM : SOCK_STREAM;
    const int sfd = socket(AF_INET, sock_type, 0);
    int status_flags;

    if (sfd == -1) {
        perror("socket()");
        return -1;
    }

    /* fetch the current status flags, then set them again with O_NONBLOCK added */
    status_flags = fcntl(sfd, F_GETFL, 0);
    if (status_flags >= 0 &&
        fcntl(sfd, F_SETFL, status_flags | O_NONBLOCK) >= 0) {
        return sfd;
    }

    perror("setting O_NONBLOCK");
    close(sfd);
    return -1;
}
/*
 * Sets a socket's send buffer size to the maximum allowed by the system.
 *
 * Reads the current SO_SNDBUF value, then binary-searches between that
 * value and MAX_SENDBUF_SIZE for the largest size setsockopt() accepts.
 * Nothing is returned to the caller; failures and the final result are
 * only logged, depending on settings.verbose.
 */
static void maximize_sndbuf(const int sfd) {
    socklen_t intsize = sizeof(int);
    int last_good = 0;
    int min, max, avg;
    /*
     * Must be an int: getsockopt() is told the buffer holds intsize
     * (sizeof(int)) bytes, so a narrower object would be overrun and the
     * reported size truncated.
     */
    int old_size;

    /* Start with the default size. */
    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
        if (settings.verbose > 0) {
            perror("getsockopt(SO_SNDBUF)");
        }
        return;
    }

    /* Binary-search for the real maximum. */
    min = old_size;
    max = MAX_SENDBUF_SIZE; /* 256M according to the original annotation */

    while (min <= max) {
        avg = ((unsigned int)(min + max)) / 2;
        /* an accepted size raises the lower bound, a rejected one lowers the upper bound */
        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
            last_good = avg;
            min = avg + 1;
        } else {
            max = avg - 1;
        }
    }

    if (settings.verbose > 1) {
        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
    }
}
  好,继续回到main函数:
  main_base = event_init();//call the libevent initialisation function
   /* initialize other stuff */
    item_init();//item initialisation
    stats_init();
    assoc_init();
    conn_init();
  /* Free list of conn objects that conn_new() can reuse instead of allocating. */
  static conn **freeconns; /* array of conn pointers, allocated in conn_init() */
static int freetotal;      /* number of slots allocated in freeconns */
static int freecurr;       /* number of entries currently held in freeconns */
  
/*
 * Allocate the connection free list with room for 200 conn pointers and
 * mark it empty.  An allocation failure is only reported through
 * perror(); freeconns is then left NULL.
 */
static void conn_init(void) {
    freecurr = 0;
    freetotal = 200;

    freeconns = malloc(sizeof(conn *) * freetotal);
    if (freeconns == NULL) {
        perror("malloc()");
    }
}
  /* Hacky suffix buffers. */
    suffix_init();
    /* set up the slab classes from the configured memory limit and growth factor */
    slabs_init(settings.maxbytes, settings.factor);
  /**
* Determines the chunk sizes and initializes the slab class descriptors
* accordingly.
*/
  
void slabs_init(const size_t limit, const double factor) {
    int i = POWER_SMALLEST - 1;
    unsigned int size = sizeof(item) + settings.chunk_size;
  /* Factor of 2.0 means use the default memcached behavior */
    if (factor == 2.0 && size < 128)
      size = 128;
  mem_limit = limit;
    memset(slabclass, 0, sizeof(slabclass));
  while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) {
      /* Make sure items are always n-byte aligned */
      if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);//字节对齐 ,32位系统是4字节对齐,64位系统是8字节对齐
  slabclass.size = size;
      slabclass.perslab = POWER_BLOCK / slabclass.size;
      size *= factor;
      if (settings.verbose > 1) {
            fprintf(stderr, "slab class %3d: chunk size %6u perslab %5u\n",
                  i, slabclass.size, slabclass.perslab);
      }
    }
  power_largest = i;
    slabclass.size = POWER_BLOCK;
    slabclass.perslab = 1;
  /* for the test suite:faking of how much we've already malloc'd */
    {
      char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
      if (t_initial_malloc) {
            mem_malloced = (size_t)atol(t_initial_malloc);
      }
  }
  }
  
用到的几个常数
/* index of the first slab class filled by slabs_init() */
#define POWER_SMALLEST 1
/* upper bound (exclusive) on the class index generated by slabs_init() */
#define POWER_LARGEST 200
/* 2 to the 20th power */
#define POWER_BLOCK 1048576
/* chunk sizes are rounded up to a multiple of this */
#define CHUNK_ALIGN_BYTES (sizeof(void *))
  函数生成如下序列(factor在1.25的情况)
-slabclass 0x0046eb80 slabclass {size=0 perslab=0 slots=0x00000000 ...} slabclass_t
+ {size=0 perslab=0 slots=0x00000000 ...} slabclass_t
+ {size=88 perslab=11915 slots=0x00000000 ...} slabclass_t
+ {size=112 perslab=9362 slots=0x00000000 ...} slabclass_t
+ {size=140 perslab=7489 slots=0x00000000 ...} slabclass_t
+ {size=176 perslab=5957 slots=0x00000000 ...} slabclass_t
+ {size=220 perslab=4766 slots=0x00000000 ...} slabclass_t
+ {size=276 perslab=3799 slots=0x00000000 ...} slabclass_t
+ {size=348 perslab=3013 slots=0x00000000 ...} slabclass_t
+ {size=436 perslab=2404 slots=0x00000000 ...} slabclass_t
+ {size=548 perslab=1913 slots=0x00000000 ...} slabclass_t
+ {size=688 perslab=1524 slots=0x00000000 ...} slabclass_t
+ {size=860 perslab=1219 slots=0x00000000 ...} slabclass_t
+ {size=1076 perslab=974 slots=0x00000000 ...} slabclass_t
+ {size=1348 perslab=777 slots=0x00000000 ...} slabclass_t
+ {size=1688 perslab=621 slots=0x00000000 ...} slabclass_t
+ {size=2112 perslab=496 slots=0x00000000 ...} slabclass_t
+ {size=2640 perslab=397 slots=0x00000000 ...} slabclass_t
+ {size=3300 perslab=317 slots=0x00000000 ...} slabclass_t
+ {size=4128 perslab=254 slots=0x00000000 ...} slabclass_t
+ {size=5160 perslab=203 slots=0x00000000 ...} slabclass_t
+ {size=6452 perslab=162 slots=0x00000000 ...} slabclass_t
+ {size=8068 perslab=129 slots=0x00000000 ...} slabclass_t
+ {size=10088 perslab=103 slots=0x00000000 ...} slabclass_t
+ {size=12612 perslab=83 slots=0x00000000 ...} slabclass_t
+ {size=15768 perslab=66 slots=0x00000000 ...} slabclass_t
+ {size=19712 perslab=53 slots=0x00000000 ...} slabclass_t
+ {size=24640 perslab=42 slots=0x00000000 ...} slabclass_t
+ {size=30800 perslab=34 slots=0x00000000 ...} slabclass_t
+ {size=38500 perslab=27 slots=0x00000000 ...} slabclass_t
+ {size=48128 perslab=21 slots=0x00000000 ...} slabclass_t
+ {size=60160 perslab=17 slots=0x00000000 ...} slabclass_t
+ {size=75200 perslab=13 slots=0x00000000 ...} slabclass_t
+ {size=94000 perslab=11 slots=0x00000000 ...} slabclass_t
+ {size=117500 perslab=8 slots=0x00000000 ...} slabclass_t
+ {size=146876 perslab=7 slots=0x00000000 ...} slabclass_t
+ {size=183596 perslab=5 slots=0x00000000 ...} slabclass_t
+ {size=229496 perslab=4 slots=0x00000000 ...} slabclass_t
+ {size=286872 perslab=3 slots=0x00000000 ...} slabclass_t
+ {size=358592 perslab=2 slots=0x00000000 ...} slabclass_t
+ {size=448240 perslab=2 slots=0x00000000 ...} slabclass_t
+ {size=1048576 perslab=1 slots=0x00000000 ...} slabclass_t
  
/* create the initial listening connection */
    if (!(listen_conn = conn_new(l_socket, conn_listening,
                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
      fprintf(stderr, "failed to create listening connection");
      exit(EXIT_FAILURE);
    }
  
conn *conn_new(const int sfd, const int init_state, const int event_flags,
                const int read_buffer_size, const bool is_udp, struct event_base *base) {
    conn *c = conn_from_freelist();
  if (NULL == c) {
      if (!(c = (conn *)malloc(sizeof(conn)))) {
            perror("malloc()");
            return NULL;
      }
      c->rbuf = c->wbuf = 0;
      c->ilist = 0;
      c->suffixlist = 0;
      c->iov = 0;
      c->msglist = 0;
      c->hdrbuf = 0;
  c->rsize = read_buffer_size;
      c->wsize = DATA_BUFFER_SIZE;
      c->isize = ITEM_LIST_INITIAL;
      c->suffixsize = SUFFIX_LIST_INITIAL;
      c->iovsize = IOV_LIST_INITIAL;
      c->msgsize = MSG_LIST_INITIAL;
      c->hdrsize = 0;
  c->rbuf = (char *)malloc((size_t)c->rsize);
      c->wbuf = (char *)malloc((size_t)c->wsize);
      c->ilist = (item **)malloc(sizeof(item *) * c->isize);
      c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
      c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
      c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
  if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                c->msglist == 0 || c->suffixlist == 0) {
            if (c->rbuf != 0) free(c->rbuf);
            if (c->wbuf != 0) free(c->wbuf);
            if (c->ilist !=0) free(c->ilist);
            if (c->suffixlist != 0) free(c->suffixlist);
            if (c->iov != 0) free(c->iov);
            if (c->msglist != 0) free(c->msglist);
            free(c);
            perror("malloc()");
            return NULL;
      }
  STATS_LOCK();
      stats.conn_structs++;
      STATS_UNLOCK();
    }
  if (settings.verbose > 1) {
      if (init_state == conn_listening)
            fprintf(stderr, "<%d server listening\n", sfd);
      else if (is_udp)
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
      else
            fprintf(stderr, "<%d new client connection\n", sfd);
    }
  c->sfd = sfd;
    c->udp = is_udp;
    c->state = init_state;
    c->rlbytes = 0;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;
  c->write_and_go = conn_read;
    c->write_and_free = 0;
    c->item = 0;
    c->bucket = -1;
    c->gen = 0;
  event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//Libevent eventset 设置事件处理函数
    event_base_set(base, &c->event); //libevent函数加入到eventbase
    c->ev_flags = event_flags;
  if (event_add(&c->event, 0) == -1) {//libevent函数 加入到进入事件循环
      if (conn_add_to_freelist(c)) {
            conn_free(c);
      }
      return NULL;
    }
  STATS_LOCK();
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();
  return c;
}
/* conn_from_freelist() expands directly to do_conn_from_freelist(). */
# define conn_from_freelist()      do_conn_from_freelist()

/*
 * Take the entry at the top of the connection free list
 * (index freecurr - 1) and shrink the list by one.
 * Returns NULL when the list is empty.
 */
conn *do_conn_from_freelist() {
    return (freecurr > 0) ? freeconns[--freecurr] : NULL;
}
conn定义如下
typedef struct {
    int    sfd;
    int    state;
    struct event event;
    shortev_flags;
    shortwhich;   /** which events were just triggered */
  char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;/** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;/** how much data, starting from rcur, do we have unparsed */
  char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    int    write_and_go; /** which state to go into after finishing current write */
    void   *write_and_free; /** free this memory after finishing writing */
  char   *ritem;/** when we read in an item's value, it goes here */
    int    rlbytes;
  /* data for the nread state */
  /**
   * item is used to hold an item structure created after reading the command
   * line of set/add/replace commands, but before we finished reading the actual
   * data. The data is read into ITEM_data(item) to avoid extra copying.
   */
  void   *item;   /* for commands set/add/replace*/
    int    item_comm; /* which one is it: set/add/replace */
  /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */
  /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */
  struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;/* number of bytes in current msg */
  item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;
  char   **suffixlist;
    int    suffixsize;
    char   **suffixcurr;
    int    suffixleft;
  /* data for UDP clients */
    bool   udp;       /* is this is a UDP "connection" */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr request_addr; /* Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */
  int    binary;    /* are we in binary mode */
    int    bucket;    /* bucket number for the next command, if running as
                         a managed instance. -1 (_not_ 0) means invalid. */
    int    gen;       /* generation requested for the bucket */
} conn;
  state字段取值如下枚举
/* Values taken by the state field of a conn. */
enum conn_states {
    /* the socket which listens for connections */
    conn_listening = 0,
    /* reading in a command line */
    conn_read,
    /* writing out a simple response */
    conn_write,
    /* reading in a fixed number of bytes */
    conn_nread,
    /* swallowing unnecessary bytes w/o storing */
    conn_swallow,
    /* closing this connection */
    conn_closing,
    /* writing out many items sequentially */
    conn_mwrite
};
  conn_new这个函数负责将原始套接字封装成为一个conn对象,同时会注册与该conn对象相关的IO事件,并指定该连接(conn)的初始状态。
listen_conn = conn_new(l_socket, conn_listening,
                                 EV_READ | EV_PERSIST, 1, false, main_base)
这个连接的初始状态为conn_listening,
监听READ事件,EV_PERSIST,表明是一个永久事件
  /* initialise clock event: this first call is made directly; the handler
     itself schedules the timer that triggers the following calls */
    clock_handler(0, 0, 0);
  /*
   * Clock timer callback.  Each call (re)installs a one-second timer for
   * itself on main_base and then calls set_current_time().
   *
   * fd, which, arg  not used by this handler
   */
  static void clock_handler(const int fd, const short which, void *arg) {
    struct timeval t;
    static bool initialized = false;

    /* plain assignments: no self-referential initializer, no dependence on
       the order of the members inside struct timeval */
    t.tv_sec = 1;
    t.tv_usec = 0;

    if (initialized) {
        /* only delete the event if it's actually there. */
        evtimer_del(&clockevent);
    } else {
        initialized = true;
    }

    evtimer_set(&clockevent, clock_handler, 0);
    event_base_set(main_base, &clockevent);
    evtimer_add(&clockevent, &t);

    set_current_time();
}
设置时钟事件。奇怪的是const int fd, const short which, void *arg这几个参数都没用上——它们是libevent回调函数的固定签名 void (*)(int, short, void *) 所要求的。
时钟事件每秒触发一次(定时器间隔为 t.tv_sec = 1, t.tv_usec = 0)。
delete_handler(0, 0, 0); /* sets up the event: the handler schedules its own 5-second timer */
每5秒触发一次,清理删除的item项。
/*
 * Deferred-delete timer callback.  Each call (re)installs a five-second
 * timer for itself on main_base and then calls run_deferred_deletes().
 *
 * fd, which, arg  not used by this handler
 */
static void delete_handler(const int fd, const short which, void *arg) {
    struct timeval t;
    static bool initialized = false;

    /* plain assignments: no self-referential initializer, no dependence on
       the order of the members inside struct timeval */
    t.tv_sec = 5;
    t.tv_usec = 0;

    if (initialized) {
        /* some versions of libevent don't like deleting events that don't exist,
           so only delete once we know this event has been added. */
        evtimer_del(&deleteevent);
    } else {
        initialized = true;
    }

    evtimer_set(&deleteevent, delete_handler, 0);
    event_base_set(main_base, &deleteevent);
    evtimer_add(&deleteevent, &t);

    run_deferred_deletes();
}
  /* start the libevent event loop on main_base */
  event_base_loop(main_base, 0);
开始事件循环
  main()函数分析完毕。
页: [1]
查看完整版本: memcached源代码阅读(2)-main函数