教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 nsqd解析:nsqd-channel

nsqd解析:nsqd-channel

发布时间:2021-05-13   编辑:jiaochengji.com
教程集为您提供nsqd解析:nsqd-channel等资源,欢迎您收藏本站,我们将为您提供最新的nsqd解析:nsqd-channel资源

nsqd.Channel 解析

  Channel 是消费者订阅特定 Topic 的一种抽象.对于发往 Topic 的消息,nsqd 向该 Topic 下的所有 Channel 投递消息,而同一个 Channel 只投递一次,Channel 下如果存在多个消费者,则随机选择一个消费者做投递.这种投递方式可以被用作消费者负载均衡.

  Channel 从属于特定 Topic,可以认为是 Topic 的下一级.在同一个 Topic 之下可以有零个或多个 Channel.和 Topic 一样,Channel 同样有永久和临时之分,永久的 Channel 只能通过显式删除销毁,临时的Channel 在最后一个消费者断开连接的时候被销毁.

  与服务于生产者的 Topic 不同,Channel 直接面向消费者

channel struct

先看看 channel 的内部结构

type Channel struct {
	// 放在前面保证 32 位系统对其
	requeueCount uint64 // 重新投递消息数
	messageCount uint64 // topic 发送过来消息数
	timeoutCount uint64 // 超时数

	sync.RWMutex

	topicName string   // topic name
	name      string   // channel name
	ctx       *context // 上下文

	backend BackendQueue // 存盘机制

	memoryMsgChan chan *Message // 消息队列
	exitFlag      int32         // 是否退出
	exitMutex     sync.RWMutex  // 用来对 exitFlag 加锁

	// state tracking
	clients        map[int64]Consumer // 所有客户端
	paused         int32              // 是否暂停
	ephemeral      bool               // 是否是临时 channel
	deleteCallback func(*Channel)     // 删除回调函数
	deleter        sync.Once          // 保证删除回调函数仅执行一次

	// Stats tracking
	e2eProcessingLatencyStream *quantile.Quantile

	// 延迟队列相关的三个字段
	deferredMessages map[MessageID]*pqueue.Item
	deferredPQ       pqueue.PriorityQueue
	deferredMutex    sync.Mutex
	// 待确认队列相关的三个字段
	inFlightMessages map[MessageID]*Message
	inFlightPQ       inFlightPqueue
	inFlightMutex    sync.Mutex
}

  在 channel 中存在两个优先队列,一个待确认队列,一个延迟队列.每个队列均由一个 Mutex,一个优先队列,一个 map 组成.其中 Mutex 保护 map 的读写,优先队列进行排序,排队的 key 分别是待确认过期时间以及延迟发送时间,map 则用来根据优先队列获取的节点来快速获取 map 信息.

NewChannel 操作

创建 channel 的函数,调用 initPQ 初始化两个队列:

// 创建一个 channel 并返回对应指针
func NewChannel(topicName string, channelName string, ctx *context,
	deleteCallback func(*Channel)) *Channel {

	c := &Channel{
		topicName:      topicName,
		name:           channelName,
		memoryMsgChan:  nil,
		clients:        make(map[int64]Consumer),
		deleteCallback: deleteCallback,
		ctx:            ctx,
	}
	// 消息队列,默认 10000
	if ctx.nsqd.getOpts().MemQueueSize > 0 {
		c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
	}
	// TODO
	if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
		c.e2eProcessingLatencyStream = quantile.New(
			ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,
			ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles,
		)
	}
	// 初始化延迟队列和待确认队列
	c.initPQ()
	// 判断是不是暂时 channel,选择消息存盘方式
	if strings.HasSuffix(channelName, "#ephemeral") {
		c.ephemeral = true
		c.backend = newDummyBackendQueue()
	} else {
		dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
			opts := ctx.nsqd.getOpts()
			lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
		}
		// backend names, for uniqueness, automatically include the topic...
		backendName := getBackendName(topicName, channelName)
		c.backend = diskqueue.New(
			backendName,
			ctx.nsqd.getOpts().DataPath,
			ctx.nsqd.getOpts().MaxBytesPerFile,
			int32(minValidMsgLength),
			int32(ctx.nsqd.getOpts().MaxMsgSize) minValidMsgLength,
			ctx.nsqd.getOpts().SyncEvery,
			ctx.nsqd.getOpts().SyncTimeout,
			dqLogf,
		)
	}
	// 通知 lookupd 创建了 channel
	c.ctx.nsqd.Notify(c)

	return c
}

  整个看下来,channel 好像和 topic 创建十分类似,区别仅仅就只有 E2EProcessingLatencyPercentiles 参数的处理和 c.initPQ(),前者暂时不看,先看看后者:

// 初始化两个队列,均使用优先队列
func (c *Channel) initPQ() {
	pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10))
	c.inFlightMutex.Lock()
	// 代表正在投递但是还没有投递成功的消息
	c.inFlightMessages = make(map[MessageID]*Message)
	c.inFlightPQ = newInFlightPqueue(pqSize)
	c.inFlightMutex.Unlock()

	c.deferredMutex.Lock()
	// 延迟消息和投递失败的消息
	c.deferredMessages = make(map[MessageID]*pqueue.Item)
	c.deferredPQ = pqueue.New(pqSize)
	c.deferredMutex.Unlock()
}
// 消息定义
const (
	MsgIDLength       = 16    // ID 长度
	minValidMsgLength = MsgIDLength   8   2 // Timestamp   Attempts
)
type MessageID [MsgIDLength]byte

  每个 channel 维持两个队列,一个发送待确认队列,一个延迟发送队列.二者均使用优先队列进行实现,且存储消息使用的 map 以 msgID 作为 key.二者的实现也不一样,后面慢慢讲

channel 的删除和关闭

和 topic 基本类似,没啥好说的,注意一下队列的清除即可.

// 删除 chanel
func (c *Channel) Delete() error {
	return c.exit(true)
}

// 关闭 channel
func (c *Channel) Close() error {
	return c.exit(false)
}

// 删除或关闭 channel
func (c *Channel) exit(deleted bool) error {
	c.exitMutex.Lock()
	defer c.exitMutex.Unlock()
	// 设置不可用状态
	if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
		return errors.New("exiting")
	}

	if deleted {
		c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name)

		// 通知 lookdup
		c.ctx.nsqd.Notify(c)
	} else {
		c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
	}

	c.RLock()
	// 所有关联客户端均关闭
	for _, client := range c.clients {
		client.Close()
	}
	c.RUnlock()

	if deleted {
		// 删除的话需要删除所有未发送消息
		c.Empty()
		return c.backend.Delete()
	}

	// 仅关闭的话,将所有消息放入磁盘中
	c.flush()
	return c.backend.Close()
}

// 清空所有队列中的所有消息
func (c *Channel) Empty() error {
	c.Lock()
	defer c.Unlock()
	// 重置两个队列
	c.initPQ()
	// 清空所有客户端
	for _, client := range c.clients {
		client.Empty()
	}
	// 清空所有未接收消息
	for {
		select {
		case <-c.memoryMsgChan:
		default:
			goto finish
		}
	}

finish:
	// 最后把磁盘中的消息清空
	return c.backend.Empty()
}

// 将消息全部落盘
func (c *Channel) flush() error {
	var msgBuf bytes.Buffer

	if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
		...log...
	}
	// 队列中的消息落盘
	for {
		select {
		case msg := <-c.memoryMsgChan:
			err := writeMessageToBackend(&msgBuf, msg, c.backend)
			if err != nil {
				c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
			}
		default:
			goto finish
		}
	}

finish:
	c.inFlightMutex.Lock()
	// 存盘待确认的消息
	for _, msg := range c.inFlightMessages {
		err := writeMessageToBackend(&msgBuf, msg, c.backend)
		if err != nil {
			c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
		}
	}
	c.inFlightMutex.Unlock()

	c.deferredMutex.Lock()
	// 存盘延时消息
	for _, item := range c.deferredMessages {
		msg := item.Value.(*Message)
		err := writeMessageToBackend(&msgBuf, msg, c.backend)
		if err != nil {
			c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
		}
	}
	c.deferredMutex.Unlock()

	return nil
}

put 操作

  既然通过 channel 把消息从 topic 送到 client 中,那么问题来了,channel 是怎么和 topic 联通的,以及是怎么把消息发送给 client 的?
  先看看是怎么和 topic 联通的,上一小节可以看到 topic 中通过 channel.PutMessage(chanMsg) 和 channel.PutMessageDeferred(chanMsg, chanMsg.deferred) 将消息发送给 channel,下面就来碰碰这两个函数:

// 向 channel 中投递一条消息,在 topic 的消息循环中投递
func (c *Channel) PutMessage(m *Message) error {
	c.RLock()
	defer c.RUnlock()
	// channel 是否可用
	if c.Exiting() {
		return errors.New("exiting")
	}
	// 调用 channel.put(msg)
	err := c.put(m)
	if err != nil {
		return err
	}
	// 记录 msgCount
	atomic.AddUint64(&c.messageCount, 1)
	return nil
}
// 将消息推送出去
func (c *Channel) put(m *Message) error {
	select {
	// 存入 channel 的消息队列中
	case c.memoryMsgChan <- m:
	default:
		// 消息队列满了就存入 backend 队列中
		b := bufferPoolGet()
		// 存入磁盘
		err := writeMessageToBackend(b, m, c.backend)
		bufferPoolPut(b)
		c.ctx.nsqd.SetHealth(err)
		if err != nil {
			...log...
			return err
		}
	}
	return nil
}
// 准备发送延迟消息,先添加到延时队列,时间到再 put
func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
	atomic.AddUint64(&c.messageCount, 1)
	c.StartDeferredTimeout(msg, timeout)
}
// 准备添加延迟消息
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
	// 获取消息延迟推送的目标时间
	absTs := time.Now().Add(timeout).UnixNano()
	// 包装成队列的一个节点
	item := &pqueue.Item{Value: msg, Priority: absTs}
	// 将消息存入 deferredMessage
	err := c.pushDeferredMessage(item)
	if err != nil {
		return err
	}
	// 将消添加到 deferredPQ 优先队列中,进行等待发送
	c.addToDeferredPQ(item)
	return nil
}
// 向 deferredMessages 中存入一条消息
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
	c.deferredMutex.Lock()
	// TODO: these map lookups are costly
	id := item.Value.(*Message).ID
	_, ok := c.deferredMessages[id]
	if ok {
		c.deferredMutex.Unlock()
		return errors.New("ID already deferred")
	}
	// 加锁存入
	c.deferredMessages[id] = item
	c.deferredMutex.Unlock()
	return nil
}
// 添加到 deferredPQ 中
func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
	c.deferredMutex.Lock()
	heap.Push(&c.deferredPQ, item)
	c.deferredMutex.Unlock()
}

  channel 也是通过 put 的操作将消息放到消息缓冲通道中,现在已经知道 channel 中的消息是 topic put 进来的,这里有两种类型的消息:

  1. 正常的消息直接添加到消息队列缓冲区或者通过 backend 落地
  2. 延时消息会通过 PutMessageDeferred -> StartDeferredTimeout -> pushDeferredMessage 添加到 channel 的延时队列中

向延迟队列的添加需要两步,分别将 msg 加入到 c.deferredMessages 中和 c.deferredPQ 中即可

  这样的话,我们已经了解了,channel 对从 topic 发来的消息的处理,接下来看看 channel 到底是怎么把这两种消息发送给客户端的.

正常消息的发送

  channel 接收到正常消息会直接存放到消息缓冲区中,是否还记得在 nsqd 中我们启动了 queueScanLoop 来对所有的 channel 进行扫描,判断是否有消息需要进行传递:
  在 queueScanWorker 中会对每个待扫描的 channel 执行 processInFlightQueue, processDeferredQueue 两个函数.分别处理正常消息和延迟消息.这里我们先看一下正常消息的发送流程:

// 在 NSQD 协程中进行处理,传入当前时间,处理 channel 中的待发送消息
func (c *Channel) processInFlightQueue(t int64) bool {
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}
	// dirty = 是否有新消息
	dirty := false
	for {
		c.inFlightMutex.Lock()
		// 判断待确认的消息是否超时,获取等待确认超时的一条消息
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			// 没有消息等待确认超时,就不需要处理待确认队列
			goto exit
		}
		dirty = true
		// 清空到时消息
		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		// 这条消息超时没有确认,记录超时数据条数
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			// 消息超时后的客户端的处理
			client.TimedOutMessage()
		}
		// 重新投递消息,调用 put 表示立即发送
		c.put(msg)
	}

exit:
	return dirty
}

  这就纳闷了,怎么是待确认的消息超时处理呢?消息咋还没发给客户端就待确认了呢?其实 channel 就是把消息存放到消息缓冲区中,谁抢到就是谁的,那么客户端怎么抢呢?看一下 memoryMsgChan 就知道,原来是 client 中有一段代码如下:

	for;;{
		case msg := <-memoryMsgChan:  // 接收消息
			...

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			...
		case b := <-backendMsgChan:  // 从存盘队列接收消息
			...类似上面...
	}

  这个 channel 的所有客户端均等待从 memoryMsgChan 中获取消息,每条消息谁抢到谁执行.我们暂时只关心抢到消息的 client 会执行 subChannel.StartInFlightTimeout(),里面传入的 msgTimeout 就是这个消息等待客户端确认的时间,然后调用 SendMessage() 把消息发送给客户端.

// 推送一条消息给客户端,等待确认,timeout 是确认超时时间
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	now := time.Now()
	msg.clientID = clientID
	// 消息发送时间
	msg.deliveryTS = now
	// 等待确认超时时间,在优先队列中按照它进行排序的
	msg.pri = now.Add(timeout).UnixNano()
	// 加到 InFlightMessage 中
	err := c.pushInFlightMessage(msg)
	if err != nil {
		return err
	}
	// 记录进优先队列
	c.addToInFlightPQ(msg)
	return nil
}
// 在 inFlightMessages 中存入一条消息
func (c *Channel) pushInFlightMessage(msg *Message) error {
	c.inFlightMutex.Lock()
	_, ok := c.inFlightMessages[msg.ID]
	if ok {
		c.inFlightMutex.Unlock()
		return errors.New("ID already in flight")
	}
	c.inFlightMessages[msg.ID] = msg
	c.inFlightMutex.Unlock()
	return nil
}
// 添加到 inFlightPQ 中,等待确认或超时
func (c *Channel) addToInFlightPQ(msg *Message) {
	c NSQ源码分析(一)——nsqd的初始化及启动流程
nsqd解析:nsqd-channel
CentOS 7下安装nsq
go 类型 value 不支持索引_Go语言基础(十四)
ubuntu环境下homestead安装运行nsq
CentOS 7 安装配置NSQ
搭建基于consul,registrator,nsq的GO体系Docker开发环境
golang nsq消费者时间过长,防止与nsqd服务断开连接
【文末有惊喜!】一文读懂golang channel
mac安装nsq随手记

[关闭]
~ ~