Go net库解析
Go net
库的优势
网络编程
是 Go
日常开发中一个很大的优势,也是目前Go
应用最多的领域之一,Go
团队设计Go
的初衷就是解决当下主流编程语言的痛点,而在当下一个比较明显的痛点就是在网络编程中尤其是异步和并发网络编程中往往不能在开发简易程度和运行性能之间有一个好的平衡点。c
和c++
运行效率足够但对程序员的心智要求很高,而像Python
的运行效率又不尽人意。而在Go
中借助goroutine
以及Go
强大的runtime
调度机制可以让开发人员以同步
的逻辑来开发异步高效
的程序,把沉重繁琐的异步逻辑封装在runtime
里,对开发人员暴露同步编程的模型,在降低心智负担的同时获得了更好的维护性和更高的性能。可谓是在开发性能和简易中找到了绝佳的平衡点。而这一切都被封装到了Go
的标准库net
库里,不用像其它语言那样需要借助五花八门的第三方库(往往兼容性差,规范不统一)或者自己吭哧吭哧的去封装,效果还是一个大大的问号。那Go net
库是怎么做到的呢,让我们一起到源码里面一探究竟。
写在前面
首先申明下我的go
版本,命令行下输入go version
, 可以看到我的go
版本是go1.13.4
go version go version go1.13.4 darwin/amd64
然后本文会涉及到以下知识点或者说概念
goroutine
及goroutine
调度,Go net
库的网络模式是每个连接一个goroutine
,当连接发生可读和可写事件时在go runtime
会发生goroutine
的调度io
多路复用模型epoll
(linux
下为epoll
,mac
下为kqueue
, 为本文阐述方便,默认epoll
),用于IO 事件驱动
- 简单的
TCP socket
编程
以上概念有个基础的了解就行了,也可网上自行搜索了解。
一个简单的TCP
程序
下面是一个最简单的TCP
连接处理Go
程序:
func main() { ln, err := net.Listen("tcp", ":8000") if err != nil { fmt.Println("listen error: ", err) return } for { // main 负责循环监听 conn, err := ln.Accept() if err != nil { fmt.Println("accept error: ", err) break } // 开启一个新的goroutine处理 go HandleConn(conn) } } func HandleConn(conn net.Conn) { defer conn.Close() packet := make([]byte, 1024) for { // 如果没有可读数据,也就是读 buffer 为空,则阻塞 _, _ = conn.Read(packet) // 同理,不可写则阻塞 _, _ = conn.Write(packet) } }
可以看到以上的代码就是Go
中经常写的,main
程中负责accept
连接,之后在另一个goroutine
中handle
,可以很清楚地看到这是一种goroutine-per-connection
,也就是每来一个连接则分配一个goroutine
,从代码上看完全是同步的模式,但实际上在go runtime
发生了goroutine
的调度和切换,只不过这封装在go runtime
,开发者一般不需要去关心,其实底层也是基于epoll
的 IO事件驱动
,下面就重点来看看Go
是怎么做的。
重要的数据结构
好的程序员关注数据结构和它们之间的关系,如果我们想理清它的主要流程首先要做的就是把它的主要数据结构拎出来然后找到它们之间是怎么联系起来的,它们之间的关系是什么。在Go
源码中有以下几个比较重要的数据结构:
// TCPListener 负责监听 TCP 网络连接 type TCPListener struct { fd *netFD // netFD 是抽象出来的网络描述符 lc ListenConfig } type netFD struct { pfd poll.FD //poll.FD 是GO 对文件描述符的封装,不论是网络连接还是文件操作本质上都是对文件描述符的操作 ... } type FD struct { fdmu fdMutex Sysfd int // 通过系统调用获取到的系统文件描述符 pd pollDesc // 对底层事件驱动的封装 ... } type pollDesc struct { link *pollDesc fd uintptr // 系统文件描述符 closing bool // 通过以下字段保存了相关 G 的运行信息,和 Go runtime 调度有关 rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wseq uintptr // protects from stale write timers wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline }
上面就是重要的几个结构体,有很多字段,但我们这次的目的是搞清主流程即可,有很多字段我们可以不去关注,所以上面我也省去了很多不在此次分析流程里的结构,弄懂重要字段间的联系即可。首先调用net.Listen
返回TCPListener
,其中的ListenConfig
从名字上看来就是一些配置选项,不去管它,这个netFD
是一个很重要的数据结构,它是Go
抽象出来的一个网络描述符,类似文件描述符的概念,这个数据结构相当的重要,所有Go
的网络接口最终都会转化为对该结构的方法,netFd
结构体里面有一个poll.FD
结构,这个poll.FD
中的Sysfd
就是真正的底层系统文件描述符(在系统里,一切都是文件描述符,包括网络连接),而poll.FD
中的 pollDesc
就是对事件驱动的封装,所有的IO操作都是调用这个结构体的相关方法完成的。对这些数据结构和它们之间的关系有了最基本的概念之后,我们就试着来源码分析一下一个TCP
的Listen
过程。
TCP
Listen
借助IDE
的跳转我们可以直接从net.Listen
追踪到tcpsock_posix.go
文件的 listenTCP
方法,它的代码如下:
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) { fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control) if err != nil { return nil, err } return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil }
可以看到最后是返回了一个TCPListener
, 这个结构我们在上面的数据结构里面列出来了,可以看到在返回这个TCPListener
结构体之前已经给它把fd
字段赋值了,而这个fd
字段的类型就是我们上面提到的netFD
,那这个netFd
是怎么生成的呢?我们点进internetSocket
这个方法里面:
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd" || runtime.GOOS == "nacl") && mode == "dial" && raddr.isWildcard() { raddr = raddr.toLocal(net) } family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode) return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn) }
其中不同系统的兼容我们可以不用看,可以看到netFD
是通过socket
方法生成的,我们再跟踪进去socket
方法:
// socket 返回一个已经初始化好了的netFD(网络文件描述符),通过network poller可以实现异步的网络IO func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { s, err := sysSocket(family, sotype, proto) // 通过系统调用获取系统文件描述符 if err != nil { return nil, err } if fd, err = newFD(s, family, sotype, net); err != nil { // 封装系统文件描述符获取到新的 netFD poll.CloseFunc(s) return nil, err } //bind、listen、注册到epoll实例监听该fd上的可读和可写事件 if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil }
这里先是通过sysSocket
系统调用获得系统文件描述符,然后作为参数传入newFD
方法封装后返回的就是netFD
(网络描述符)。创建完之后还要系统调用syscall.Bind
和 syscal.Listen
来监听端口,并且还要将系统文件描述符注册到epoll
实例上去来监控该文件描述符上面的可读和可写事件。这个是通过fd.listenStream
方法做到的,点进fd.listenStream
方法里面看到如下代码:
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error { var err error if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { // 系统调用 syscall.Bind return os.NewSyscallError("bind", err) } if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { // 系统调用 syscall.Listen return os.NewSyscallError("listen", err) } if err = fd.init(); err != nil { // poll.FD.init 初始化 return err } return nil }
上面代码分别调用了syscall.Bind
和 listenFunc
(也就是syscall.Listen
) ,然后上面这里还有一个重要的方法是调用了netFd.init
方法也就是上面我们说的创建文件epoll
实例并将之前初始化好的系统文件描述符注册到epoll实例来监听可读和可写事件。netFd.init
调用了poll.FD.init
, 而poll.FD.init
又调用了pollDesc.init
:
// netFD 的初始化 func (fd *netFD) init() error { return fd.pfd.Init(fd.net, true) // 调用poll.FD.Init } // poll.FD 的初始化 func (fd *FD) Init(net string, pollable bool) error { err := fd.pd.init(fd) // 调用 pollDesc.init return err } // 底层 pollDesc 的初始化 func (pd *pollDesc) init(fd *FD) error { // 创建epoll或者kqueue实例(linux系统为epoll, mac下为kqueue) serverInit.Do(runtime_pollServerInit) ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) // 将文件描述符注册到 network poller if errno != 0 { if ctx != 0 { runtime_pollUnblock(ctx) // 设置非阻塞模式 runtime_pollClose(ctx) } return errnoErr(syscall.Errno(errno)) } pd.runtimeCtx = ctx // 将初始化好的pollDesc 保存 return nil }
而底层的pollDesc
结构的init
方法调用了runtime_pollServerInit
创建epoll
实例,调用runtime_pollOpen
将文件描述符注册到epoll
实例并返回pollDesc
指针,调用runtime_pollUnblock
设置为非阻塞模式,pollDesc
初始化完成后保存指针以便后续操作。
所以从上面的分析可以看出来 TCP Listen
主要做了以下几件事:
- 通过
linux
系统调用创建fd
给listener
- 调用
netFD.init
初始化netFD
- 调用
poll.FD.init
初始化poll.FD
- 调用
pollDesc.init
初始化pollDesc
pollDesc.init
中创建了epoll
实例并且将listener fd
注册到epoll
实例,监听在它上面发生的事件,也就是accept
事件
这里还有一个值得注意的细节:
serverInit.Do(runtime_pollServerInit)
这里serverInit
是一个sync.Once
类型,这么做可以保证只初始化一个epoll
实例。
TCP Accept
之前listen
的一系列动作创建了listener netFD
并且能够通过epoll
监听它的IO
事件。接下来就是在listener netFD
上进行accept
操作了。
// accept 对接受的网络连接做一些初始化(本质上也是对netFD、poll.FD、pollDesc的初始化)后返回一个TCPConn表示这个TCP连接 func (ln *TCPListener) accept() (*TCPConn, error) { fd, err := ln.fd.accept() // 调用netFD.accept if err != nil { return nil, err } tc := newTCPConn(fd) // 传入netFD.accept 获取到的netFD,返回TCPConn return tc, nil }
ln.fd.accept
,这里调用了listener netFD
的accept
(一切网络操作最终都会转化在netFD
结构体 上的方法进行操作),进入这个netFD.accept
方法:
// netFd.accept 返回初始化好的netFD用来表示这个TCPConn func (fd *netFD) accept() (netfd *netFD, err error) { d, rsa, errcall, err := fd.pfd.Accept() // 调用poll.FD.Accept来获取系统文件描述符 if err != nil { if errcall != "" { err = wrapSyscallError(errcall, err) } return nil, err } if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil { // 创建新的netFD poll.CloseFunc(d) return nil, err } if err = netfd.init(); err != nil { // 初始化netFD fd.Close() return nil, err } return netfd, nil }
从函数签名上来看在listener fd
上调用的accept
最终又返回了一个新的netFD
,这个新的netFD
就是分配给新的TCPConn
的。 在这个方法里面newFD
和netfd.init
上面listen
的过程中已经分析过了,就是系统调用获取sys fd
初始化netFD
结构并将该fd
加入epoll
实例监听IO
事件。这里主要看看poll.FD.Accept
:
// poll.FD.Accept 会一直阻塞住直到该listener fd 上来了新的连接才会通过系统调用accept拿到该连接的底层系统文件描述符返回 func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { ... for { // 系统调用accept获取系统文件描述符,如果没有连接accpet 返回错误 s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EAGAIN: if fd.pd.pollable() { //pollDesc.waitRead会停住当前的listener goroutine直到有新连接到来 if err = fd.pd.waitRead(fd.isFile); err == nil{ continue } } case syscall.ECONNABORTED: // syscall.ECONNABORTED 错误表示一个socket在accept之前就已经被关闭 // 这时候只需要重试就可以 continue } return -1, nil, errcall, err } }
循环中accept
,如果err == nil
表示正常建立连接直接返回即可,如果是syscall.ECONNABORTED
重试即可,否则如果是syscall.EAGAIN
错误说明Socket的缓冲区为空不可读需要进入pollDesc.waitRead
方法park
住当前goroutine
直到有新的连接来到。那么有两个问题:
pollDesc.waitRead
是怎么实现park
住当前goroutine
的呢?- 当有新连接来的时候又是怎么唤醒 当前
goroutine
的呢?
首先看第一个问题,进入pollDesc.waitRead
方法:
//调用pollDesc.wait func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) } // pollDesc.wait 调用runtime_pollWait func (pd *pollDesc) wait(mode int, isFile bool) error { if pd.runtimeCtx == 0 { return errors.New("waiting for unsupported file type") } res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res, isFile) } // poll_runtime_pollWait 通过 netpollblock 来检查IO 是否ready func poll_runtime_pollWait(pd *pollDesc, mode int) int { err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } } return 0 } // netpollblock 如果该文件描述符上IO ready,则返回true,否则返回false func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } // 设置该goroutine状态为wait for { old := *gpp if old == pdReady { *gpp = 0 return true } if old != 0 { throw("runtime: double wait") } if atomic.Casuintptr(gpp, 0, pdWait) { break } } if waitio || netpollcheckerr(pd, mode) == 0 { gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)//park住当前goroutine } old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady }
pollDesc.waitReady
-> pollDesc.wait
-> poll_runtime_pollWait
-> netpollblock
->gopark
调用gopark
过后会将当前goroutine
的状态由 _Grunning
变为 _Gwaitting
放入等待队列中直到被唤醒
,而被唤醒goroutine
继续执行也意味着新的期待的事件发生了。大家可以在gopark
调用这一行打个断点然后启动一个TCP
服务端监听会发现该程序会一直被阻塞在gopark
这一行,这是因为当前listener fd
上没有IO
事件发生,所以通过上述流程将当前goroutine
停住了,放入了Go
的runtime
调度等待队列,等待IO事件
。这时候倘若新开启命令行窗口,telnet
对应的端口,则当前listener goroutine
会被唤醒继续执行下面的逻辑。至于gopark
的具体逻辑大家有兴趣可以去看看,这涉及到Go
的runtime
调度,我对这方面也没有深入的研究就不在本文里过多阐述了。
所以我们解决了第一个问题,goroutine
是如何被停住的,既然有阻塞就应该有唤醒
,那么是如何知道期待的IO
事件已经发生在fd
上了呢?如何在知道期待的IO
事件已经发生的时候唤醒相关的goroutine
的呢?其实是通过epoll_wait
来实现的,Go runtime
会在多个场景下调用epoll_wait
来获取就绪的fd
列表,比如Go
的监控线程sysmon
就会调用到epoll_wait
,而fd
和goroutine
运行信息又被封装到pollDesc
结构体关联了起来,所以 Go runtime
就可以将这些goroutine
重新从等待队列里捞出来调度运行。换句话说就是go runtime
通过调用epoll_wait
拿到了就绪的fd
列表,而fd
(这里是TCP连接的可读和可写事件)又在之前的初始化中和goroutine
的运行信息关联在了pollDesc
上,因此go runtime
就可以知道哪些goroutine
可以被唤醒干活了。至此accpet
的主要流程都分析完了。
至于TCP
的read
和write
的基本流程和accept
是一样的,当新的连接来到的时候listener fd
上会发生IO
事件,从而listener groutine
被唤醒通过accept
获取到了TCPConn
,然后初始化了这个TCPConn
的netFd
,将这个netFd
注册到epoll
实例后就可以监听这个TCPCoon
上的可读和可写事件。 在这里受限于篇幅就不再过多阐述了,读者可以自行去分析read
和write
流程。
总结
Go
将大量细节封装在Go runtime
中,经过大量的封装后对外才能暴露给开发者的是高性能、简单的网络编程API
,让开发者可以轻松的以同步的思想来开发异步的网络编程而完全避免了回调地狱,极大的降低了程序员的心智负担。本文从部分主要源码角度阐述了Go net
库,受限于篇幅以及让文章脉络更清晰,本文展示的源码片段是根据作者节选的认为流程中最重要的也是本文重点分析的流程,并不是完整的Go
源码,作者水平有限,如有错误和疏忽望指正,万分感激。
参考资料 && 延伸阅读
<p> <span style="font-size:14px;">本专刊是Go开源项目源码分析专栏,共 17 篇文章,挑选了Go 开源界知名的 4 个开源项目gnet(高效的网络库)、gin(知名的Go微型web框架)、fasthttp(高性能web框架)、nsq(Go消息队列)来对它们进行源码分析,分析它们的设计思想和代码实现。每个项目的讲解都是由浅入深,由设计思想的剖析到源码实现的分析,更易于读者理解。</span> </p> <p> <br /> </p> <h2> <b><span style="font-size:16px;line-height:1;">购买须知:</span></b> </h2> <span style="font-size:14px;">订阅成功后,用户即可通过牛客网 PC 端、App 端享有永久阅读的权限;</span><br /> <span style="font-size:14px;">牛客专刊为虚拟内容服务,订阅成功后概不退款;</span><br /> <span style="font-size:14px;line-height:1;">在专刊阅</span><span style="font-size:14px;line-height:1;">读过程中,如有任何问题,可在文章评论区底部留言,或添加牛客导师,加入读者交流群;</span><br /> <span style="font-size:14px;">想成为牛客作者,请邮件联系yinxiaoxiao@nowcoder.com,邮件主题【牛客作者+写作方向】,并附上个人简历一份及近期作品一份;</span><br /> <p> <span style="font-size:14px;">牛客专刊版权归本平台所有,任何机构、媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布 / 发表,违者将依法追究责任</span><span style="font-size:14px;">。</span> </p> <p> <br /> </p>