hgtvf 发表于 2015-11-19 12:55:25

squid Aiops.c多线程IO理解

  // 初始化aio。根据配置文件创建aio磁盘io线程
void
squidaio_init(void)
{
    int i;
    int done_pipe;
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
    return;

    pthread_attr_init(&globattr);
#if HAVE_PTHREAD_ATTR_SETSCOPE
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
#endif
#if HAVE_SCHED_H
    globsched.sched_priority = 1;
#endif
    main_thread = pthread_self();
#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
#endif
#if HAVE_SCHED_H
    globsched.sched_priority = 2;
#endif
#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
    pthread_attr_setschedparam(&globattr, &globsched);
#endif

    /* Give each thread a smaller 256KB stack, should be more than sufficient */
    pthread_attr_setstacksize(&globattr, 256 * 1024);

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), NULL))
    libcore_fatalf("Failed to create mutex");
    if (pthread_cond_init(&(request_queue.cond), NULL))
    libcore_fatalf("Failed to create condition variable");
    request_queue.head = NULL;
    request_queue.tailp = &request_queue.head;
    request_queue.requests = 0;
    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), NULL))
    libcore_fatalf("Failed to create mutex");
    if (pthread_cond_init(&(done_queue.cond), NULL))
    libcore_fatalf("Failed to create condition variable");
    done_queue.head = NULL;
    done_queue.tailp = &done_queue.head;
    done_queue.requests = 0;
    done_queue.blocked = 0;

    /* Initialize done pipe signal */
    pipe(done_pipe);
    done_fd = done_pipe;
    done_fd_read = done_pipe;
    fd_open(done_fd_read, FD_PIPE, "async-io completion event: main");
    fd_open(done_fd, FD_PIPE, "async-io completion event: threads");
    commSetNonBlocking(done_pipe);
    commSetNonBlocking(done_pipe);
    commSetCloseOnExec(done_pipe);
    commSetCloseOnExec(done_pipe);
    commSetSelect(done_pipe, COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    /* Default to basing the thread count on THREAD_FACTOR and aiops_default_ndirs */
    if (squidaio_nthreads == 0) {
    int j = THREAD_FACTOR;
    for (i = 0; i < aiops_default_ndirs; i&#43;&#43;) {
      squidaio_nthreads &#43;= j;
      j = j * 2 / 3;
      if (j < 4)
      j = 4;
    }
    }
    if (squidaio_nthreads == 0)
    squidaio_nthreads = THREAD_FACTOR;
    squidaio_magic1 = squidaio_nthreads * MAGIC1_FACTOR;
    squidaio_magic2 = squidaio_nthreads * MAGIC2_FACTOR;
    for (i = 0; i < squidaio_nthreads; i&#43;&#43;) {
    threadp = memPoolAlloc(squidaio_thread_pool);
    threadp->status = _THREAD_STARTING;
    threadp->current_req = NULL;
    threadp->requests = 0;
    threadp->next = threads;
    threads = threadp;
    if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
      fprintf(stderr, &quot;Thread creation failed\n&quot;);
      threadp->status = _THREAD_FAILED;
      continue;
    }
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate(&quot;aio_request&quot;, sizeof(squidaio_request_t));

    squidaio_initialised = 1;
}

  

  // 线程处理函数

static void *
squidaio_thread_loop(void *ptr)
{
    squidaio_thread_t *threadp = ptr;
    squidaio_request_t *request;
    sigset_t new;

    /*
   * Make sure to ignore signals which may possibly get sent to
   * the parent squid thread.Causes havoc with mutex's and
   * condition waits otherwise
   */
    // 清空信号空间
  sigemptyset(&new);
  // 给信号空间添加新信号

    sigaddset(&new, SIGPIPE);
    sigaddset(&new, SIGCHLD);
#ifdef _SQUID_LINUX_THREADS_
    sigaddset(&new, SIGQUIT);
    sigaddset(&new, SIGTRAP);
#else
    sigaddset(&new, SIGUSR1);
    sigaddset(&new, SIGUSR2);
#endif
    sigaddset(&new, SIGHUP);
    sigaddset(&new, SIGTERM);
    sigaddset(&new, SIGINT);
  sigaddset(&new, SIGALRM);
  // 将信号掩码置阻塞

    pthread_sigmask(SIG_BLOCK, &new, NULL);

    while (1) {
    threadp->current_req = request = NULL;
    request = NULL;
    /* Get a request to process */
    threadp->status = _THREAD_WAITING;
  pthread_mutex_lock(&request_queue.mutex);
  // 循环查看请求队列,如果不是NULL。解锁,查看是否有信号请求,如果没有,进入sleep状态等待信号

    while (!request_queue.head) {
      pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
  }
  // 刚才处理的request

    request = request_queue.head;
    if (request)
  request_queue.head = request->next;
  // 处理,如果request_queue空了,那么重新初始化将request_queue的尾设成头

    if (!request_queue.head)
  request_queue.tailp = &request_queue.head;
  // 解锁,进行处理请求阶段。设置当前线程的状态。根据请求不同类型调用不同的aio处理

    pthread_mutex_unlock(&request_queue.mutex);
    /* process the request */
    threadp->status = _THREAD_BUSY;
    request->next = NULL;
    threadp->current_req = request;
    errno = 0;
    if (!request->cancelled) {
      switch (request->request_type) {
      case _AIO_OP_OPEN:
      squidaio_do_open(request);
      break;
      case _AIO_OP_READ:
      squidaio_do_read(request);
      break;
      case _AIO_OP_WRITE:
      squidaio_do_write(request);
      break;
      case _AIO_OP_CLOSE:
      squidaio_do_close(request);
      break;
      case _AIO_OP_UNLINK:
      squidaio_do_unlink(request);
      break;
      case _AIO_OP_TRUNCATE:
      squidaio_do_truncate(request);
      break;
#if AIO_OPENDIR            /* Opendir not implemented yet */
      case _AIO_OP_OPENDIR:
      squidaio_do_opendir(request);
      break;
#endif
      case _AIO_OP_STAT:
      squidaio_do_stat(request);
      break;
      default:
      request->ret = -1;
      request->err = EINVAL;
      break;
      }
    } else {      /* cancelled */
      request->ret = -1;
      request->err = EINTR;
    }
    threadp->status = _THREAD_DONE;
  /* put the request in the done queue */
  // 加锁,将此请求放入完成队列,解锁

    pthread_mutex_lock(&done_queue.mutex);
    *done_queue.tailp = request;
    done_queue.tailp = &request->next;
    pthread_mutex_unlock(&done_queue.mutex);
    if (!done_signalled) {
      done_signalled = 1;
      FD_WRITE_METHOD(done_fd, &quot;!&quot;, 1);
    }
    threadp->requests&#43;&#43;;
    }                /* while forever */
    return NULL;
  }                /* squidaio_thread_loop */
  

  // squid Aio请求队列

static void
squidaio_queue_request(squidaio_request_t * request)
{
    static int high_start = 0;
    debug(43, 9) (&quot;squidaio_queue_request: %p type=%d result=%p\n&quot;,
    request, request->request_type, request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len &#43;= 1;
    request->resultp->_data = request;
    /* Play some tricks with the request_queue2 queue */
  request->next = NULL;
  // 如果请求队列是NULL(request_queue2阻塞队列)

  if (!request_queue2.head) {
  // 并且给请求队列加锁成功的话(正常情况)

    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
  /* Normal path */
  // 给队列的最后一个request的next赋值request
  // 然后将tail赋值为此request的next

      *request_queue.tailp = request;
        request_queue.tailp = &request->next;
  // 给阻塞等待状态发送信号,然后解锁

      pthread_cond_signal(&request_queue.cond);
      pthread_mutex_unlock(&request_queue.mutex);
    } else {
  /* Oops, the request queue is blocked, use request_queue2 */
  // 如果加锁失败,将request加入到队列2

      *request_queue2.tailp = request;
      request_queue2.tailp = &request->next;
    }
    } else {
    /* Secondary path. We have blocked requests to deal with */
  /* add the request to the chain */
  // 如果request_queue2不是空,将request加入请求队列2

    *request_queue2.tailp = request;
    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
      /* Ok, the queue is no longer blocked */
      *request_queue.tailp = request_queue2.head;
      request_queue.tailp = &request->next;
      pthread_cond_signal(&request_queue.cond);
      pthread_mutex_unlock(&request_queue.mutex);
      request_queue2.head = NULL;
      request_queue2.tailp = &request_queue2.head;
    } else {
      /* still blocked, bump the blocked request chain */
      request_queue2.tailp = &request->next;
    }
    }
    if (request_queue2.head) {
    static int filter = 0;
    static int filter_limit = 8;
    if (&#43;&#43;filter >= filter_limit) {
      filter_limit &#43;= filter;
      filter = 0;
      debug(43, 1) (&quot;squidaio_queue_request: WARNING - Queue congestion\n&quot;);
    }
    }
  /* Warn if out of threads */
  // 如果超出线程警告

    if (request_queue_len > MAGIC1) {
    static int last_warn = 0;
    static int queue_high, queue_low;
    if (high_start == 0) {
      high_start = squid_curtime;
      queue_high = request_queue_len;
      queue_low = request_queue_len;
    }
    if (request_queue_len > queue_high)
      queue_high = request_queue_len;
    if (request_queue_len < queue_low)
      queue_low = request_queue_len;
    if (squid_curtime >= (last_warn &#43; 15) &&
      squid_curtime >= (high_start &#43; 5)) {
      debug(43, 1) (&quot;squidaio_queue_request: WARNING - Disk I/O overloading\n&quot;);
      if (squid_curtime >= (high_start &#43; 15))
      debug(43, 1) (&quot;squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n&quot;,
            request_queue_len, queue_high, queue_low, (long int) (squid_curtime - high_start));
      last_warn = squid_curtime;
    }
    } else {
    high_start = 0;
    }
    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
    debug(43, 0) (&quot;squidaio_queue_request: Async request queue growing uncontrollably!\n&quot;);
    debug(43, 0) (&quot;squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n&quot;);
    squidaio_sync();
    debug(43, 0) (&quot;squidaio_queue_request: Synced\n&quot;);
    }
}                /* squidaio_queue_request */


         版权声明:本文为博主原创文章,未经博主允许不得转载。
页: [1]
查看完整版本: squid Aiops.c多线程IO理解