|
1. 线程的定义
1.1 线程定义在scheduler.h文件中,其定义如下所示
/* Thread itself. */
typedef struct _thread {
unsigned long id; /*identify*/
unsigned char type; /* thread type */
struct _thread *next; /* next pointer of the thread */
struct _thread *prev; /* previous pointer of the thread */
struct _thread_master *master; /* pointer to the struct thread_master. */
int (*func) (struct _thread *); /* event function */
void *arg; /* event argument */
timeval_t sands; /* rest of time sands value. */
union {
int val; /* second argument of the event. */
int fd; /* file descriptor in case of read/write. */
struct {
pid_t pid; /* process id a child thread is wanting. */
int status; /* return status of the process */
} c;
} u;
} thread_t;
1.2. 线程链表定义
/* Linked list of thread. */
typedef struct _thread_list {
thread_t *head;
thread_t *tail;
int count;
} thread_list_t;
线程类型的定义如下:
/* Thread types. */
#define THREAD_READ 0 //读线程
#define THREAD_WRITE 1 //写线程
#define THREAD_TIMER 2 //计时器线程
#define THREAD_EVENT 3 //事件线程
#define THREAD_CHILD 4 //子线程
#define THREAD_READY 5 //就绪线程
#define THREAD_UNUSED 6 //未使用线程
#define THREAD_WRITE_TIMEOUT 7 //写超时线程
#define THREAD_READ_TIMEOUT 8 //读超时线程
#define THREAD_CHILD_TIMEOUT 9 //子超时线程
#define THREAD_TERMINATE 10 //停止线程
#define THREAD_READY_FD 11
1.3.主线程定义
/* Master of the theads. */
typedef struct _thread_master {
thread_list_t read;
thread_list_t write;
thread_list_t timer;
thread_list_t child;
thread_list_t event;
thread_list_t ready;
thread_list_t unuse;
fd_set readfd;
fd_set writefd;
fd_set exceptfd;
unsigned long alloc;
} thread_master_t;
2. 线程操作
2.1 生成主线程
/* global vars */
thread_master_t *master = NULL;
/* Make thread master. */
thread_master_t *
thread_make_master(void)
{
thread_master_t *new;
new = (thread_master_t *) MALLOC(sizeof (thread_master_t));
return new;
}
2.2 销毁一个主线程
/* Stop thread scheduler. */
void
thread_destroy_master(thread_master_t * m)
{
thread_cleanup_master(m);
FREE(m);
}
//调用子函数,清空主线程的内容
/* Cleanup master */
static void
thread_cleanup_master(thread_master_t * m)
{
/* Unuse current thread lists */
thread_destroy_list(m, m->read);
thread_destroy_list(m, m->write);
thread_destroy_list(m, m->timer);
thread_destroy_list(m, m->event);
thread_destroy_list(m, m->ready);
/* Clear all FDs */
FD_ZERO(&m->readfd);
FD_ZERO(&m->writefd);
FD_ZERO(&m->exceptfd);
/* Clean garbage */
thread_clean_unuse(m);
}
//回收主线程内存
FREE(m);
2.3 增加一个简单的事件线程
/* Add simple event thread. */
thread_t *
thread_add_terminate_event(thread_master_t * m)
{
thread_t *thread;
assert(m != NULL);
thread = thread_new(m);
thread->type = THREAD_TERMINATE;
thread->id = 0;
thread->master = m;
thread->func = NULL;
thread->arg = NULL;
thread->u.val = 0;
thread_list_add(&m->event, thread);
return thread;
}
2.4 创建不同类型的线程,并加入主线程中的对应线程链表,如读线程为例介绍
/* Add new read thread. */
thread_t *
thread_add_read(thread_master_t * m, int (*func) (thread_t *)
, void *arg, int fd, long timer)
{
thread_t *thread;
assert(m != NULL);
if (FD_ISSET(fd, &m->readfd)) {
log_message(LOG_WARNING, "There is already read fd [%d]", fd);
return NULL;
}
thread = thread_new(m);
thread->type = THREAD_READ;
thread->id = 0;
thread->master = m;
thread->func = func;
thread->arg = arg;
FD_SET(fd, &m->readfd);
thread->u.fd = fd;
/* Compute read timeout value */
set_time_now();
thread->sands = timer_add_long(time_now, timer);
/* Sort the thread. */
thread_list_add_timeval(&m->read, thread);
return thread;
}
2.4.1 创建一个新的线程
/* Make new thread. */
thread_t *
thread_new(thread_master_t * m)
{
thread_t *new;
/* If one thread is already allocated return it */
if (m->unuse.head) {
new = thread_trim_head(&m->unuse);
memset(new, 0, sizeof (thread_t));
return new;
}
new = (thread_t *) MALLOC(sizeof (thread_t));
m->alloc++;
return new;
}
2.4.2 设置为读线程
thread->type = THREAD_READ;
thread->id = 0;
thread->master = m;
thread->func = func;
thread->arg = arg;
FD_SET(fd, &m->readfd);
thread->u.fd = fd;
2.4.3 根据超时时间将读进程加入读进程列表中
/* Add a thread in the list sorted by timeval */
void
thread_list_add_timeval(thread_list_t * list, thread_t * thread)
{
thread_t *tt;
for (tt = list->head; tt; tt = tt->next) {
if (timer_cmp(thread->sands, tt->sands) <= 0)
break;
}
if (tt)
thread_list_add_before(list, tt, thread);
else
thread_list_add(list, thread);
}
2.5 取消线程,从对应类型的线程列表中去除该线程,将它设置为unused类型,并加入unused线程链表。
/* Cancel thread from scheduler. */
void
thread_cancel(thread_t * thread)
{
switch (thread->type) {
case THREAD_READ:
assert(FD_ISSET(thread->u.fd, &thread->master->readfd));
FD_CLR(thread->u.fd, &thread->master->readfd);
thread_list_delete(&thread->master->read, thread);
break;
case THREAD_WRITE:
assert(FD_ISSET(thread->u.fd, &thread->master->writefd));
FD_CLR(thread->u.fd, &thread->master->writefd);
thread_list_delete(&thread->master->write, thread);
break;
case THREAD_TIMER:
thread_list_delete(&thread->master->timer, thread);
break;
case THREAD_CHILD:
/* Does this need to kill the child, or is that the
* caller's job?
* This function is currently unused, so leave it for now.
*/
thread_list_delete(&thread->master->child, thread);
break;
case THREAD_EVENT:
thread_list_delete(&thread->master->event, thread);
break;
case THREAD_READY:
case THREAD_READY_FD:
thread_list_delete(&thread->master->ready, thread);
break;
default:
break;
}
thread->type = THREAD_UNUSED;
thread_add_unuse(thread->master, thread);
}
2.6 获取下一个就绪进程
/* Fetch next ready thread. */
thread_t *
thread_fetch(thread_master_t * m, thread_t * fetch)
{
int ret, old_errno;
thread_t *thread;
fd_set readfd;
fd_set writefd;
fd_set exceptfd;
timeval_t timer_wait;
int signal_fd;
#ifdef _WITH_SNMP_
timeval_t snmp_timer_wait;
int snmpblock = 0;
int fdsetsize;
#endif
assert(m != NULL);
/* Timer initialization */
memset(&timer_wait, 0, sizeof (timeval_t));
retry: /* When thread can't fetch try to find next thread again. */
/* If there is event process it first. */
while ((thread = thread_trim_head(&m->event))) {
*fetch = *thread;
/* If daemon hanging event is received return NULL pointer */
if (thread->type == THREAD_TERMINATE) {
thread->type = THREAD_UNUSED;
thread_add_unuse(m, thread);
return NULL;
}
thread->type = THREAD_UNUSED;
thread_add_unuse(m, thread);
return fetch;
}
/* If there is ready threads process them */
while ((thread = thread_trim_head(&m->ready))) {
*fetch = *thread;
thread->type = THREAD_UNUSED;
thread_add_unuse(m, thread);
return fetch;
}
/*
* Re-read the current time to get the maximum accuracy.
* Calculate select wait timer. Take care of timeouted fd.
*/
set_time_now();
thread_compute_timer(m, &timer_wait);
/* Call select function. */
readfd = m->readfd;
writefd = m->writefd;
exceptfd = m->exceptfd;
signal_fd = signal_rfd();
FD_SET(signal_fd, &readfd);
#ifdef _WITH_SNMP_
/* When SNMP is enabled, we may have to select() on additional
* FD. snmp_select_info() will add them to `readfd'. The trick
* with this function is its last argument. We need to set it
* to 0 and we need to use the provided new timer only if it
* is still set to 0. */
fdsetsize = FD_SETSIZE;
snmpblock = 0;
memcpy(&snmp_timer_wait, &timer_wait, sizeof(timeval_t));
snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait, &snmpblock);
if (snmpblock == 0)
memcpy(&timer_wait, &snmp_timer_wait, sizeof(timeval_t));
#endif
ret = select(FD_SETSIZE, &readfd, &writefd, &exceptfd, &timer_wait);
/* we have to save errno here because the next syscalls will set it */
old_errno = errno;
/* Handle SNMP stuff */
#ifdef _WITH_SNMP_
if (ret > 0)
snmp_read(&readfd);
else if (ret == 0)
snmp_timeout();
#endif
/* handle signals synchronously, including child reaping */
if (FD_ISSET(signal_fd, &readfd))
signal_run_callback();
/* Update current time */
set_time_now();
if (ret < 0) {
if (old_errno == EINTR)
goto retry;
/* Real error. */
DBG("select error: %s", strerror(old_errno));
assert(0);
}
/* Timeout children */
thread = m->child.head;
while (thread) {
thread_t *t;
t = thread;
thread = t->next;
if (timer_cmp(time_now, t->sands) >= 0) {
thread_list_delete(&m->child, t);
thread_list_add(&m->ready, t);
t->type = THREAD_CHILD_TIMEOUT;
}
}
/* Read thead. */
thread = m->read.head;
while (thread) {
thread_t *t;
t = thread;
thread = t->next;
if (FD_ISSET(t->u.fd, &readfd)) {
assert(FD_ISSET(t->u.fd, &m->readfd));
FD_CLR(t->u.fd, &m->readfd);
thread_list_delete(&m->read, t);
thread_list_add(&m->ready, t);
t->type = THREAD_READY_FD;
} else {
if (timer_cmp(time_now, t->sands) >= 0) {
FD_CLR(t->u.fd, &m->readfd);
thread_list_delete(&m->read, t);
thread_list_add(&m->ready, t);
t->type = THREAD_READ_TIMEOUT;
}
}
}
/* Write thead. */
thread = m->write.head;
while (thread) {
thread_t *t;
t = thread;
thread = t->next;
if (FD_ISSET(t->u.fd, &writefd)) {
assert(FD_ISSET(t->u.fd, &writefd));
FD_CLR(t->u.fd, &m->writefd);
thread_list_delete(&m->write, t);
thread_list_add(&m->ready, t);
t->type = THREAD_READY_FD;
} else {
if (timer_cmp(time_now, t->sands) >= 0) {
FD_CLR(t->u.fd, &m->writefd);
thread_list_delete(&m->write, t);
thread_list_add(&m->ready, t);
t->type = THREAD_WRITE_TIMEOUT;
}
}
}
/* Exception thead. */
/*... */
/* Timer update. */
thread = m->timer.head;
while (thread) {
thread_t *t;
t = thread;
thread = t->next;
if (timer_cmp(time_now, t->sands) >= 0) {
thread_list_delete(&m->timer, t);
thread_list_add(&m->ready, t);
t->type = THREAD_READY;
}
}
/* Return one event. */
thread = thread_trim_head(&m->ready);
#ifdef _WITH_SNMP_
run_alarms();
netsnmp_check_outstanding_agent_requests();
#endif
/* There is no ready thread. */
if (!thread)
goto retry;
*fetch = *thread;
thread->type = THREAD_UNUSED;
thread_add_unuse(m, thread);
return fetch;
}
2.7 子线程处理,便利子线程链表取出子线程,并放入就绪线程链表。
/* Synchronous signal handler to reap child processes */
void
thread_child_handler(void * v, int sig)
{
thread_master_t * m = v;
/*
* This is O(n^2), but there will only be a few entries on
* this list.
*/
thread_t *thread;
pid_t pid;
int status = 77;
while ((pid = waitpid(-1, &status, WNOHANG))) {
if (pid == -1) {
if (errno == ECHILD)
return;
DBG("waitpid error: %s", strerror(errno));
assert(0);
} else {
thread = m->child.head;
while (thread) {
thread_t *t;
t = thread;
thread = t->next;
if (pid == t->u.c.pid) {
thread_list_delete(&m->child, t);
thread_list_add(&m->ready, t);
t->u.c.status = status;
t->type = THREAD_READY;
break;
}
}
}
}
}
2.8 线程调用
/* Call thread ! */
void
thread_call(thread_t * thread)
{
thread->id = thread_get_id();
(*thread->func) (thread);
}
2.9 启动调度器
/* Our infinite scheduling loop */
void
launch_scheduler(void)
{
thread_t thread;
signal_set(SIGCHLD, thread_child_handler, master);
/*
* Processing the master thread queues,
* return and execute one ready thread.
*/
while (thread_fetch(master, &thread)) {
/* Run until error, used for debuging only */
#ifdef _DEBUG_
if ((debug & 520) == 520) {
debug &= ~520;
thread_add_terminate_event(master);
}
#endif
thread_call(&thread);
}
}
|
|