NSQ源码分析(一)——nsqd的初始化及启动流程
nsq源码地址:https://github.com/nsqio/nsq
版本1.1.0
NSQ源码分析系列是我通过阅读nsq的源码及结合网上的相关文章整理而成,由于在网上没有找到很详细和完整的文章,故自己亲自整理了一份。如果有错误的地方,还请指正,希望这系列的文章给您带来帮助。
<h2 style="margin-left:0cm;">NSQD启动流程</h2>
nsqd的启动流程在nsq/apps/nsqd/nsqd.go中的<span style="color:#808080;">Start()函数</span>,以下为初始化流程
1.调用<span style="color:#f33b45;">nsqd.NewOptions()</span>加载参数及参数的默认值
2.将用户配置的参数值设置到对应的参数
3.随后判断config参数是否存在,若存在的话还需进行配置文件的读取, 如果配置文件存在并且符合toml格式,则调用cfg.Validate对配置文件的各项进行进一步的合法性检查
4.调用<span style="color:#f33b45;">nsqd.New(opts)</span>,初始化NSQD结构体,并检验Options配置信息是否有误
5.调用<span style="color:#f33b45;">LoadMetadata()</span>,主要是加载元数据信息
6.调用<span style="color:#f33b45;">PersistMetadata()</span>,主要将元数据持久化到文件中
5.调用<span style="color:#f33b45;">nsqd.Main()</span>,启动nsqd服务
<em>ps:后面会详细介绍各个启动流程</em>
<pre class="has"><code>func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqd")) os.Exit(0) } var cfg config configFile := flagSet.Lookup("config").Value.String() if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } } cfg.Validate() options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts) err := nsqd.LoadMetadata() if err != nil { log.Fatalf("ERROR: %s", err.Error()) } err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } nsqd.Main() p.nsqd = nsqd return nil }</code></pre> <h3>一、配置初始化及解析</h3> <pre class="has"><code>opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:])</code></pre> <h3>二、NSQD初始化</h3>NSQD的初始化主要在nsq/nsqd/nsqd.go文件的New(opts *Options)函数中
主要作用是:初始化NSQD,并检验Options的配置信息
<h3>三、元数据加载和持久化</h3>
元数据的加载和初始化主要在LoadMetadata()和PersistMetadata()中
下面主要看下<span style="color:#f33b45;">LoadMetadata()</span>方法
1.元数据以json格式保存在nsqd可执行文件目录下的nsqd.dat文件中。
2.读取文件中的json数据并转换成反序列化成meta结构体,得到系统中存在的topic列表,遍历列表中的topic:
(1)检查topic名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-] (#ephemeral)?$),若不合法则忽略
(2) 使用GetTopic()函数通过名字获得topic对象
(3)判断当前topic对象是否处于暂停状态,是的话调用Pause函数暂停topic
(4) 获取当前topic下所有的channel,并且遍历channel,执行的操作与topic基本一致
(i)检查channel名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-] (#ephemeral)?$),若不合法则忽略
(ii)使用GetChannel函数通过名字获得channel对象
(iii)判断当前channel对象是否处于暂停状态,是的话调用Pause函数暂停channel
至此,元数据的载入完成
主要在nsq/nsqd/nsqd.go中的Main()函数
1.开启一个tcp服务,用于监听tcp连接
(1)当有新的客户端连接后,检测协议版本号
(2)最后调用protocol_v2的IOLoop(conn net.Conn)进行客户端读写操作 (以后还会详细介绍IOLoop方法)
2.开启一个协程,开启一个http的Serve,用于监听http请求
3.可选,如果配置tls信息,则新建一个https的服务,用于监听https请求
4.调用queueScanLoop()方法 作用:
(1)定期 执行根据channel的数量控制worker Pool中worker的数量
(2)定期 检测channel中是否有可以投递的消息,如果有,则投递消息
5.调用lookupLoop()方法 作用:
(1)定时检测nsqd和nsqlookupd的连接信息(默认15s,执行一次PING命令来监听)
(2)有Channel和Topic更新,则发送给所有配置的nsqlookupd
(3)更新nsqlookupd配置
6.如果配置的StatsdAddress不为空,则调用statsdLoop,用于统计相关信息
<pre class="has"><code>func (n *NSQD) Main() { var err error ctx := &context{n} n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress) if err != nil { n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err) os.Exit(1) } n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) if err != nil { n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err) os.Exit(1) } //如果有tls配置,用于监听https客户端请求数据 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) if err != nil { n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err) os.Exit(1) } } tcpServer := &tcpServer{ctx: ctx} //开启一个协程,用于监听客户端连接,sync.WaitGroup.add(1),如果该协程结束,则Done()减1 n.waitGroup.Wrap(func() { protocol.TCPServer(n.tcpListener, tcpServer, n.logf) }) //httpServer为http Serve的 Handler,定义了路由与对应得分方法 httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) //开启一个协程,开启一个http的Serve,用于监听http请求 n.waitGroup.Wrap(func() { http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf) }) //开启一个协程 处理一个 http的Serve,用于监听https请求 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf) }) } n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } } </code></pre>
执行完<code>Main</code>函数后,配置和初始化工作全部完成,各个组件启动运行,而主goroutine会阻塞在<code><-signalChan</code>处,直到收到中断程序的信号,随后执行<code>nsqd.Exit()</code>函数。
<code>Exit()</code>函数将进行<code>socket</code>关闭等清理工作,随后结束整个程序的运行。
您可能感兴趣的文章:
NSQ源码分析(一)——nsqd的初始化及启动流程
CentOS 7下安装nsq
linux下PostgreSQL的安装与使用
更改MySQL数据库名实例代码
go 类型 value 不支持索引_Go语言基础(十四)
mysql导入导出数据时中文乱码的解决办法
专家教你如何有效的学习Drupal - Drupal问答
有关 mysql count(*) 与 count(col) 查询效率的比较分析
css中position相对定位和绝对定位(relative,absolute)详解
PHP无限级分类菜单实例程序