教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 NSQ源码分析(一)——nsqd的初始化及启动流程

NSQ源码分析(一)——nsqd的初始化及启动流程

发布时间:2022-01-29   编辑:jiaochengji.com
教程集为您提供NSQ源码分析(一)——nsqd的初始化及启动流程等资源,欢迎您收藏本站,我们将为您提供最新的NSQ源码分析(一)——nsqd的初始化及启动流程资源

nsq源码地址:https://github.com/nsqio/nsq

版本1.1.0 

NSQ源码分析系列是我通过阅读nsq的源码及结合网上的相关文章整理而成,由于在网上没有找到很详细和完整的文章,故自己亲自整理了一份。如果有错误的地方,还请指正,希望这系列的文章给您带来帮助。

 

<h2 style="margin-left:0cm;">NSQD启动流程</h2>

   nsqd的启动流程在nsq/apps/nsqd/nsqd.go中的<span style="color:#808080;">Start()函数</span>,以下为初始化流程

1.调用<span style="color:#f33b45;">nsqd.NewOptions()</span>加载参数及参数的默认值
2.将用户配置的参数值设置到对应的参数
3.随后判断config参数是否存在,若存在的话还需进行配置文件的读取, 如果配置文件存在并且符合toml格式,则调用cfg.Validate对配置文件的各项进行进一步的合法性检查
4.调用<span style="color:#f33b45;">nsqd.New(opts)</span>,初始化NSQD结构体,并检验Options配置信息是否有误
5.调用<span style="color:#f33b45;">LoadMetadata()</span>,主要是加载元数据信息
6.调用<span style="color:#f33b45;">PersistMetadata()</span>,主要将元数据持久化到文件中
5.调用<span style="color:#f33b45;">nsqd.Main()</span>,启动nsqd服务

<em>ps:后面会详细介绍各个启动流程</em>

<pre class="has"><code>func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqd")) os.Exit(0) } var cfg config configFile := flagSet.Lookup("config").Value.String() if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } } cfg.Validate() options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts) err := nsqd.LoadMetadata() if err != nil { log.Fatalf("ERROR: %s", err.Error()) } err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } nsqd.Main() p.nsqd = nsqd return nil }</code></pre> <h3>一、配置初始化及解析</h3> <pre class="has"><code>opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:])</code></pre> <h3>二、NSQD初始化</h3>

NSQD的初始化主要在nsq/nsqd/nsqd.go文件的New(opts *Options)函数中

主要作用是:初始化NSQD,并检验Options的配置信息

 

<h3>三、元数据加载和持久化</h3>

元数据的加载和初始化主要在LoadMetadata()和PersistMetadata()中

下面主要看下<span style="color:#f33b45;">LoadMetadata()</span>方法

1.元数据以json格式保存在nsqd可执行文件目录下的nsqd.dat文件中。
2.读取文件中的json数据并转换成反序列化成meta结构体,得到系统中存在的topic列表,遍历列表中的topic:
  (1)检查topic名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-] (#ephemeral)?$),若不合法则忽略
  (2) 使用GetTopic()函数通过名字获得topic对象
  (3)判断当前topic对象是否处于暂停状态,是的话调用Pause函数暂停topic
  (4) 获取当前topic下所有的channel,并且遍历channel,执行的操作与topic基本一致
        (i)检查channel名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-] (#ephemeral)?$),若不合法则忽略
        (ii)使用GetChannel函数通过名字获得channel对象
        (iii)判断当前channel对象是否处于暂停状态,是的话调用Pause函数暂停channel
至此,元数据的载入完成

<pre class="has"><code>func (n *NSQD) LoadMetadata() error { atomic.StoreInt32(&n.isLoading, 1) defer atomic.StoreInt32(&n.isLoading, 0) fn := newMetadataFile(n.getOpts()) data, err := readOrEmpty(fn) if err != nil { return err } if data == nil { return nil // fresh start } var m meta err = json.Unmarshal(data, &m) if err != nil { return fmt.Errorf("failed to parse metadata in %s - %s", fn, err) } for _, t := range m.Topics { if !protocol.IsValidTopicName(t.Name) { n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name) continue } topic := n.GetTopic(t.Name) if t.Paused { topic.Pause() } for _, c := range t.Channels { if !protocol.IsValidChannelName(c.Name) { n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name) continue } channel := topic.GetChannel(c.Name) if c.Paused { channel.Pause() } } topic.Start() } return nil }</code></pre> <pre>PersistMetadata() 1.当前的topic和channel信息写入nsqd.dat文件中,主要步骤是忽略#ephemeral结尾的topic和channel后将topic和channel列表json序列化后写回文件中 2.当项目启动,退出,Topic和Channel变化,以及Topic和Channel暂停状态改变都会调用方法持久化到该文件中</pre> <h3>四、NSQD服务启动</h3>

主要在nsq/nsqd/nsqd.go中的Main()函数

1.开启一个tcp服务,用于监听tcp连接
     (1)当有新的客户端连接后,检测协议版本号
     (2)最后调用protocol_v2的IOLoop(conn net.Conn)进行客户端读写操作  (以后还会详细介绍IOLoop方法)
2.开启一个协程,开启一个http的Serve,用于监听http请求
3.可选,如果配置tls信息,则新建一个https的服务,用于监听https请求
4.调用queueScanLoop()方法 作用:
   (1)定期   执行根据channel的数量控制worker Pool中worker的数量
   (2)定期   检测channel中是否有可以投递的消息,如果有,则投递消息
5.调用lookupLoop()方法 作用:
   (1)定时检测nsqd和nsqlookupd的连接信息(默认15s,执行一次PING命令来监听)
   (2)有Channel和Topic更新,则发送给所有配置的nsqlookupd
    (3)更新nsqlookupd配置
6.如果配置的StatsdAddress不为空,则调用statsdLoop,用于统计相关信息

 

<pre class="has"><code>func (n *NSQD) Main() { var err error ctx := &context{n} n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress) if err != nil { n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err) os.Exit(1) } n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) if err != nil { n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err) os.Exit(1) } //如果有tls配置,用于监听https客户端请求数据 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) if err != nil { n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err) os.Exit(1) } } tcpServer := &tcpServer{ctx: ctx} //开启一个协程,用于监听客户端连接,sync.WaitGroup.add(1),如果该协程结束,则Done()减1 n.waitGroup.Wrap(func() { protocol.TCPServer(n.tcpListener, tcpServer, n.logf) }) //httpServer为http Serve的 Handler,定义了路由与对应得分方法 httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) //开启一个协程,开启一个http的Serve,用于监听http请求 n.waitGroup.Wrap(func() { http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf) }) //开启一个协程 处理一个 http的Serve,用于监听https请求 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf) }) } n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } } </code></pre>

 

执行完<code>Main</code>函数后,配置和初始化工作全部完成,各个组件启动运行,而主goroutine会阻塞在<code><-signalChan</code>处,直到收到中断程序的信号,随后执行<code>nsqd.Exit()</code>函数。
<code>Exit()</code>函数将进行<code>socket</code>关闭等清理工作,随后结束整个程序的运行。

到此这篇关于“NSQ源码分析(一)——nsqd的初始化及启动流程”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!

您可能感兴趣的文章:
NSQ源码分析(一)——nsqd的初始化及启动流程
CentOS 7下安装nsq
linux下PostgreSQL的安装与使用
更改MySQL数据库名实例代码
go 类型 value 不支持索引_Go语言基础(十四)
mysql导入导出数据时中文乱码的解决办法
专家教你如何有效的学习Drupal - Drupal问答
有关 mysql count(*) 与 count(col) 查询效率的比较分析
css中position相对定位和绝对定位(relative,absolute)详解
PHP无限级分类菜单实例程序

[关闭]
~ ~