设为首页 收藏本站
查看: 1243|回复: 0

[经验分享] squid Aiops.c多线程IO理解

[复制链接]

尚未签到

发表于 2015-11-19 12:55:25 | 显示全部楼层 |阅读模式
  // 初始化aio。根据配置文件创建aio磁盘io线程
void
squidaio_init(void)
{
    int i;
    int done_pipe[2];
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
    return;

    pthread_attr_init(&globattr);
#if HAVE_PTHREAD_ATTR_SETSCOPE
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
#endif
#if HAVE_SCHED_H
    globsched.sched_priority = 1;
#endif
    main_thread = pthread_self();
#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
#endif
#if HAVE_SCHED_H
    globsched.sched_priority = 2;
#endif
#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
    pthread_attr_setschedparam(&globattr, &globsched);
#endif

    /* Give each thread a smaller 256KB stack, should be more than sufficient */
    pthread_attr_setstacksize(&globattr, 256 * 1024);

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), NULL))
    libcore_fatalf("Failed to create mutex");
    if (pthread_cond_init(&(request_queue.cond), NULL))
    libcore_fatalf("Failed to create condition variable");
    request_queue.head = NULL;
    request_queue.tailp = &request_queue.head;
    request_queue.requests = 0;
    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), NULL))
    libcore_fatalf("Failed to create mutex");
    if (pthread_cond_init(&(done_queue.cond), NULL))
    libcore_fatalf("Failed to create condition variable");
    done_queue.head = NULL;
    done_queue.tailp = &done_queue.head;
    done_queue.requests = 0;
    done_queue.blocked = 0;

    /* Initialize done pipe signal */
    pipe(done_pipe);
    done_fd = done_pipe[1];
    done_fd_read = done_pipe[0];
    fd_open(done_fd_read, FD_PIPE, "async-io completion event: main");
    fd_open(done_fd, FD_PIPE, "async-io completion event: threads");
    commSetNonBlocking(done_pipe[0]);
    commSetNonBlocking(done_pipe[1]);
    commSetCloseOnExec(done_pipe[0]);
    commSetCloseOnExec(done_pipe[1]);
    commSetSelect(done_pipe[0], COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    /* Default to basing the thread count on THREAD_FACTOR and aiops_default_ndirs */
    if (squidaio_nthreads == 0) {
    int j = THREAD_FACTOR;
    for (i = 0; i < aiops_default_ndirs; i&#43;&#43;) {
        squidaio_nthreads &#43;= j;
        j = j * 2 / 3;
        if (j < 4)
        j = 4;
    }
    }
    if (squidaio_nthreads == 0)
    squidaio_nthreads = THREAD_FACTOR;
    squidaio_magic1 = squidaio_nthreads * MAGIC1_FACTOR;
    squidaio_magic2 = squidaio_nthreads * MAGIC2_FACTOR;
    for (i = 0; i < squidaio_nthreads; i&#43;&#43;) {
    threadp = memPoolAlloc(squidaio_thread_pool);
    threadp->status = _THREAD_STARTING;
    threadp->current_req = NULL;
    threadp->requests = 0;
    threadp->next = threads;
    threads = threadp;
    if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
        fprintf(stderr, &quot;Thread creation failed\n&quot;);
        threadp->status = _THREAD_FAILED;
        continue;
    }
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate(&quot;aio_request&quot;, sizeof(squidaio_request_t));

    squidaio_initialised = 1;
}

  

  // 线程处理函数

static void *
squidaio_thread_loop(void *ptr)
{
    squidaio_thread_t *threadp = ptr;
    squidaio_request_t *request;
    sigset_t new;

    /*
     * Make sure to ignore signals which may possibly get sent to
     * the parent squid thread.  Causes havoc with mutex's and
     * condition waits otherwise
     */
    // 清空信号空间
  sigemptyset(&new);
  // 给信号空间添加新信号

    sigaddset(&new, SIGPIPE);
    sigaddset(&new, SIGCHLD);
#ifdef _SQUID_LINUX_THREADS_
    sigaddset(&new, SIGQUIT);
    sigaddset(&new, SIGTRAP);
#else
    sigaddset(&new, SIGUSR1);
    sigaddset(&new, SIGUSR2);
#endif
    sigaddset(&new, SIGHUP);
    sigaddset(&new, SIGTERM);
    sigaddset(&new, SIGINT);
  sigaddset(&new, SIGALRM);
  // 将信号掩码置阻塞

    pthread_sigmask(SIG_BLOCK, &new, NULL);

    while (1) {
    threadp->current_req = request = NULL;
    request = NULL;
    /* Get a request to process */
    threadp->status = _THREAD_WAITING;
  pthread_mutex_lock(&request_queue.mutex);
  // 循环查看请求队列,如果不是NULL。解锁,查看是否有信号请求,如果没有,进入sleep状态等待信号

    while (!request_queue.head) {
        pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
  }
  // 刚才处理的request

    request = request_queue.head;
    if (request)
  request_queue.head = request->next;
  // 处理,如果request_queue空了,那么重新初始化将request_queue的尾设成头

    if (!request_queue.head)
  request_queue.tailp = &request_queue.head;
  // 解锁,进行处理请求阶段。设置当前线程的状态。根据请求不同类型调用不同的aio处理

    pthread_mutex_unlock(&request_queue.mutex);
    /* process the request */
    threadp->status = _THREAD_BUSY;
    request->next = NULL;
    threadp->current_req = request;
    errno = 0;
    if (!request->cancelled) {
        switch (request->request_type) {
        case _AIO_OP_OPEN:
        squidaio_do_open(request);
        break;
        case _AIO_OP_READ:
        squidaio_do_read(request);
        break;
        case _AIO_OP_WRITE:
        squidaio_do_write(request);
        break;
        case _AIO_OP_CLOSE:
        squidaio_do_close(request);
        break;
        case _AIO_OP_UNLINK:
        squidaio_do_unlink(request);
        break;
        case _AIO_OP_TRUNCATE:
        squidaio_do_truncate(request);
        break;
#if AIO_OPENDIR            /* Opendir not implemented yet */
        case _AIO_OP_OPENDIR:
        squidaio_do_opendir(request);
        break;
#endif
        case _AIO_OP_STAT:
        squidaio_do_stat(request);
        break;
        default:
        request->ret = -1;
        request->err = EINVAL;
        break;
        }
    } else {        /* cancelled */
        request->ret = -1;
        request->err = EINTR;
    }
    threadp->status = _THREAD_DONE;
  /* put the request in the done queue */
  // 加锁,将此请求放入完成队列,解锁

    pthread_mutex_lock(&done_queue.mutex);
    *done_queue.tailp = request;
    done_queue.tailp = &request->next;
    pthread_mutex_unlock(&done_queue.mutex);
    if (!done_signalled) {
        done_signalled = 1;
        FD_WRITE_METHOD(done_fd, &quot;!&quot;, 1);
    }
    threadp->requests&#43;&#43;;
    }                /* while forever */
    return NULL;
  }                /* squidaio_thread_loop */
  

  // squid Aio请求队列

static void
squidaio_queue_request(squidaio_request_t * request)
{
    static int high_start = 0;
    debug(43, 9) (&quot;squidaio_queue_request: %p type=%d result=%p\n&quot;,
    request, request->request_type, request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len &#43;= 1;
    request->resultp->_data = request;
    /* Play some tricks with the request_queue2 queue */
  request->next = NULL;
  // 如果请求队列是NULL(request_queue2阻塞队列)

  if (!request_queue2.head) {
  // 并且给请求队列加锁成功的话(正常情况)

    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
  /* Normal path */
  // 给队列的最后一个request的next赋&#20540;request
  // 然后将tail赋&#20540;为此request的next

        *request_queue.tailp = request;
          request_queue.tailp = &request->next;
  // 给阻塞等待状态发送信号,然后解锁

        pthread_cond_signal(&request_queue.cond);
        pthread_mutex_unlock(&request_queue.mutex);
    } else {
  /* Oops, the request queue is blocked, use request_queue2 */
  // 如果加锁失败,将request加入到队列2

        *request_queue2.tailp = request;
        request_queue2.tailp = &request->next;
    }
    } else {
    /* Secondary path. We have blocked requests to deal with */
  /* add the request to the chain */
  // 如果request_queue2不是空,将request加入请求队列2

    *request_queue2.tailp = request;
    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
        /* Ok, the queue is no longer blocked */
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = &request->next;
        pthread_cond_signal(&request_queue.cond);
        pthread_mutex_unlock(&request_queue.mutex);
        request_queue2.head = NULL;
        request_queue2.tailp = &request_queue2.head;
    } else {
        /* still blocked, bump the blocked request chain */
        request_queue2.tailp = &request->next;
    }
    }
    if (request_queue2.head) {
    static int filter = 0;
    static int filter_limit = 8;
    if (&#43;&#43;filter >= filter_limit) {
        filter_limit &#43;= filter;
        filter = 0;
        debug(43, 1) (&quot;squidaio_queue_request: WARNING - Queue congestion\n&quot;);
    }
    }
  /* Warn if out of threads */
  // 如果超出线程警告

    if (request_queue_len > MAGIC1) {
    static int last_warn = 0;
    static int queue_high, queue_low;
    if (high_start == 0) {
        high_start = squid_curtime;
        queue_high = request_queue_len;
        queue_low = request_queue_len;
    }
    if (request_queue_len > queue_high)
        queue_high = request_queue_len;
    if (request_queue_len < queue_low)
        queue_low = request_queue_len;
    if (squid_curtime >= (last_warn &#43; 15) &&
        squid_curtime >= (high_start &#43; 5)) {
        debug(43, 1) (&quot;squidaio_queue_request: WARNING - Disk I/O overloading\n&quot;);
        if (squid_curtime >= (high_start &#43; 15))
        debug(43, 1) (&quot;squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n&quot;,
            request_queue_len, queue_high, queue_low, (long int) (squid_curtime - high_start));
        last_warn = squid_curtime;
    }
    } else {
    high_start = 0;
    }
    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
    debug(43, 0) (&quot;squidaio_queue_request: Async request queue growing uncontrollably!\n&quot;);
    debug(43, 0) (&quot;squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n&quot;);
    squidaio_sync();
    debug(43, 0) (&quot;squidaio_queue_request: Synced\n&quot;);
    }
}                /* squidaio_queue_request */


         版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-141174-1-1.html 上篇帖子: Squid实践日志 下篇帖子: Web加速-Squid反向加速配置文档
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表