教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 golang nats[3] queue模式

golang nats[3] queue模式

发布时间:2022-01-30   编辑:jiaochengji.com
教程集为您提供golang nats[3] queue模式等资源,欢迎您收藏本站,我们将为您提供最新的golang nats[3] queue模式资源
<h1>队列订阅模式</h1> <blockquote>

此模式中,订阅者要指定两个属性,主题和队列(queue,其实就是队列名称)

</blockquote> <blockquote>

注意:下面所有前提=必须订阅同一个主题

</blockquote> <blockquote>

发布消息后,N个具有同样的主题和queue的订阅者,只有一个会收到消息。(random算法)

</blockquote> <blockquote>

说明:queue=工作组,工作组中有N个worker,发布消息后,同一个工作组中,仅有一个worker会收到消息。

</blockquote> <blockquote>

相同主题,不同queue的订阅者之间,不符合上面的描述。这种情况下,可以把同一个queue的订阅者们,当成一个订阅者来处理,这样就和普通的发布订阅模式一样了。

</blockquote> <blockquote>

主题subj1,queue=q1的订阅者有sub1-q1,sub2-q1,sub3-q1
主题subj1,queue=q2的订阅者有sub1-q2,sub2-q2,sub3-q2
一个主题,两组订阅者,每组订阅者中各有3个订阅者。
对sub1发布消息,q1,q2两个组都会收到消息(发布订阅模式),q1,q2每个组中,分别仅有一个订阅者会收到消息(queue模式)

</blockquote> <h2>server</h2> <pre class="has"><code class="language-Go">package main import ( "github.com/nats-io/go-nats" "log" "flag" ) const ( //url = "nats://192.168.3.125:4222" url = nats.DefaultURL ) var ( nc *nats.Conn err error ) func init() { if nc, err = nats.Connect(url); checkErr(err) { // } } func main() { var ( servername = flag.String("servername", "y", "name for server") queueGroup = flag.String("group", "", "group name for Subscribe") subj = flag.String("subj", "", "subject name") ) flag.Parse() log.Println(*servername, *queueGroup, *subj) startService(*subj, *servername " worker1", *queueGroup) startService(*subj, *servername " worker2", *queueGroup) startService(*subj, *servername " worker3", *queueGroup) select {} } //receive message func startService(subj, name, queue string) { go async(nc, subj, name, queue) } func async(nc *nats.Conn, subj, name, queue string) { nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) { log.Println(name, "Received a message From Async : ", string(msg.Data)) }) } func checkErr(err error) bool { if err != nil { log.Println(err) return false } return true } </code></pre> <h2>client</h2> <pre class="has"><code class="language-Go">package main import ( "github.com/nats-io/go-nats" "log" "strconv" "github.com/pborman/uuid" "flag" "time" ) const ( //url = "nats://192.168.3.125:4222" url = nats.DefaultURL ) var ( nc *nats.Conn err error ) func init() { if nc, err = nats.Connect(url); checkErr(err) { // } } func main() { var ( subj = flag.String("subj", "", "subject name") ) flag.Parse() log.Println(*subj) startClient(*subj) time.Sleep(time.Second) } //send message to server func startClient(subj string) { for i := 0; i < 1; i { id := uuid.New() log.Println(id) nc.Publish(subj, []byte(id " Sun " strconv.Itoa(i))) nc.Publish(subj, []byte(id " Rain " strconv.Itoa(i))) nc.Publish(subj, []byte(id " Fog " strconv.Itoa(i))) nc.Publish(subj, []byte(id " Cloudy " strconv.Itoa(i))) } } func checkErr(err error) bool { if err != nil { log.Println(err) return false } return true } </code></pre> <blockquote>

启动server A queue=g1,订阅主题=weather

</blockquote> <pre class="has"><code>./main -servername=A -group=g1 -subj=weather 2018/08/18 11:32:16 A g1 weather </code></pre> <blockquote>

启动server B queue=g1,订阅主题=weather

</blockquote> <pre class="has"><code>./main -servername=B -group=g1 -subj=weather 2018/08/18 11:32:21 B g1 weather </code></pre> <blockquote>

发送消息

</blockquote> <pre class="has"><code>./main -subj=weather 2018/08/18 11:32:24 weather 2018/08/18 11:32:24 3005ae7c-85ab-42d3-ad09-d44688d129ad </code></pre> <blockquote>

结果 server A收到消息

</blockquote> <pre class="has"><code>2018/08/18 11:32:24 A worker3 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Rain 0 2018/08/18 11:32:24 A worker2 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Sun 0 </code></pre> <blockquote>

结果 server B收到消息

</blockquote> <pre class="has"><code>2018/08/18 11:32:24 B worker3 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Fog 0 2018/08/18 11:32:24 B worker3 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Cloudy 0 </code></pre> <h2>主题相同,queue不同</h2> <blockquote>

启动server c queue=test,订阅主题=weather

</blockquote> <pre class="has"><code>> ./main -servername=C -group=test -subj=weather 2018/08/18 11:37:43 C test weather </code></pre> <blockquote>

发消息

</blockquote> <pre class="has"><code>./main -subj=weather 2018/08/18 11:37:47 weather 2018/08/18 11:37:47 b4e201dd-ea4a-4ec3-aa45-99489695f0c2 </code></pre> <blockquote>

Server c 收到了全部消息

</blockquote> <pre class="has"><code>2018/08/18 11:37:47 C worker1 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0 2018/08/18 11:37:47 C worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0 2018/08/18 11:37:47 C worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0 2018/08/18 11:37:47 C worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0 </code></pre> <blockquote>

Server A 收到3条消息

</blockquote> <pre class="has"><code>2018/08/18 11:37:47 A worker1 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0 2018/08/18 11:37:47 A worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0 2018/08/18 11:37:47 A worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0 </code></pre> <blockquote>

Server B 收到1条消息

</blockquote> <pre class="has"><code>2018/08/18 11:37:47 B worker2 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0 </code></pre> <h2>总结:queue模式,在分发消息时,进行负载均衡,随机发送给同一组中的任意一个订阅者,可以随时增加删除订阅者,配合响应的监控数据和统计数据,对下游的业务进行自动伸缩。</h2> <blockquote> <h2>提高系统的可用性,避免业务在单点处理导致系统瓶颈。</h2> </blockquote>

栗子:
比如用户登录,对login主题发送消息,积分系统订阅了login主题,收到login的消息后,对用户的积分进行处理。为了保证积分处理的高可用,可以使用相同的queue=score,启动多个积分处理服务。
监控积分业务的处理时间,如果某个积分处理服务,业务执行时间过长(比如由于某些/某类用户的特殊情况,积分算法不同等),造成了消息积压,不能及时处理。

<blockquote>

在积分系统的下游仍有处理能力的时候(比如依赖下游的某个接口,此接口的处理能力依然是正常的),可以自动启动多个积分处理服务,订阅主题login,queue=score,分散计算压力。
如果是下游的处理能力受限,则可能要进行限流处理,不但不能启动多个积分处理服务,还要限制积分业务的处理速度。

</blockquote> 到此这篇关于“golang nats[3] queue模式”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!

您可能感兴趣的文章:
NATS—基础介绍
golang nats[3] queue模式
golang nats[4] request reply模式
Golang号称高并发,但高并发时性能不高解决办法
golang https 全局代理_Golang 调度器
Python如何进行进程间的通信
如何保证消息队列的高可用?
jquery1.83 之前所有与异步列队相关的模块详细介绍
Golang的协程调度
golang runtime 简析

[关闭]
~ ~