|
/*
 * squidaio_init - one-time setup of the async disk I/O machinery.
 *
 * Returns immediately when squidaio_initialised is already set.
 * Otherwise it:
 *   - prepares the shared thread attributes in globattr (scope, scheduling
 *     priority where available, 256KB stack),
 *   - initialises request_queue and done_queue (mutex, condition variable,
 *     empty list, zeroed counters),
 *   - creates the completion pipe, registers both ends with fd_open, makes
 *     them non-blocking and close-on-exec, and installs squidaio_fdhandler
 *     as the read handler,
 *   - chooses a thread count when squidaio_nthreads is 0 and starts that
 *     many workers running squidaio_thread_loop,
 *   - creates the request memory pool and sets squidaio_initialised.
 *
 * A mutex, condition variable or pipe that cannot be created is reported
 * through libcore_fatalf. A worker that cannot be started is reported on
 * stderr, marked _THREAD_FAILED and left on the thread list.
 */
void
squidaio_init(void)
{
    int i;
    int done_pipe[2];
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    pthread_attr_init(&globattr);
#if HAVE_PTHREAD_ATTR_SETSCOPE
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
#endif
    /* The main thread is given priority 1, the workers priority 2. */
#if HAVE_SCHED_H
    globsched.sched_priority = 1;
#endif
    main_thread = pthread_self();
#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
#endif
#if HAVE_SCHED_H
    globsched.sched_priority = 2;
#endif
#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
    pthread_attr_setschedparam(&globattr, &globsched);
#endif
    /* Give each thread a smaller 256KB stack, should be more than sufficient */
    pthread_attr_setstacksize(&globattr, 256 * 1024);

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), NULL))
        libcore_fatalf("Failed to create mutex");
    if (pthread_cond_init(&(request_queue.cond), NULL))
        libcore_fatalf("Failed to create condition variable");
    request_queue.head = NULL;
    request_queue.tailp = &request_queue.head;
    request_queue.requests = 0;
    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), NULL))
        libcore_fatalf("Failed to create mutex");
    if (pthread_cond_init(&(done_queue.cond), NULL))
        libcore_fatalf("Failed to create condition variable");
    done_queue.head = NULL;
    done_queue.tailp = &done_queue.head;
    done_queue.requests = 0;
    done_queue.blocked = 0;

    /*
     * Initialize done pipe signal.  Without a valid pipe done_pipe[] would
     * hold indeterminate values that must not be handed to fd_open() and
     * friends, so a failure here is treated like the mutex failures above.
     */
    if (pipe(done_pipe) < 0)
        libcore_fatalf("Failed to create pipe");
    done_fd = done_pipe[1];
    done_fd_read = done_pipe[0];
    fd_open(done_fd_read, FD_PIPE, "async-io completion event: main");
    fd_open(done_fd, FD_PIPE, "async-io completion event: threads");
    commSetNonBlocking(done_pipe[0]);
    commSetNonBlocking(done_pipe[1]);
    commSetCloseOnExec(done_pipe[0]);
    commSetCloseOnExec(done_pipe[1]);
    commSetSelect(done_pipe[0], COMM_SELECT_READ, squidaio_fdhandler, NULL, 0);

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    /*
     * Default to basing the thread count on THREAD_FACTOR and
     * aiops_default_ndirs: the first directory contributes THREAD_FACTOR
     * threads and each further one two thirds of the previous
     * contribution, but never fewer than 4.
     */
    if (squidaio_nthreads == 0) {
        int j = THREAD_FACTOR;
        for (i = 0; i < aiops_default_ndirs; i++) {
            squidaio_nthreads += j;
            j = j * 2 / 3;
            if (j < 4)
                j = 4;
        }
    }
    /* Still zero (no directories counted): fall back to THREAD_FACTOR. */
    if (squidaio_nthreads == 0)
        squidaio_nthreads = THREAD_FACTOR;
    squidaio_magic1 = squidaio_nthreads * MAGIC1_FACTOR;
    squidaio_magic2 = squidaio_nthreads * MAGIC2_FACTOR;

    for (i = 0; i < squidaio_nthreads; i++) {
        threadp = memPoolAlloc(squidaio_thread_pool);
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        /* Push onto the list before starting, so failed threads stay listed too. */
        threadp->next = threads;
        threads = threadp;
        if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
        }
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
    squidaio_initialised = 1;
}
/*
 * squidaio_thread_loop - body of every async I/O worker thread.
 *
 * ptr is the squidaio_thread_t describing this worker.  After blocking a
 * fixed set of signals the thread loops forever: it takes the request at
 * the head of request_queue (sleeping on the queue's condition variable
 * while the queue is empty), executes it according to its request_type,
 * appends it to done_queue and, when done_signalled is not yet set, sets
 * it and writes one byte to done_fd.  The worker's status, current_req
 * and requests fields are updated along the way.
 *
 * The loop has no exit; the trailing return only satisfies the signature.
 */
static void *
squidaio_thread_loop(void *ptr)
{
    /*
     * Make sure to ignore signals which may possibly get sent to
     * the parent squid thread.  Causes havoc with mutex's and
     * condition waits otherwise.
     */
    static const int masked_signals[] = {
        SIGPIPE,
        SIGCHLD,
#ifdef _SQUID_LINUX_THREADS_
        SIGQUIT,
        SIGTRAP,
#else
        SIGUSR1,
        SIGUSR2,
#endif
        SIGHUP,
        SIGTERM,
        SIGINT,
        SIGALRM
    };
    squidaio_thread_t *self = ptr;
    squidaio_request_t *req;
    sigset_t sigmask;
    unsigned int s;

    /* Build the set from the table above and block it for this thread. */
    sigemptyset(&sigmask);
    for (s = 0; s < sizeof(masked_signals) / sizeof(masked_signals[0]); s++)
        sigaddset(&sigmask, masked_signals[s]);
    pthread_sigmask(SIG_BLOCK, &sigmask, NULL);

    for (;;) {
        req = NULL;
        self->current_req = NULL;

        /* Get a request to process */
        self->status = _THREAD_WAITING;
        pthread_mutex_lock(&request_queue.mutex);
        /* Sleep until the queue is non-empty; re-check after every wakeup. */
        while (!request_queue.head)
            pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
        /* The head is non-NULL here: unlink it while still holding the lock. */
        req = request_queue.head;
        request_queue.head = req->next;
        /* Queue drained: the tail pointer goes back to the head slot. */
        if (!request_queue.head)
            request_queue.tailp = &request_queue.head;
        pthread_mutex_unlock(&request_queue.mutex);

        /* process the request */
        self->status = _THREAD_BUSY;
        req->next = NULL;
        self->current_req = req;
        errno = 0;
        if (req->cancelled) {
            /* cancelled: report failure without touching the disk */
            req->ret = -1;
            req->err = EINTR;
        } else {
            switch (req->request_type) {
            case _AIO_OP_OPEN:
                squidaio_do_open(req);
                break;
            case _AIO_OP_READ:
                squidaio_do_read(req);
                break;
            case _AIO_OP_WRITE:
                squidaio_do_write(req);
                break;
            case _AIO_OP_CLOSE:
                squidaio_do_close(req);
                break;
            case _AIO_OP_UNLINK:
                squidaio_do_unlink(req);
                break;
            case _AIO_OP_TRUNCATE:
                squidaio_do_truncate(req);
                break;
#if AIO_OPENDIR			/* Opendir not implemented yet */
            case _AIO_OP_OPENDIR:
                squidaio_do_opendir(req);
                break;
#endif
            case _AIO_OP_STAT:
                squidaio_do_stat(req);
                break;
            default:
                /* unknown request type */
                req->ret = -1;
                req->err = EINVAL;
                break;
            }
        }
        self->status = _THREAD_DONE;

        /* put the request in the done queue */
        pthread_mutex_lock(&done_queue.mutex);
        *done_queue.tailp = req;
        done_queue.tailp = &req->next;
        pthread_mutex_unlock(&done_queue.mutex);

        /* Write a single byte to the completion pipe unless the flag is already set. */
        if (!done_signalled) {
            done_signalled = 1;
            FD_WRITE_METHOD(done_fd, "!", 1);
        }
        self->requests++;
    }				/* for ever */
    return NULL;
}				/* squidaio_thread_loop */
/*
 * squidaio_queue_request - queue an async I/O request for the worker threads.
 *
 * Marks the request as not yet executed (ret = -1, err = 0), increments
 * request_queue_len, stores the request in request->resultp->_data and
 * then tries to append it to request_queue.  The request_queue mutex is
 * only ever tried (pthread_mutex_trylock), never waited on: when it is
 * busy the request is parked on request_queue2 instead, and the parked
 * chain is moved onto request_queue by a later call that does obtain the
 * mutex.  request_queue2 itself is modified here without taking any lock.
 *
 * After queueing, the function logs congestion / overload warnings and,
 * once request_queue_len exceeds RIDICULOUS_LENGTH, calls squidaio_sync().
 */
static void
squidaio_queue_request(squidaio_request_t * request)
{
// squid_curtime at which request_queue_len first exceeded MAGIC1; 0 while not overloaded
static int high_start = 0;
debug(43, 9) ("squidaio_queue_request: %p type=%d result=%p\n",
request, request->request_type, request->resultp);
/* Mark it as not executed (failing result, no error) */
request->ret = -1;
request->err = 0;
/* Internal housekeeping */
request_queue_len += 1;
request->resultp->_data = request;
/* Play some tricks with the request_queue2 queue */
request->next = NULL;
// request_queue2 (the chain of requests that could not be queued earlier) is empty
if (!request_queue2.head) {
// ...and the request queue mutex can be taken without blocking (the normal case)
if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
/* Normal path */
// Link the request in through the current tail pointer,
// then make the tail pointer refer to this request's next field
*request_queue.tailp = request;
request_queue.tailp = &request->next;
// Signal the queue's condition variable, then release the mutex
pthread_cond_signal(&request_queue.cond);
pthread_mutex_unlock(&request_queue.mutex);
} else {
/* Oops, the request queue is blocked, use request_queue2 */
// trylock failed: append the request to request_queue2 instead
*request_queue2.tailp = request;
request_queue2.tailp = &request->next;
}
} else {
/* Secondary path. We have blocked requests to deal with */
/* add the request to the chain */
// request_queue2 is not empty: link this request onto its end first
// (request_queue2.tailp itself is only advanced in the "still blocked" branch below)
*request_queue2.tailp = request;
if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
/* Ok, the queue is no longer blocked */
// Splice the whole request_queue2 chain onto request_queue; request was just
// linked as the last element of that chain, so its next field is the new tail
*request_queue.tailp = request_queue2.head;
request_queue.tailp = &request->next;
pthread_cond_signal(&request_queue.cond);
pthread_mutex_unlock(&request_queue.mutex);
// request_queue2 is now empty again
request_queue2.head = NULL;
request_queue2.tailp = &request_queue2.head;
} else {
/* still blocked, bump the blocked request chain */
request_queue2.tailp = &request->next;
}
}
// Requests are still parked on request_queue2: count it, and warn when the count
// reaches filter_limit. Each warning raises the limit by the count just reached
// and restarts the count, so successive warnings need more congested calls.
if (request_queue2.head) {
static int filter = 0;
static int filter_limit = 8;
if (++filter >= filter_limit) {
filter_limit += filter;
filter = 0;
debug(43, 1) ("squidaio_queue_request: WARNING - Queue congestion\n");
}
}
/* Warn if out of threads */
// Overload tracking while request_queue_len stays above MAGIC1
if (request_queue_len > MAGIC1) {
static int last_warn = 0;
static int queue_high, queue_low;
// First call of this overload period: remember when it began and seed high/low
if (high_start == 0) {
high_start = squid_curtime;
queue_high = request_queue_len;
queue_low = request_queue_len;
}
if (request_queue_len > queue_high)
queue_high = request_queue_len;
if (request_queue_len < queue_low)
queue_low = request_queue_len;
// Warn only once the overload has lasted 5 squid_curtime units and at least 15
// units after the previous warning (presumably seconds - confirm squid_curtime's unit)
if (squid_curtime >= (last_warn + 15) &&
squid_curtime >= (high_start + 5)) {
debug(43, 1) ("squidaio_queue_request: WARNING - Disk I/O overloading\n");
// Add the queue length statistics once the overload has lasted 15 units
if (squid_curtime >= (high_start + 15))
debug(43, 1) ("squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n",
request_queue_len, queue_high, queue_low, (long int) (squid_curtime - high_start));
last_warn = squid_curtime;
}
} else {
// Back at or below MAGIC1: end the overload period
high_start = 0;
}
/* Warn if seriously overloaded */
// Past RIDICULOUS_LENGTH: call squidaio_sync() (blocking, per the log message below)
if (request_queue_len > RIDICULOUS_LENGTH) {
debug(43, 0) ("squidaio_queue_request: Async request queue growing uncontrollably!\n");
debug(43, 0) ("squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n");
squidaio_sync();
debug(43, 0) ("squidaio_queue_request: Synced\n");
}
} /* squidaio_queue_request */
版权声明:本文为博主原创文章,未经博主允许不得转载。 |
|