设为首页 收藏本站
查看: 1369|回复: 0

[经验分享] Golang网络库中socket阻塞调度源码剖析

[复制链接]

尚未签到

发表于 2018-9-20 09:14:31 | 显示全部楼层 |阅读模式
  本文分析了Golang的socket文件描述符和goroutine阻塞调度的原理。代码中大部分是Go代码,小部分是汇编代码。完整理解本文需要Go语言知识,并且用Golang写过网络程序。更重要的是,需要提前理解goroutine的调度原理。

1. TCP的连接对象:

连接对象:
  在net.go中有一个名为Conn的接口,提供了对于连接的读写和其他操作:
  

type Conn interface {  
Read(b []byte) (n int, err error)
  
Write(b []byte) (n int, err error)
  
Close() error
  
LocalAddr() Addr
  
RemoteAddr() Addr
  
SetReadDeadline(t time.Time) error
  
SetWriteDeadline(t time.Time) error
  
}
  

  这个接口就是对下面的结构体conn的抽象。conn结构体包含了对连接的读写和其他操作:
  

type conn struct {  
fd *netFD
  
}
  

从连接读取数据:
  

// Read implements the Conn Read method.  
func (c *conn) Read(b []byte) (int, error) {
  
if !c.ok() {
  
return 0, syscall.EINVAL
  
}
  
return c.fd.Read(b)
  
}
  

向连接写入数据:
  

// Write implements the Conn Write method.  
func (c *conn) Write(b []byte) (int, error) {
  
if !c.ok() {
  
return 0, syscall.EINVAL
  
}
  
return c.fd.Write(b)
  
}
  

关闭连接:
  

// Close closes the connection.  
func (c *conn) Close() error {
  
if !c.ok() {
  
return syscall.EINVAL
  
}
  
return c.fd.Close()
  
}
  

设置读写超时:
  

// SetDeadline implements the Conn SetDeadline method.  
func (c *conn) SetDeadline(t time.Time) error {
  
if !c.ok() {
  
return syscall.EINVAL
  
}
  
return c.fd.setDeadline(t)
  
}
  
// SetReadDeadline implements the Conn SetReadDeadline method.
  
func (c *conn) SetReadDeadline(t time.Time) error {
  
if !c.ok() {
  
return syscall.EINVAL
  
}
  
return c.fd.setReadDeadline(t)
  
}
  
// SetWriteDeadline implements the Conn SetWriteDeadline method.
  
func (c *conn) SetWriteDeadline(t time.Time) error {
  
if !c.ok() {
  
return syscall.EINVAL
  
}
  
return c.fd.setWriteDeadline(t)
  
}
  

可以看到,对连接的所有操作,都体现在对*netFD的操作上。我们继续跟踪c.fd.Read()函数.

2.文件描述符
  net/fd_unix.go:

网络连接的文件描述符:
  

// Network file descriptor.  
type netFD struct {
  
// locking/lifetime of sysfd + serialize access to Read and Write methods
  
fdmu fdMutex
  
// immutable until Close
  
sysfd       int
  
family      int
  
sotype      int
  
isConnected bool
  
net         string
  
laddr       Addr
  
raddr       Addr
  
// wait server
  
pd pollDesc
  
}
  

文件描述符读取数据:
  

func (fd *netFD) Read(p []byte) (n int, err error) {  
if err := fd.readLock(); err != nil {
  
return 0, err
  
}
  
defer fd.readUnlock()
  
if err := fd.pd.PrepareRead(); err != nil {
  
return 0, &OpError{"read", fd.net, fd.raddr, err}
  
}
  
// 调用system call,循环从fd.sysfd读取数据
  
for {
  
// 系统调用Read读取数据
  
n, err = syscall.Read(int(fd.sysfd), p)
  
// 如果发生错误,则需要处理
  
// 并且只处理EAGAIN类型的错误,其他错误一律返回给调用者
  
if err != nil {
  
n = 0
  
// 对于非阻塞的网络连接的文件描述符,如果错误是EAGAIN
  
// 说明Socket的缓冲区为空,未读取到任何数据
  
// 则调用fd.pd.WaitRead,
  
if err == syscall.EAGAIN {
  
if err = fd.pd.WaitRead(); err == nil {
  
continue
  
}
  
}
  
}
  
err = chkReadErr(n, err, fd)
  
break
  
}
  
if err != nil && err != io.EOF {
  
err = &OpError{"read", fd.net, fd.raddr, err}
  
}
  
return
  
}
  

网络轮询器
  网络轮询器是Golang中针对每个socket文件描述符建立的轮询机制。 此处的轮询并不是一般意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成之后或者指定时间之内,调用epoll_wait获取所有产生IO事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。
  

// Integrated network poller (platform-independent part).  
// 网络轮询器(平台独立部分)
  
// A particular implementation (epoll/kqueue) must define the following functions:
  
// 实际的实现(epoll/kqueue)必须定义以下函数:
  
// func netpollinit()           // to initialize the poller,初始化轮询器
  
// func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications, 为fd和pd启动边缘触发通知
  
// and associate fd with pd.
  
// 一个实现必须调用下面的函数,用来指示pd已经准备好
  
// An implementation must call the following function to denote that the pd is ready.
  
// func netpollready(gpp **g, pd *pollDesc, mode int32)
  
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
  
// goroutines respectively. The semaphore can be in the following states:
  
// pollDesc包含了2个二进制的信号,分别负责读写goroutine的暂停.
  
// 信号可能处于下面的状态:
  
// pdReady - IO就绪通知被挂起;
  
//           一个goroutine将次状态置为nil来消费一个通知。
  
// pdReady - io readiness notification is pending;
  
//           a goroutine consumes the notification by changing the state to nil.
  
// pdWait - 一个goroutine准备暂停在信号上,但是还没有完成暂停。
  
// 这个goroutine通过把这个状态改变为G指针去提交这个暂停动作。
  
// 或者,替代性的,并行的其他通知将状态改变为READY.
  
// 或者,替代性的,并行的超时/关闭会将次状态变为nil
  
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
  
//          the goroutine commits to park by changing the state to G pointer,

  
//          or,>
  
//          or,>  
// G指针 - 阻塞在信号上的goroutine
  
// IO通知或者超时/关闭会分别将此状态置为READY或者nil.
  
// G pointer - the goroutine is blocked on the semaphore;
  
//             io notification or timeout/close changes the state to READY or nil respectively
  
//             and unparks the goroutine.
  
// nil - nothing of the above.
  
const (
  
pdReady uintptr = 1
  
pdWait  uintptr = 2
  
)
  

  网络轮询器的数据结构如下:
  

// Network poller descriptor.  
// 网络轮询器描述符
  
type pollDesc struct {
  
link *pollDesc // in pollcache, protected by pollcache.lock
  
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
  
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
  
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
  
// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
  
// in a lock-free way by all operations.
  
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
  
// that will blow up when GC starts moving objects.
  
//
  
// lock锁对象保护了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操作。
  
// 而这些操作又完全包含了对seq, rt, tw变量。
  
// fd在PollDesc整个生命过程中都是一个常量。
  
// 处理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就绪通知)不需要用到锁。
  
// 所以closing, rg, rd, wg和wd的所有操作都是一个无锁的操作。
  
lock    mutex // protectes the following fields
  
fd      uintptr
  
closing bool
  
seq     uintptr        // protects from stale timers and ready notifications
  
rg      uintptr        // pdReady, pdWait, G waiting for read or nil
  
rt      timer          // read deadline timer (set if rt.f != nil)
  
rd      int64          // read deadline
  
wg      uintptr        // pdReady, pdWait, G waiting for write or nil
  
wt      timer          // write deadline timer
  
wd      int64          // write deadline
  
user    unsafe.Pointer // user settable cookie
  
}
  

将当前goroutine设置为阻塞在fd上:
  pd.WaitRead():
  

func (pd *pollDesc) WaitRead() error {  
return pd.Wait('r')
  
}
  
func (pd *pollDesc) Wait(mode int) error {
  
res := runtime_pollWait(pd.runtimeCtx, mode)
  
return convertErr(res)
  
}
  

  res是runtime_pollWait函数返回的结果,由conevertErr函数包装后返回:
  

func convertErr(res int) error {  
switch res {
  
case 0:
  
return nil
  
case 1:
  
return errClosing
  
case 2:
  
return errTimeout
  
}
  
println("unreachable: ", res)
  
panic("unreachable")
  
}
  


  • 函数返回0,表示IO已经准备好,返回nil。
  • 返回1,说明连接已关闭,应该放回errClosing。
  • 返回2,说明对IO进行的操作发生超时,应该返回errTimeout。
  runtime_pollWait会调用runtime/thunk.s中的函数:
  

TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0  
JMP runtime·netpollWait(SB)
  

  这是一个包装函数,没有参数,直接跳转到runtime/netpoll.go中的函数netpollWait:
  

func netpollWait(pd *pollDesc, mode int) int {  
// 检查pd的状态是否异常
  
err := netpollcheckerr(pd, int32(mode))
  
if err != 0 {
  
return err
  
}
  
// As for now only Solaris uses level-triggered IO.
  
if GOOS == "solaris" {
  
onM(func() {
  
netpollarm(pd, mode)
  
})
  
}
  
// 循环中检查pd的状态是不是已经被设置为pdReady
  
// 即检查IO是不是已经就绪
  
for !netpollblock(pd, int32(mode), false) {
  
err = netpollcheckerr(pd, int32(mode))
  
if err != 0 {
  
return err
  
}
  
// Can happen if timeout has fired and unblocked us,
  
// but before we had a chance to run, timeout has been reset.
  
// Pretend it has not happened and retry.
  
}
  
return 0
  
}
  

  netpollcheckerr函数检查pd是否出现异常:
  

  
// 检查pd的异常
  
func netpollcheckerr(pd *pollDesc, mode int32) int {
  
// 是否已经关闭
  
if pd.closing {
  
return 1 // errClosing
  
}
  
// 当读写状态下,deadline小于0,表示pd已经过了超时时间
  
if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
  
return 2 // errTimeout
  
}
  
// 正常情况返回0
  
return 0
  
}
  

  netpollblock():
  

// returns true if IO is ready, or false if timedout or closed  
// waitio - wait only for completed IO, ignore errors
  
// 这个函数被netpollWait循环调用
  
// 返回true说明IO已经准备好,返回false说明IO操作已经超时或者已经关闭
  
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
  
// 获取pd的rg
  
gpp := &pd.rg
  
// 如果模式是w,则获取pd的wg
  
if mode == 'w' {
  
gpp = &pd.wg
  
}
  
// set the gpp semaphore to WAIT
  
// 在循环中设置pd的gpp为pdWait
  
// 因为casuintptr是自旋锁,所以需要在循环中调用
  
for {
  
// 如果在循环中发现IO已经准备好(pg的rg或者wg为pdReady状态)
  
// 则设置rg/wg为0,返回true
  
old := *gpp
  
if old == pdReady {
  
*gpp = 0
  
return true
  
}
  
// 每次netpollblock执行完毕之后,gpp重置为0
  
// 非0表示重复wait
  
if old != 0 {
  
gothrow("netpollblock: double wait")
  
}
  
// CAS操作改变gpp为pdWait
  
if casuintptr(gpp, 0, pdWait) {
  
break
  
}
  
}
  
// need to recheck error states after setting gpp to WAIT
  
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
  
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
  
//
  
// 当设置gpp为pdWait状态后,重新检查gpp的状态
  
// 这是必要的,因为runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl会做相反的操作
  
// 如果状态正常则挂起当前的goroutine
  
//
  
// 当netpollcheckerr检查io出现超时或者错误,waitio为true可用于等待ioReady
  
// 否则当waitio为false, 且io不出现错误或者超时才会挂起当前goroutine
  
if waitio || netpollcheckerr(pd, mode) == 0 {
  
// 解锁函数,设置gpp为pdWait,如果设置不成功
  
// 说明已经是发生其他事件,可以让g继续运行,而不是挂起当前g
  
f := netpollblockcommit
  
// 尝试挂起当前g
  
gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
  
}
  
// be careful to not lose concurrent READY notification
  
old := xchguintptr(gpp, 0)
  
if old > pdWait {
  
gothrow("netpollblock: corrupted state")
  
}
  
return old == pdReady
  
}
  

  runtime/proc.go: gopark():
  

// Puts the current goroutine into a waiting state and calls unlockf.  
// If unlockf returns false, the goroutine is resumed.
  
// 将当前goroutine置为waiting状态,然后调用unlockf
  
func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) {
  
// 获取当前M
  
mp := acquirem()
  
// 获取当前G
  
gp := mp.curg
  
// 获取G的状态
  
status := readgstatus(gp)
  
// 如果不是_Grunning或者_Gscanrunning,则报错
  
if status != _Grunning && status != _Gscanrunning {
  
gothrow("gopark: bad g status")
  
}
  
// 设置lock和unlockf
  
mp.waitlock = lock
  
mp.waitunlockf = unlockf
  
gp.waitreason = reason
  
releasem(mp)
  
// can't do anything that might move the G between Ms here.
  
// 在m->g0这个栈上调用park_m,而不是当前g的栈
  
mcall(park_m)
  
}
  

  mcall函数是一段汇编,在m->g0的栈上调用park_m,而不是在当前goroutine的栈上。mcall的功能分两部分,第一部分保存当前G的PC/SP到G的gobuf的pc/sp字段,第二部分调用park_m函数:
  

// func mcall(fn func(*g))  
// Switch to m->g0's stack, call fn(g).
  
// Fn must never return.  It should gogo(&g->sched)
  
// to keep running g.
  
TEXT runtime·mcall(SB), NOSPLIT, $0-8
  
// 将需要执行的函数保存在DI
  
MOVQ    fn+0(FP), DI
  
// 将M的TLS存放在CX
  
get_tls(CX)
  
// 将G对象存放在AX
  
MOVQ    g(CX), AX   // save state in g->sched
  
// 将调用者的PC存放在BX
  
MOVQ    0(SP), BX   // caller's PC
  
// 将调用者的PC保存到g->sched.pc
  
MOVQ    BX, (g_sched+gobuf_pc)(AX)
  
// 第一个参数的地址,即栈顶的地址,保存到BX
  
LEAQ    fn+0(FP), BX    // caller's SP
  
// 保存SP的地址到g->sched.sp
  
MOVQ    BX, (g_sched+gobuf_sp)(AX)
  
// 将g对象保存到g->sched->g
  
MOVQ    AX, (g_sched+gobuf_g)(AX)
  
// switch to m->g0 & its stack, call fn
  
// 将g对象指针保存到BX
  
MOVQ    g(CX), BX
  
// 将g->m保存到BX
  
MOVQ    g_m(BX), BX
  
// 将m->g0保存到SI
  
MOVQ    m_g0(BX), SI
  
CMPQ    SI, AX  // if g == m->g0 call badmcall
  
JNE 3(PC)
  
MOVQ    $runtime·badmcall(SB), AX
  
JMP AX
  
// 将m->g0保存到g
  
MOVQ    SI, g(CX)   // g = m->g0
  
// 将g->sched.sp恢复到SP寄存器
  
// 即使用g0的栈
  
MOVQ    (g_sched+gobuf_sp)(SI), SP  // sp = m->g0->sched.sp
  
// AX进栈
  
PUSHQ   AX
  
MOVQ    DI, DX
  
// 将fn的地址复制到DI
  
MOVQ    0(DI), DI
  
// 调用函数
  
CALL    DI
  
// AX出栈
  
POPQ    AX
  
MOVQ    $runtime·badmcall2(SB), AX
  
JMP AX
  
RET
  

  park_m函数的功能分为三部分,第一部分让当前G和当前M脱离关系,第二部分是调用解锁函数,这里是调用netpoll.go源文件中的netpollblockcommit函数:
  

// runtime·park continuation on g0.  
void
  
runtime·park_m(G *gp)
  
{
  
bool ok;
  
// 设置当前g为Gwaiting状态
  
runtime·casgstatus(gp, Grunning, Gwaiting);
  
// 让当前g和m脱离关系
  
dropg();
  
if(g->m->waitunlockf) {
  
ok = g->m->waitunlockf(gp, g->m->waitlock);
  
g->m->waitunlockf = nil;
  
g->m->waitlock = nil;
  
// 返回0为false,非0为true
  
// 0说明g->m->waitlock发生了变化,即不是在gopark是设置的(pdWait)
  
// 说明了脱离了WAIT状态,应该设置为Grunnable,并执行g
  
if(!ok) {
  
runtime·casgstatus(gp, Gwaiting, Grunnable);
  
execute(gp);  // Schedule it back, never returns.
  
}
  
}
  
// 这里是调度当前m继续执行其他g
  
// 而不是上面执行execute
  
schedule();
  
}
  

  netpollblockcommit函数,设置gpp为pdWait,设置成功返回1,否则返回0。1为true,0为false:
  

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {  
return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
  
}
  

  到这里当前goroutine对socket文件描述符的等待IO继续的行为已经完成。过程中首先尽早尝试判断IO是否已经就绪,如果未就绪则挂起当前goroutine,挂起之后再次判断IO是否就绪,如果还未就绪则调度当前M运行其他G。如果是在调度goroutine之前IO已经就绪,则不会使当前goroutine进入调度队列,会直接运行刚才挂起的G。否则当前goroutine会进入调度队列。
  接下来是等待runtime将其唤醒。runtime在执行findrunnablequeue、starttheworld,sysmon函数时,都会调用netpoll_epoll.go中的netpoll函数,寻找到IO就绪的socket文件描述符,并找到这些socket文件描述符对应的轮询器中附带的信息,根据这些信息将之前等待这些socket文件描述符就绪的goroutine状态修改为Grunnable。在以上函数中,执行完netpoll之后,会找到一个就绪的goroutine列表,接下来将就绪的goroutine加入到调度队列中,等待调度运行。
  在netpoll_epoll.go中的netpoll函数中,epoll_wait函数返回N个发生事件的文件描述符对应的epollevent,接着对于每个event使用其data属性,将event.data转换为*pollDesc类型,再调用netpoll.go中的netpollready函数,将*pollDesc类型中的G数据类型去除,并附加到netpoll函数的调用者传递的G链表中:
  

// 将ev.data转换为*pollDesc类型  
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
  
// 调用netpollready将取出pd中保存的G,并添加到链表中
  
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
  

  所以runtime在执行findrunnablequeue、starttheworld,sysmon函数中会执行netpoll函数,并返回N个goroutine。这些goroutine期待的网络事件已经发生,runtime会将这些goroutine放入到当前P的可运行队列中,接下来调度它们并运行。
  http://ju.outofmemory.cn/entry/168649



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-598646-1-1.html 上篇帖子: golang中ip地址int string互转 下篇帖子: golang动态调用方法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表