古城堡 发表于 2015-11-19 04:56:40

Memcached源码阅读之网络监听的建立

  Memcahced是一个服务器程序,所以需要建立网络监听来接受其他客户端机器的连接,下面分析下其过程,这次分析是基于Memcached 1.4.15版本分析的。
  

//如果socketpath为空,则表示使用的TCP/UDP,不是使用unix socket
if (settings.socketpath == NULL)
{       //可以从环境变量读取端口文件所在的文件路径
const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
char temp_portnumber_filename;
FILE *portnumber_file = NULL;
      //如果端口文件不为空,则打开
if (portnumber_filename != NULL)
{
snprintf(temp_portnumber_filename, sizeof(temp_portnumber_filename),
"%s.lck", portnumber_filename);
portnumber_file = fopen(temp_portnumber_filename, "a");
if (portnumber_file == NULL)
{
fprintf(stderr, "Failed to open \"%s\": %s\n",
temp_portnumber_filename, strerror(errno));
}
}
      //settings.port表示Memcached采用的是TCP协议,创建TCP Socket,监听并且绑定
errno = 0;
if (settings.port
&& server_sockets(settings.port, tcp_transport, portnumber_file))
{
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
//settings.udpport表示Memcached采用的是UDP协议,创建UDP Socket,监听并且绑定
errno = 0;
if (settings.udpport
&& server_sockets(settings.udpport, udp_transport, portnumber_file))
{
vperror("failed to listen on UDP port %d", settings.udpport);
exit(EX_OSERR);
}
      //端口文件不为空
if (portnumber_file)
{
fclose(portnumber_file);//关闭文件
rename(temp_portnumber_filename, portnumber_filename);//重命名端口文件
}
}
//TCP和UDP使用的是同一个接口来创建监听和绑定
static int server_sockets(int port, enum network_transport transport,
    FILE *portnumber_file)
{
//settings.inter指定的是要绑定的ip地址信息,如果为空,则表示是绑定本机一个ip
if (settings.inter == NULL)
{    //执行监听和绑定操作      
    return server_socket(settings.inter, port, transport, portnumber_file);
}
else//如果服务器有多个ip信息,可以在每个(ip,port)上面绑定一个Memcached实例,下面是一些输入参数的解析,解析完毕之后,执行绑定
{
    // tokenize them and bind to each one of them..
    char *b;
    int ret = 0;
    char *list = strdup(settings.inter);
    if (list == NULL)
    {
      fprintf(stderr,
                "Failed to allocate memory for parsing server interface string\n");
      return 1;
    }
    for (char *p = strtok_r(list, ";,", &b); p != NULL;
            p = strtok_r(NULL, ";,", &b))
    {
      int the_port = port;
      char *s = strchr(p, ':');
      if (s != NULL)
      {
            *s = '\0';
            ++s;
            if (!safe_strtol(s, &the_port))
            {
                fprintf(stderr, "Invalid port number: \"%s\"", s);
                return 1;
            }
      }
      if (strcmp(p, "*") == 0)
      {
            p = NULL;
      }
      //绑定多次,循环调用单个的绑定函数      
      ret |= server_socket(p, the_port, transport, portnumber_file);
    }
    free(list);
    return ret;
}
}
//执行真正的绑定
static int server_socket(const char *interface, int port,
    enum network_transport transport, FILE *portnumber_file)
{
int sfd;
struct linger ling = { 0, 0 };
struct addrinfo *ai;
struct addrinfo *next;
//设定协议无关,用于监听的标志位
struct addrinfo hints ={.ai_flags = AI_PASSIVE,.ai_family = AF_UNSPEC};
char port_buf;
int error;
int success = 0;
int flags = 1;
//指定socket的类型,如果是udp,则用数据报协议,如果是tcp,则用数据流协议
hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
if (port == -1)
{
    port = 0;
}
snprintf(port_buf, sizeof(port_buf), "%d", port);
//调用getaddrinfo,将主机地址和端口号映射成为socket地址信息,地址信息由ai带回
error = getaddrinfo(interface, port_buf, &hints, &ai);
if (error != 0)
{
    if (error != EAI_SYSTEM)
      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
    else
      perror("getaddrinfo()");
    return 1;
}
/*getaddrinfo返回多个addrinfo的情形有如下两种:
1.如果与interface参数关联的地址有多个,那么适用于所请求地址簇的每个地址都返回一个对应的结构。
2.如果port_buf参数指定的服务支持多个套接口类型,那么每个套接口类型都可能返回一个对应的结构。
*/
for (next = ai; next; next = next->ai_next)
{
    conn *listen_conn_add;
    //为每个地址信息建立socket
    if ((sfd = new_socket(next)) == -1)
    {
      //建立socket过程中可能发生的,比如打开文件描述符过多等
      if (errno == EMFILE)
      {
            perror("server_socket");
            exit(EX_OSERR);
      }
      continue;
    }
#ifdef IPV6_V6ONLY
    if (next->ai_family == AF_INET6)
    {   //设定IPV6的选项值,设置了IPV6_V6ONLY,表示只收发IPV6的数据包,此时IPV4和IPV6可以绑定到同一个端口而不影响数据的收发
      error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags,
                sizeof(flags));
      if (error != 0)
      {
            perror("setsockopt");
            close(sfd);
            continue;
      }
    }
#endif
    //设定socket选项,SO_REUSEADDR表示重用地址信息,具体重用哪些东西自行学习,必须在bind操作之前设置
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *) &flags, sizeof(flags));
    if (IS_UDP(transport))//如果是UDP协议
    {
      maximize_sndbuf(sfd);//扩大发送缓冲区
    }
    else
    {   //设定socket选项,SO_KEEPALIVE表示保活
      error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *) &flags,
                sizeof(flags));
      if (error != 0)
            perror("setsockopt");
      //设定socket选项,SO_LINGER表示执行close操作时,如果缓冲区还有数据,可以继续发送
      error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *) &ling,
                sizeof(ling));
      if (error != 0)
            perror("setsockopt");
      //设定IP选项,TCP_NODELAY表示禁用Nagle算法
      error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *) &flags,
                sizeof(flags));
      if (error != 0)
            perror("setsockopt");
    }
    //执行绑定操作
    if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1)
    {
      if (errno != EADDRINUSE)
      {
            perror("bind()");
            close(sfd);
            freeaddrinfo(ai);
            return 1;
      }
      close(sfd);
      continue;
    }
    else
    {
      success++;
      //如果不是UDP协议,则执行监听操作,监听队列为初始启动的值
      if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1)
      {
            perror("listen()");
            close(sfd);
            freeaddrinfo(ai);
            return 1;
      }
      if (portnumber_file != NULL
                && (next->ai_addr->sa_family == AF_INET
                        || next->ai_addr->sa_family == AF_INET6))
      {
            union
            {
                struct sockaddr_in in;
                struct sockaddr_in6 in6;
            } my_sockaddr;
            socklen_t len = sizeof(my_sockaddr);
            //这时还没连接建立,调用getsockname不知道有什么用?
            if (getsockname(sfd, (struct sockaddr*) &my_sockaddr, &len) == 0)
            {
                if (next->ai_addr->sa_family == AF_INET)
                {
                  fprintf(portnumber_file, "%s INET: %u\n",
                  IS_UDP(transport) ? "UDP" : "TCP",
                            ntohs(my_sockaddr.in.sin_port));
                }
                else
                {
                  fprintf(portnumber_file, "%s INET6: %u\n",
                  IS_UDP(transport) ? "UDP" : "TCP",
                            ntohs(my_sockaddr.in6.sin6_port));
                }
            }
      }
    }
    if (IS_UDP(transport))
    {
      int c;
      for (c = 0; c < settings.num_threads_per_udp; c++)
      {   
            //分发连接,因为UDP没有连接建立的过程,直接进行连接的分发
            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,UDP_READ_BUFFER_SIZE, transport);
      }
    }
    else
    {   
      //TCP,建立连接
      if (!(listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1, transport, main_base)))
      {
            fprintf(stderr, &quot;failed to create listening connection\n&quot;);
            exit(EXIT_FAILURE);
      }
      //建立的连接组成链表
      listen_conn_add->next = listen_conn;
      listen_conn = listen_conn_add;
    }
}
//释放资源
freeaddrinfo(ai);
return success == 0;
}
//建立socket
static int new_socket(struct addrinfo *ai)
{
int sfd;
int flags;
//调用系统函数建立socket
if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
{
    return -1;
}
//设定socket为非阻塞的
if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
      || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
{
    perror(&quot;setting O_NONBLOCK&quot;);
    close(sfd);
    return -1;
}
return sfd;
}
//如果是UDP协议,调整发送缓存到最大值
static void maximize_sndbuf(const int sfd)
{
socklen_t intsize = sizeof(int);
int last_good = 0;
int min, max, avg;
int old_size;
//读取socket的选项,SO_SNDBF表示发送缓存
if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
{
    if (settings.verbose > 0)
      perror(&quot;getsockopt(SO_SNDBUF)&quot;);
    return;
}
//二分搜索来设定,很巧的设计
min = old_size;
max = MAX_SENDBUF_SIZE;
while (min <= max)
{
    avg = ((unsigned int) (min + max)) / 2;
    if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *) &avg, intsize) == 0)
    {
      last_good = avg;
      min = avg + 1;
    }
    else
    {
      max = avg - 1;
    }
}
if (settings.verbose > 1)
    fprintf(stderr, &quot;<%d send buffer was %d, now %d\n&quot;, sfd, old_size,
            last_good);
}
至此,网络相关的部分已经完成,后向连接的建立(conn_new)和连接分发(dispatch_conn_new),我们放到其他博客进行分析。


  

版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: Memcached源码阅读之网络监听的建立