nsq 之 nsqlookupd 源码解析

nsq 之 nsqlookupd 源码解析

nsqlookupd是nsq管理集群拓扑信息以及用于注册和发现nsqd服务;所以,也可以把nsqlookupd理解为注册发现服务;当nsq集群中有多个nsqlookupd服务时,因为每个nsqd都会向所有的nsqlookupd上报本地信息,因此nsqlookupd具有最终一致性。

重点数据结构

  • NSQLookupd

    NSQLookupdnsqlookupd的核心数据结构,它包含了nsqlookupd 启动和服务的核心配置和组件。

    type NSQLookupd struct {
       sync.RWMutex
       // 配置项
       opts         *Options
       // TCP listener
       tcpListener  net.Listener
       // HTTP listener
       httpListener net.Listener
       // TCP handler
       tcpServer    *tcpServer
       waitGroup    util.WaitGroupWrapper
      // 存储topic、channel对应的生产者信息,可以看作是一个注册中心
       DB           *RegistrationDB
    }

    源代码

  • Registration

    Registration 相关可以看作类似注册中心,负责注册记录各个nsqd实例上的topicchannel信息。

    // RegistrationDB
    type RegistrationDB struct {
       sync.RWMutex
       // registrationMap 记录了每个topic/channel 对应的一组生产者
       registrationMap map[Registration]ProducerMap
    }
    // RegistrationDB.AddRegistration 添加一个Registration
    // RegistrationDB.AddProducer 为一个Registration 添加一个producer
    // RegistrationDB.RemoveProducer 为一个Registration 移除一个producer
    // RegistrationDB.RemoveRegistration 移除一个Registration以及它的所有producer
    // RegistrationDB.FindRegistrations 根据 Category、key、subKey找到一组对应的Registration
    // RegistrationDB.FindProducers 根据 Category、key、subKey找到一组对应的producer
    // RegistrationDB.LookupRegistrations 根据producer peerInfo.id 找到其对应的Registrations
    
    // Registration 唯一标识一个消费单位(topic级别或者channel级别)
    type Registration struct {
        // category 表明该Registration是topic级别还是channel级别
        Category string
        // topic name
        Key      string
        // 如果Category 是channel则subKey 是channel name, 如果Category是topic则SubKey为空
        SubKey   string
    }
    
    type ProducerMap map[string]*Producer
    
    type Producer struct {
        peerInfo     *PeerInfo
    }
    
    // 记录prodcer的信息
    type PeerInfo struct {
        lastUpdate       int64
        id               string
        RemoteAddress    string `json:"remote_address"`
        Hostname         string `json:"hostname"`
        BroadcastAddress string `json:"broadcast_address"`
        TCPPort          int    `json:"tcp_port"`
        HTTPPort         int    `json:"http_port"`
        Version          string `json:"version"`
    }
    
    type Registrations []Registration

    源代码

源码分析流程

  • NSQLookupd.Main

    nsqLookupd启动主流程入口

    func (l *NSQLookupd) Main() error {
       ctx := &Context{l}
    
       exitCh := make(chan error)
       var once sync.Once
       exitFunc := func(err error) {
          once.Do(func() {
             if err != nil {
                l.logf(LOG_FATAL, "%s", err)
             }
             exitCh <- err
          })
       }
    
       l.tcpServer = &tcpServer{ctx: ctx}
       // 启动TCPServer 监听响应nsqd register/unregister/ping 请求
       l.waitGroup.Wrap(func() {
          exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf))
       })
       // 启动HTTPServer 监听响应HTTP 请求(主要是topic/channel 的管理以及一些供debug 的信息)
       httpServer := newHTTPServer(ctx)
       l.waitGroup.Wrap(func() {
          exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
       })
    
       err := <-exitCh
       return err
    }

    源代码

  • nsqlookupd TCPServer

    TCPServer 负责监听和处理TCP请求。

    func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
       logf(lg.INFO, "TCP: listening on %s", listener.Addr())
    
       var wg sync.WaitGroup
    
       for {
          // accept TCP conn
          clientConn, err := listener.Accept()
          if err != nil {
             if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                logf(lg.WARN, "temporary Accept() failure - %s", err)
                runtime.Gosched()
                continue
             }
             // theres no direct way to detect this error because it is not exposed
             if !strings.Contains(err.Error(), "use of closed network connection") {
                return fmt.Errorf("listener.Accept() error - %s", err)
             }
             break
          }
    
          wg.Add(1)
          go func() {
             // go TCP handler.Handle
             handler.Handle(clientConn)
             wg.Done()
          }()
       }
    
       // wait to return until all handler goroutines complete
       wg.Wait()
    
       logf(lg.INFO, "TCP: closing %s", listener.Addr())
    
       return nil
    }

    源代码

    这个TCPServer 函数和nsqd 的是共用的,但handler 不是同一个handler,nsqlookupd TCP Handle 方法如下:

    func (p *tcpServer) Handle(clientConn net.Conn) {
       p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())
    
       /**
       读取并解析 protocolMagic
        */
       buf := make([]byte, 4)
       _, err := io.ReadFull(clientConn, buf)
       if err != nil {
          clientConn.Close()
          return
       }
       protocolMagic := string(buf)
    
       var prot protocol.Protocol
       switch protocolMagic {
       case "  V1":
          prot = &LookupProtocolV1{ctx: p.ctx}
       default: // 返回协议错误
          protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
          clientConn.Close()
          return
       }
    
       p.conns.Store(clientConn.RemoteAddr(), clientConn)
    
       // LookupProtocolV1.IOLoop 处理请求
       err = prot.IOLoop(clientConn)
       if err != nil {
          p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clien

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

go高薪必备:面试框架17讲 文章被收录于专栏

<p> <span style="font-size:14px;">本专刊是Go开源项目源码分析专栏,共 17 篇文章,挑选了Go 开源界知名的 4 个开源项目gnet(高效的网络库)、gin(知名的Go微型web框架)、fasthttp(高性能web框架)、nsq(Go消息队列)来对它们进行源码分析,分析它们的设计思想和代码实现。每个项目的讲解都是由浅入深,由设计思想的剖析到源码实现的分析,更易于读者理解。</span> </p> <p> <br /> </p> <h2> <b><span style="font-size:16px;line-height:1;">购买须知:</span></b> </h2> <span style="font-size:14px;">订阅成功后,用户即可通过牛客网 PC 端、App 端享有永久阅读的权限;</span><br /> <span style="font-size:14px;">牛客专刊为虚拟内容服务,订阅成功后概不退款;</span><br /> <span style="font-size:14px;line-height:1;">在专刊阅</span><span style="font-size:14px;line-height:1;">读过程中,如有任何问题,可在文章评论区底部留言,或添加牛客导师,加入读者交流群;</span><br /> <span style="font-size:14px;">想成为牛客作者,请邮件联系yinxiaoxiao@nowcoder.com,邮件主题【牛客作者+写作方向】,并附上个人简历一份及近期作品一份;</span><br /> <p> <span style="font-size:14px;">牛客专刊版权归本平台所有,任何机构、媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布 / 发表,违者将依法追究责任</span><span style="font-size:14px;">。</span> </p> <p> <br /> </p>

全部评论

相关推荐

头像
04-26 15:05
已编辑
腾讯_后端开发
小红书 iOS社区技术 年薪52w+包三餐大小周
点赞 评论 收藏
转发
点赞 收藏 评论
分享
牛客网
牛客企业服务