设为首页 收藏本站
查看: 1219|回复: 0

[经验分享] Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现

[复制链接]

尚未签到

发表于 2017-2-23 08:08:49 | 显示全部楼层 |阅读模式
  声明:本文为原创博文,转载请注明出处。
  Nodejs编程是全异步的,这就意味着我们不必每次都阻塞等待该次操作的结果,而事件完成(就绪)时会主动回调通知我们。在网络编程中,一般都是基于Reactor线程模型的变种,无论其怎么演化,其核心组件都包含了Reactor实例(提供事件注册、注销、通知功能)、多路复用器(由操作系统提供,比如kqueue、select、epoll等)、事件处理器(负责事件的处理)以及事件源(linux中这就是描述符)这四个组件。一般,会单独启动一个线程运行Reactor实例来实现真正的异步操作。但是,依赖操作系统提供的系统调用来实现异步是有局限的,比如在Reactor模型中我们只能监听到:网络IO事件、signel(信号)、超时事件以及一些管道事件等,但这些事件也只是通知我们资源可读或者可写,真正的读写操作(read和write)还是同步的(也就是你必须等到read或者write返回,虽然linux提供了aio,但是其有诸多槽点),那么Nodejs的全异步是如何做到的呢?你可能会很快想到,就是启用单独的线程来做同步的事情,这也是libuv的设计思路,借用官网的一张图,说明一切:
DSC0000.png

  由上图可以看到,libuv实现了一套自己的线程池来处理所有同步操作(从而模拟出异步的效果),下面就来看一下该线程池的具体实现吧!
  一、线程池模型
  说道线程池,在java领域中,jdk本身就提供了多种线程池实现,几乎所有的线程池都遵循以下模型(任务队列+线程池):
DSC0001.png

  libuv自身定义了一个非常精炼、高效的队列(双向循环链表),只用了几个简单的宏定义将其实现,具体实现方式可以参见我的另一篇博文:libuv高效队列的实现。现在队列有了,来看一下task的定义:



struct uv__work {
   void (*work)(struct uv__work *w);
   void (*done)(struct uv__work *w, int status);
   struct uv_loop_s* loop;
   void* wq[2];
};
  uv__work就代表一个task,可以看到里面有两个函数指针(work代表任务实际操作,done用于对任务进行状态确认)。wq成员就是一个QUEUE的节点,  uv__work就是通过wq与其他  uv__work连接成一个队列。
  下面来看一下threadpool的初始化,代码如下:



#define MAX_THREADPOOL_SIZE 128
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;//当前空闲的线程数
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static QUEUE exit_message;
static QUEUE wq;//线程池全部会检查这个queue,一旦发现有任务就执行,但是只能有一个线程抢占到
static volatile int initialized;

static void init_once(void) {
   unsigned int i;
   const char* val;
   // 线程池中的线程数,默认值为4
   nthreads = ARRAY_SIZE(default_threads);
   val = getenv("UV_THREADPOOL_SIZE");
   if (val != NULL)
     nthreads = atoi(val);
   if (nthreads == 0)
     nthreads = 1;
   if (nthreads > MAX_THREADPOOL_SIZE)
     nthreads = MAX_THREADPOOL_SIZE;

   threads = default_threads;
   if (nthreads > ARRAY_SIZE(default_threads)) {
     // 分配线程句柄
     threads = uv__malloc(nthreads * sizeof(threads[0]));
     if (threads == NULL) {
       nthreads = ARRAY_SIZE(default_threads);
       threads = default_threads;
     }
   }
   // 初始化条件变量
   if (uv_cond_init(&cond))
     abort();
   // 初始化互斥锁
   if (uv_mutex_init(&mutex))
     abort();

   // 初始化任务队列
   QUEUE_INIT(&wq);

   // 创建nthreads个线程
   for (i = 0; i < nthreads; i++)
     if (uv_thread_create(threads + i, worker, NULL))
       abort();

   initialized = 1;
}
  上面的代码中,一共创建了nthreads个线程,那么每个线程的执行代码是什么呢?由线程创建代码:uv_thread_create(threads + i, worker, NULL),可以看到,每一个线程都是执行worker函数,下面看看worker函数都在做什么:



/* To avoid deadlock with uv_cancel() it's crucial that the worker
  * never holds the global mutex and the loop-local mutex at the same time.
  */
static void worker(void* arg) {
   struct uv__work* w;
   QUEUE* q;

   (void) arg;

   for (;;) {
     // 因为是多线程访问,因此需要加锁同步
     uv_mutex_lock(&mutex);

     // 如果任务队列是空的
     while (QUEUE_EMPTY(&wq)) {
       // 空闲线程数加1
       idle_threads += 1;
       // 等待条件变量
       uv_cond_wait(&cond, &mutex);
       // 被唤醒之后,说明有任务被post到队列,因此空闲线程数需要减1
       idle_threads -= 1;
     }
     
     // 取出队列的头部节点(第一个task)
     q = QUEUE_HEAD(&wq);

     if (q == &exit_message)
       uv_cond_signal(&cond);
     else {
       // 从队列中移除这个task
       QUEUE_REMOVE(q);
       QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
                              executing. */
     }

     uv_mutex_unlock(&mutex);

     if (q == &exit_message)
       break;

     // 取出uv__work首地址
     w = QUEUE_DATA(q, struct uv__work, wq);
     // 调用task的work,执行任务
     w->work(w);

     uv_mutex_lock(&w->loop->wq_mutex);
     w->work = NULL;  /* Signal uv_cancel() that the work req is done
                         executing. */
     QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
     uv_async_send(&w->loop->wq_async);
     uv_mutex_unlock(&w->loop->wq_mutex);
   }
}
  可以看到,多个线程都会在worker方法中等待在conn条件变量上,一旦有任务加入队列,线程就会被唤醒,然后只有一个线程会得到任务的执行权,其他的线程只能继续等待。
  那么如何向队列提交一个task呢?看以下代码:



void uv__work_submit(uv_loop_t* loop,
                      struct uv__work* w,
                      void (*work)(struct uv__work* w),
                      void (*done)(struct uv__work* w, int status)) {
   uv_once(&once, init_once);
   // 构造一个task
   w->loop = loop;
   w->work = work;
   w->done = done;
   // 将其插入任务队列
   post(&w->wq);
}
  接着看post做了什么:



static void post(QUEUE* q) {
   // 同步队列操作
   uv_mutex_lock(&mutex);
   // 将task插入队列尾部
   QUEUE_INSERT_TAIL(&wq, q);
   // 如果当前有空闲线程,就向条件变量发送信号
   if (idle_threads > 0)
     uv_cond_signal(&cond);
   uv_mutex_unlock(&mutex);
}
  有提交任务,就肯定会有取消一个任务的操作,是的,他就是uv__work_cancel,代码如下:



static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
   int cancelled;

   uv_mutex_lock(&mutex);
   uv_mutex_lock(&w->loop->wq_mutex);
   
   // 只有当前队列不为空并且要取消的uv__work有效时才会继续执行
   cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
   if (cancelled)
     QUEUE_REMOVE(&w->wq);// 从队列中移除task
   uv_mutex_unlock(&w->loop->wq_mutex);
   uv_mutex_unlock(&mutex);

   if (!cancelled)
     return UV_EBUSY;

   // 更新这个task的状态
   w->work = uv__cancelled;
   uv_mutex_lock(&loop->wq_mutex);
   QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
   uv_async_send(&loop->wq_async);
   uv_mutex_unlock(&loop->wq_mutex);

   return 0;
}
  至此,一个线程池的组成以及实现原理都说完了,可以看到,libuv几乎是用了最少的代码完成了高效的线程池,这对于我们平时写代码时具有很好的借鉴意义,文中涉及到uv_req_t以及uv_loop_t等结构我都直接跳过,因为这牵扯到libuv的其他组件,我将在以后的源码剖析中逐步阐述,谢谢你能看到这里。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-345866-1-1.html 上篇帖子: 前端MVC学习总结(四)——NodeJS+MongoDB+AngularJS+Bootstrap书店示例 下篇帖子: Nodejs之MEAN栈开发(五)---- Angular入门与页面改造
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表