设为首页 收藏本站
查看: 1325|回复: 0

[经验分享] Memcached源代码阅读(3) 网络通信

[复制链接]
发表于 2015-8-31 14:20:17 | 显示全部楼层 |阅读模式
  if (!(listen_conn = conn_new(l_socket, conn_listening,
                                 EV_READ | EV_PERSIST, 1, false, main_base)))
conn *conn_new(const int sfd, const int init_state, const int event_flags,
                const int read_buffer_size, const bool is_udp, struct event_base *base) {
    conn *c = conn_from_freelist();
  if (NULL == c) {
        if (!(c = (conn *)malloc(sizeof(conn)))) {
            perror("malloc()");
            return NULL;
        }
        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;
  c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;
  c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
  if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                c->msglist == 0 || c->suffixlist == 0) {
            if (c->rbuf != 0) free(c->rbuf);
            if (c->wbuf != 0) free(c->wbuf);
            if (c->ilist !=0) free(c->ilist);
            if (c->suffixlist != 0) free(c->suffixlist);
            if (c->iov != 0) free(c->iov);
            if (c->msglist != 0) free(c->msglist);
            free(c);
            perror("malloc()");
            return NULL;
        }
  STATS_LOCK();
        stats.conn_structs++;
        STATS_UNLOCK();
    }
  if (settings.verbose > 1) {
        if (init_state == conn_listening)
            fprintf(stderr, "<%d server listening\n", sfd);
        else if (is_udp)
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        else
            fprintf(stderr, "<%d new client connection\n", sfd);
    }
  c->sfd = sfd;
    c->udp = is_udp;
    c->state = init_state;
    c->rlbytes = 0;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;
  c->write_and_go = conn_read;
    c->write_and_free = 0;
    c->item = 0;
    c->bucket = -1;
    c->gen = 0;
  event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;
  if (event_add(&c->event, 0) == -1) {
        if (conn_add_to_freelist(c)) {
            conn_free(c);
        }
        return NULL;
    }
  STATS_LOCK();
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();
  return c;
}
telnet 127.0.0.1 11211
当有新的链接进来时event_handler被调用
void event_handler(const int fd, const short which, void *arg) {
    conn *c;
  c = (conn *)arg;
    assert(c != NULL);
  c->which = which;
  /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }
  drive_machine(c);
  /* wait for next event */
    return;
}
  drive_machine()
处理以下连接状态
conn_listening 监听socket连接
conn_read 读取请求
conn_nread 从客户端读取固定长度的数据
conn_write 向客户端写入response
conn_mwrite 向客户端写入items
conn_swallow 连接异常时输出一些错误消息
conn_closing 关闭这次连接
  
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr addr;
    int res;
  assert(c != NULL);
  while (!stop) {
  switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                break;
            }
            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, false);
            break;
  case conn_read:
            ...
        }
    }
  return;
}
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, false);
# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base)
dispatch_conn_new 仍然是一个新的连接
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, false);
conn *conn_new(const int sfd, const int init_state, const int event_flags,
                const int read_buffer_size, const bool is_udp, struct event_base *base)
  这个新的连接仍然使用event_handler
event_handler就是一个状态机。
新建立的客户端连接处于conn_read状态,
而原有的侦听连接处于listening状态。
  在telnet终端中输入命令stats
这时候进入conn_read分支
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr addr;
    int res;
  assert(c != NULL);
  while (!stop) {
  switch(c->state) {
      
  case conn_read:
            if (try_read_command(c) != 0) {
                continue;
            }
            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
                continue;
            }
            /* we have no command line and no data to read from network */
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }
            stop = true;
            break;
  
  return;
}
  try_read_command()
进入命令解析阶段
if (try_read_command(c) != 0) {
                continue;
            }
  
static int try_read_command(conn *c) {
    char *el, *cont;
  assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));
  if (c->rbytes == 0)
        return 0;
    el = memchr(c->rcurr, '\n', c->rbytes);
    if (!el)
        return 0;
    cont = el + 1;
    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
        el--;
    }
    *el = '\0';
  assert(cont <= (c->rcurr + c->rbytes));
  process_command(c, c->rcurr);
  c->rbytes -= (cont - c->rcurr);
    c->rcurr = cont;
  assert(c->rcurr <= (c->rbuf + c->rsize));
  return 1;
}
try_read_command()的作用就是找到命令行的结尾,如果找到行末尾,就调用process_command(c, c->rcurr);处理这行命令。
否则返回0,返回0后,则调用try_read_network获取网络数据。
  static int try_read_network(conn *c) {
    int gotdata = 0;
    int res;
  assert(c != NULL);
  if (c->rcurr != c->rbuf) {
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }
  while (1) {
        if (c->rbytes >= c->rsize) {
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
            if (!new_rbuf) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't realloc input buffer\n");
                c->rbytes = 0; /* ignore what we read */
                out_string(c, "SERVER_ERROR out of memory");
                c->write_and_go = conn_closing;
                return 1;
            }
            c->rcurr = c->rbuf = new_rbuf;
            c->rsize *= 2;
        }
  /* unix socket mode doesn't need this, so zeroed out.  but why
         * is this done for every command?  presumably for UDP
         * mode.  */
        if (!settings.socketpath) {
            c->request_addr_size = sizeof(c->request_addr);
        } else {
            c->request_addr_size = 0;
        }
  res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
        if (res > 0) {
            STATS_LOCK();
            stats.bytes_read += res;
            STATS_UNLOCK();
            gotdata = 1;
            c->rbytes += res;
            continue;
        }
        if (res == 0) {
            /* connection closed */
            conn_set_state(c, conn_closing);
            return 1;
        }
        if (res == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
            else return 0;
        }
    }
    return gotdata;
}
  res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
if (res == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
这里判断返回
因为这里的socket都是非阻塞的,EAGAIN 和EWOULDBLOCK都不算严重错误, 因此如果出错应该重新读取。
  EWOULDBLOCK的意思是如果你不把socket设成非阻塞(即阻塞)模式时,这个读操作将阻塞,也就是说数据还未准备好(但系统知道数据来了,所以select告诉你那个socket可读)。使用非阻塞模式做I/O操作的细心的人会检查errno是不是EAGAIN、EWOULDBLOCK、EINTR,如果是就应该重读,一般是用循环。如果你不是一定要用非阻塞就不要设成这样,这就是为什么系统的默认模式是阻塞。
当没有网络数据也没有换行的时候调用update_event,结束driver_machine
  if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }
            stop = true;
  输入命令stats,
try_read_command 读取成功后,进入process_command处理命令
static void process_command(conn *c, char *command) {
  token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;
  assert(c != NULL);
  if (settings.verbose > 1)
        fprintf(stderr, "<%d %s\n", c->sfd, command);
  /*
     * for commands set/add/replace, we build an item and read the data
     * directly into it, then continue in nread_complete().
     */
  c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_string(c, "SERVER_ERROR out of memory");
        return;
    }
  ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
  process_get_command(c, tokens, ntokens, false);
  } else if (ntokens == 6 &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
  process_update_command(c, tokens, ntokens, comm, false);
  } else if (ntokens == 7 && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
  process_update_command(c, tokens, ntokens, comm, true);
  } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
  process_arithmetic_command(c, tokens, ntokens, 1);
  } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
  process_get_command(c, tokens, ntokens, true);
  } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
  process_arithmetic_command(c, tokens, ntokens, 0);
  } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
  process_delete_command(c, tokens, ntokens);
  } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
        unsigned int bucket, gen;
        if (!settings.managed) {
            out_string(c, "CLIENT_ERROR not a managed instance");
            return;
        }
  if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
                out_string(c, "CLIENT_ERROR bucket number out of range");
                return;
            }
            buckets[bucket] = gen;
            out_string(c, "OWNED");
            return;
        } else {
            out_string(c, "CLIENT_ERROR bad format");
            return;
        }
  } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
  int bucket;
        if (!settings.managed) {
            out_string(c, "CLIENT_ERROR not a managed instance");
            return;
        }
        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
                out_string(c, "CLIENT_ERROR bucket number out of range");
                return;
            }
            buckets[bucket] = 0;
            out_string(c, "DISOWNED");
            return;
        } else {
            out_string(c, "CLIENT_ERROR bad format");
            return;
        }
  } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
        int bucket, gen;
        if (!settings.managed) {
            out_string(c, "CLIENT_ERROR not a managed instance");
            return;
        }
        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
            /* we never write anything back, even if input's wrong */
            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
                /* do nothing, bad input */
            } else {
                c->bucket = bucket;
                c->gen = gen;
            }
            conn_set_state(c, conn_read);
            return;
        } else {
            out_string(c, "CLIENT_ERROR bad format");
            return;
        }
  } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
  process_stat(c, tokens, ntokens);
  } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
        time_t exptime = 0;
        set_current_time();
  if(ntokens == 2) {
            settings.oldest_live = current_time - 1;
            item_flush_expired();
            out_string(c, "OK");
            return;
        }
  exptime = strtol(tokens[1].value, NULL, 10);
        if(errno == ERANGE) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
  settings.oldest_live = realtime(exptime) - 1;
        item_flush_expired();
        out_string(c, "OK");
        return;
  } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
  out_string(c, "VERSION " VERSION);
  } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
  conn_set_state(c, conn_closing);
  } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
#ifdef ALLOW_SLABS_REASSIGN
  
#else
        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
#endif
    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
        process_verbosity_command(c, tokens, ntokens);
    } else {
        out_string(c, "ERROR");
    }
    return;
}
tokenize_command(char *command, token_t *tokens, const size_t max_tokens)
把命令分解成一个个终止符,把空格替换成\0。
typedef struct token_s {
    char *value;
    size_t length;
} token_t;
每个token结构如上,包括一个token的字符串和长度。
stats\n命令解析后如下
-  tokens 0x0018f8b0 {value=0x005a4fd0 "stats" length=5 } token_s [7]
+  [0] {value=0x005a4fd0 "stats" length=5 } token_s
+  [1] {value=0x00000000 <错误的指针> length=0 } token_s
然后process_command进入分支
(ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
  process_stat(c, tokens, ntokens);
  }
  
static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
    rel_time_t now = current_time;
    char *command;
    char *subcommand;
  assert(c != NULL);
  if(ntokens < 2) {
        out_string(c, "CLIENT_ERROR bad command line");
        return;
    }
  command = tokens[COMMAND_TOKEN].value;
  if (ntokens == 2 && strcmp(command, "stats") == 0) {
        char temp[1024];
        pid_t pid = getpid();
        char *pos = temp;
  #ifndef WIN32
        struct rusage usage;
        getrusage(RUSAGE_SELF, &usage);
#endif /* !WIN32 */
  STATS_LOCK();
        pos += sprintf(pos, "STAT pid %u\r\n", pid);
        pos += sprintf(pos, "STAT uptime %u\r\n", now);
        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
        pos += sprintf(pos, "STAT version " VERSION "\r\n");
        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
#ifndef WIN32
        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
#endif /* !WIN32 */
        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
        pos += sprintf(pos, "END");
        STATS_UNLOCK();
        out_string(c, temp);
        return;
    }
  subcommand = tokens[SUBCOMMAND_TOKEN].value;
  if (strcmp(subcommand, "reset") == 0) {
        stats_reset();
        out_string(c, "RESET");
        return;
    }
  #ifdef HAVE_MALLOC_H
#endif /* HAVE_MALLOC_H */
  #ifndef WIN32
   
#endif
  if (strcmp(subcommand, "cachedump") == 0) {
  char *buf;
        unsigned int bytes, id, limit = 0;
  if(ntokens < 5) {
            out_string(c, "CLIENT_ERROR bad command line");
            return;
        }
  id = strtoul(tokens[2].value, NULL, 10);
        limit = strtoul(tokens[3].value, NULL, 10);
  if(errno == ERANGE) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
  buf = item_cachedump(id, limit, &bytes);
        write_and_free(c, buf, bytes);
        return;
    }
  if (strcmp(subcommand, "slabs") == 0) {
        int bytes = 0;
        char *buf = slabs_stats(&bytes);
        write_and_free(c, buf, bytes);
        return;
    }
  if (strcmp(subcommand, "items") == 0) {
        int bytes = 0;
        char *buf = item_stats(&bytes);
        write_and_free(c, buf, bytes);
        return;
    }
  if (strcmp(subcommand, "detail") == 0) {
        if (ntokens < 4)
            process_stats_detail(c, "");  /* outputs the error message */
        else
            process_stats_detail(c, tokens[2].value);
        return;
    }
  if (strcmp(subcommand, "sizes") == 0) {
        int bytes = 0;
        char *buf = item_stats_sizes(&bytes);
        write_and_free(c, buf, bytes);
        return;
    }
  out_string(c, "ERROR");
}
  这个命令生成输出后,调用out_string(c, temp);写入socket
static void out_string(conn *c, const char *str) {
    size_t len;
  assert(c != NULL);
  if (settings.verbose > 1)
        fprintf(stderr, ">%d %s\n", c->sfd, str);
  len = strlen(str);
    if ((len + 2) > c->wsize) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }
  memcpy(c->wbuf, str, len);
    memcpy(c->wbuf + len, "\r\n", 3);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;
  conn_set_state(c, conn_write);
    c->write_and_go = conn_read;
    return;
}
  static void conn_set_state(conn *c, int state) {
    assert(c != NULL);
  if (state != c->state) {
        if (state == conn_read) {
            conn_shrink(c);
            assoc_move_next_bucket();
        }
        c->state = state;
    }
}
  
之后drive_machine()函数进入写入分支
case conn_write:
            /*
             * We want to write out a simple response. If we haven't already,
             * assemble it into a msgbuf list (this will be a single-entry
             * list for TCP or a two-entry list for UDP).
             */
            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
                    (c->udp && build_udp_headers(c) != 0)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }
  static int add_iov(conn *c, const void *buf, int len) {
    struct msghdr *m;
    int leftover;
    bool limit_to_mtu;
  assert(c != NULL);
  do {
        m = &c->msglist[c->msgused - 1];
  /*
         * Limit UDP packets, and the first payloads of TCP replies, to
         * UDP_MAX_PAYLOAD_SIZE bytes.
         */
        limit_to_mtu = c->udp || (1 == c->msgused);
  /* We may need to start a new msghdr if this one is full. */
        if (m->msg_iovlen == IOV_MAX ||
            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
            add_msghdr(c);
            m = &c->msglist[c->msgused - 1];
        }
  if (ensure_iov_space(c) != 0)
            return -1;
  /* If the fragment is too big to fit in the datagram, split it up */
        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
            len -= leftover;
        } else {
            leftover = 0;
        }
  m = &c->msglist[c->msgused - 1];
        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
        m->msg_iov[m->msg_iovlen].iov_len = len;
  c->msgbytes += len;
        c->iovused++;
        m->msg_iovlen++;
  buf = ((char *)buf) + len;
        len = leftover;
    } while (leftover > 0);
  return 0;
}
  case conn_write:
add_iov直接进入conn_mwrite状态。
case conn_mwrite:
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    while (c->ileft > 0) {
                        item *it = *(c->icurr);
                        assert((it->it_flags & ITEM_SLABBED) == 0);
                        item_remove(it);
                        c->icurr++;
                        c->ileft--;
                    }
                    while (c->suffixleft > 0) {
                        char *suffix = *(c->suffixcurr);
                        if(suffix_add_to_freelist(suffix)) {
                            /* Failed to add to freelist, don't leak */
                            free(suffix);
                        }
                        c->suffixcurr++;
                        c->suffixleft--;
                    }
                    conn_set_state(c, conn_read);
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        free(c->write_and_free);
                        c->write_and_free = 0;
                    }
                    conn_set_state(c, c->write_and_go);
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;
  case TRANSMIT_INCOMPLETE:
            case TRANSMIT_HARD_ERROR:
                break;                   /* Continue in state machine. */
  case TRANSMIT_SOFT_ERROR:
                stop = true;
                break;
            }
            break;

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-106872-1-1.html 上篇帖子: memcached配置 启动 下篇帖子: 配置spring-mvc + simple-spring-memcached
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表