fasthttp重点源码分析

fasthttp重点源码分析

前文讲解了fasthttp高性能的原理在于复用,包括goroutine 的复用、request context 的复用、reader 的复用、writer 的复用,以及各个方面减少内存分配和拷贝,减轻了GC 的压力,本文就通过跟踪fasthttp 的源码来进一步体会它的设计。

helloword Server 开始

import (
    "flag"
    "fmt"
    "log"

    "github.com/valyala/fasthttp"
)

var (
  // 启动参数 address
    addr = flag.String("addr", ":8080", "TCP address to listen to")
)

func main() {
    flag.Parse()

    h := requestHandler

    if err := fasthttp.ListenAndServe(*addr, h); err != nil {
        log.Fatalf("Error in ListenAndServe: %s", err)
    }
}

/*
requestHandler 为请求处理函数
*/
func requestHandler(ctx *fasthttp.RequestCtx) {
    var c fasthttp.Cookie
    c.SetKey("cookie-name")
    c.SetValue("cookie-value")
    ctx.Response.Header.SetCookie(&c)
}

可以看到fasthttp 启动一个http server 也是很简单,定义一个requestHandler 作为参数调用fasthttp.ListenAndServe 即可。还是老办法,一步步追踪进去,来看看fasthttp.ListenAndServe

func ListenAndServe(addr string, handler RequestHandler) error {
    s := &Server{
        Handler: handler,
    }
    return s.ListenAndServe(addr)
}

初始化了Server 结构体的Handler字段为用户定义好的requestHandler ,然后调用Server.ListenAndServe启动htt server来监听端口处理请求。来看看Server 这个核心的结构体是什么样的。

Server struct

type Server struct {
    Handler RequestHandler
    // Server name
    Name string

  // server 最大的并行处理连接数(注意是并行,实际上该值就是worker pool 里面最大的worker)
    Concurrency int

    // 是否关闭keep alive
    DisableKeepalive bool

  // buffer size for requests reading,读取请求的buffer的最大容量
    ReadBufferSize int

  // buffer size for write response
    WriteBufferSize int

  // read timeout
    ReadTimeout time.Duration

    // write timeout
    WriteTimeout time.Duration

  ...

  // 当前server 正在处理中的请求连接数量
    concurrency      uint32
  // request context pool
    ctxPool        sync.Pool
  // reader pool
    readerPool     sync.Pool
  // writer pool
    writerPool     sync.Pool

    // 所有连接 net listener 都会存放在这里面,便于shutdown server 时close 所有的listener
    ln []net.Listener

    mu   sync.Mutex // 并发锁
    done chan struct{} // 服务关闭通知channel
}

源代码

fasthttp Server 结构体有很多字段,Handler是开发者定义的请求处理函数,另外就是fasthttp server的配置项,比如连接读写timeout 和最大连接数等等。这里为了方便大家抓重点,省略掉了很多配置选项,只留下了我认为和主流程最相关的字段,大家有兴趣可以去看看Server 完整的结构体。再就是用于在server运行过程中存储对象的几个字段:

  • ctxPool Context 池,用于存储和复用请求的Context,每次请求都会尝试先从Context池去获取Context,如果获取不到会创建新的供本次请求使用,请求结束后放入池子里面供后面其他的请求使用。
  • readerPoolreader对象的存储池,读取request需要reader对象,同样reader对象也是复用的,同Context
  • writerPoolwriter对象的存储池,处理完业务逻辑之后需要writer对象将结果写入连接响应给客户端,writer对象也是复用的,同前两者
  • ln,是一个数组,用来存储所有的net listener,这样在server关闭的时候可以close掉所有的net listener

此外还有mu保证多个goroutine并发安全的读写Serverdone用来发送和监听Server的退出。

看完Server struct 之后再接着看Server.ListenAndServe,这也是启动主流程的地方。

Server.ListenAndServe

func (s *Server) ListenAndServe(addr string) error {
    ln, err := net.Listen("tcp4", addr) // 调用标准库net.Listen 得到listener
    if err != nil {
        return err
    }
    return s.Serve(ln)
}

源代码

首先调用net.Listen来获得listener,然后调用Server.Servelistener传入,这样Serve.Serves将会处理该listener上获取到的连接。

再来看Serve.Serve:

func (s *Server) Serve(ln net.Listener) error {
    var c net.Conn
    var err error

    maxWorkersCount := s.getConcurrency() // 获取 Server.Concurrency,如果没有设置,默认256 * 1024

    s.mu.Lock()
    {
        s.ln = append(s.ln, ln) // 储存进 Server.ln 数组
        if s.done == nil { // 初始化done channel
            s.done = make(chan struct{})
        }
    }
    s.mu.Unlock()

  // 初始化server worker pool
    wp := &workerPool{
    ... // 省略一些次要字段
        WorkerFunc:      s.serveConn, // WorkerFunc 初始化为ServerlserveConn,每个worker拿到一个分配到它的conn后会调用该函数
        MaxWorkersCount: maxWorkersCount, // worker pool 中worker 的最大数量
    }
    wp.Start() // workerPool start
    for {
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { // accept connection
            wp.Stop()
            if err == io.EOF {
                return nil
            }
            return err
        }
        if !wp.Serve(c) {
            s.writeFastError(c, StatusServiceUnavailable,
                "The connection cannot be served because Server.Concurrency limit exceeded")
            c.Close()
            s.setState(c, StateClosed)
            }
        }
        c = nil
    }
}

源代码

可以看到Server.serve 函数做了一些初始化动作后开启了goroutine pool ,然后循环accpet连接 ,调用worker pool.Serve 函数处理请求连接,这里就要来到一个重要的部分了:fasthttp goroutine pool

goroutine pool

先来看worker pool 的结构体:

type workerPool struct {
    // 获取到conn之后调用的处理函数,Server.serveConn
    WorkerFunc ServeHandler

  // 最大worker 数量
    MaxWorkersCount int

    // pool中worker最大空闲时间,空闲超过这个时间的worker将会从pool中clear掉,达到动态调整worker pool的效果
    MaxIdleWorkerDuration time.Duration

    lock         sync.Mutex
  // 当前worker 数量
    workersCount int
    mustStop     bool

  // 储存每个worker 的channel,这里缓存的是运行中的worker channel

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

go高薪必备:面试框架17讲 文章被收录于专栏

<p> <span style="font-size:14px;">本专刊是Go开源项目源码分析专栏,共 17 篇文章,挑选了Go 开源界知名的 4 个开源项目gnet(高效的网络库)、gin(知名的Go微型web框架)、fasthttp(高性能web框架)、nsq(Go消息队列)来对它们进行源码分析,分析它们的设计思想和代码实现。每个项目的讲解都是由浅入深,由设计思想的剖析到源码实现的分析,更易于读者理解。</span> </p> <p> <br /> </p> <h2> <b><span style="font-size:16px;line-height:1;">购买须知:</span></b> </h2> <span style="font-size:14px;">订阅成功后,用户即可通过牛客网 PC 端、App 端享有永久阅读的权限;</span><br /> <span style="font-size:14px;">牛客专刊为虚拟内容服务,订阅成功后概不退款;</span><br /> <span style="font-size:14px;line-height:1;">在专刊阅</span><span style="font-size:14px;line-height:1;">读过程中,如有任何问题,可在文章评论区底部留言,或添加牛客导师,加入读者交流群;</span><br /> <span style="font-size:14px;">想成为牛客作者,请邮件联系yinxiaoxiao@nowcoder.com,邮件主题【牛客作者+写作方向】,并附上个人简历一份及近期作品一份;</span><br /> <p> <span style="font-size:14px;">牛客专刊版权归本平台所有,任何机构、媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布 / 发表,违者将依法追究责任</span><span style="font-size:14px;">。</span> </p> <p> <br /> </p>

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务