教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 nsq源码channel的messagePump实现原理

nsq源码channel的messagePump实现原理

发布时间:2021-12-03   编辑:jiaochengji.com
教程集为您提供nsq源码channel的messagePump实现原理等资源,欢迎您收藏本站,我们将为您提供最新的nsq源码channel的messagePump实现原理资源

2019独角兽企业重金招聘Python工程师标准>>>

nsq里面channel类似于通道的作用,负责给consumer同步消息:

<pre><code class="language-go">func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { var err error //内存的MsgChan var memoryMsgChan chan *Message //文件的MsgChan var backendMsgChan chan []byte //订阅的频道 var subChannel *Channel //数据刷新ticker var flusherChan <-chan time.Time //客户端读取数据速率 var sampleRate int32 //client的配置 //client订阅的subChan,方便给subChannel赋值相应的channel subEventChan := client.SubEventChan //鉴权Identify对应的chan,只能鉴权一次 identifyEventChan := client.IdentifyEventChan //flushChan赋值outputBufferTicker,默认是250ms时间间隔Flush一次数据 outputBufferTicker := time.NewTicker(client.OutputBufferTimeout) heartbeatTicker := time.NewTicker(client.HeartbeatInterval) heartbeatChan := heartbeatTicker.C msgTimeout := client.MsgTimeout // v2 opportunistically buffers data to clients to reduce write system calls // we force flush in two cases: // 1. when the client is not ready to receive messages // 2. we're buffered and the channel has nothing left to send us // (ie. we would block in this loop anyway) // flushed := true //已经准备好了messagePump的时候关闭chan,阻塞在读chan的就会执行完 close(startedChan) for { //客户端没有订阅频道,客户端没有准备好读消息,消息Pause,UnPause,消息读取Timeout的时候都有可能触发条件 //从而导致client盲等待 if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil backendMsgChan = nil flusherChan = nil // force flush client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true } else if flushed { //上一次遍历的时候flush了 memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = outputBufferTicker.C } select { case <-flusherChan: // flush数据 client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true case <-client.ReadyStateChan: //各种条件触发这个 case subChannel = <-subEventChan: // 只能订阅一次频道 subEventChan = nil case identifyData := <-identifyEventChan: // 只能提交一次identify identifyEventChan = nil outputBufferTicker.Stop() if identifyData.OutputBufferTimeout > 0 { outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout) } heartbeatTicker.Stop() heartbeatChan = nil if identifyData.HeartbeatInterval > 0 { heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval) heartbeatChan = heartbeatTicker.C } if identifyData.SampleRate > 0 { sampleRate = identifyData.SampleRate } msgTimeout = identifyData.MsgTimeout case <-heartbeatChan: //心跳 err = p.Send(client, frameTypeResponse, heartbeatBytes) if err != nil { goto exit } case b := <-backendMsgChan: //百分比的速率sampleRate if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg, err := decodeMessage(b) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } msg.Attempts //发送消息之前把消息放入FlightQueue中 subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } } exit: p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client) //结束时候关闭心跳和flush同步的ticker heartbeatTicker.Stop() outputBufferTicker.Stop() if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err) } }</code></pre>

看看client如何发送数据的

<pre><code class="language-go">func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body) var buf = &bytes.Buffer{} //msg写入到buf _, err := msg.WriteTo(buf) if err != nil { return err } //buf里面写入client,此时并不是走网络Write了,而是先写入client.Writer的buffer里面去了 err = p.Send(client, frameTypeMessage, buf.Bytes()) if err != nil { return err } return nil } func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error { client.writeLock.Lock() var zeroTime time.Time //设置超时时间,心跳的频率和超时时间有关系 if client.HeartbeatInterval > 0 { client.SetWriteDeadline(time.Now().Add(client.HeartbeatInterval)) } else { client.SetWriteDeadline(zeroTime) } //只是写入到buffer里面去了 _, err := protocol.SendFramedResponse(client.Writer, frameType, data) if err != nil { client.writeLock.Unlock() return err } /不是消息类型的时候flush,走网络 if frameType != frameTypeMessage { err = client.Flush() } client.writeLock.Unlock() return err }</code></pre>

 

<ins class="adsbygoogle"/>

转载于:https://my.oschina.net/yang1992/blog/1922561

到此这篇关于“nsq源码channel的messagePump实现原理”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!

您可能感兴趣的文章:
nsq源码channel的messagePump实现原理
CentOS 7下安装nsq
搭建基于consul,registrator,nsq的GO体系Docker开发环境
go 类型 value 不支持索引_Go语言基础(十四)
Golang号称高并发,但高并发时性能不高解决办法
图解Go的channel底层原理
NSQ源码分析(一)——nsqd的初始化及启动流程
golang知识点
Go 语言为什么这么快,带你详细了解Golang CSP并发模型
golang的select实现原理剖析

[关闭]
~ ~