教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 golang nats[4] request reply模式

golang nats[4] request reply模式

发布时间:2022-02-24   编辑:jiaochengji.com
教程集为您提供golang nats[4] request reply模式等资源,欢迎您收藏本站,我们将为您提供最新的golang nats[4] request reply模式资源
<h1>请求响应模式</h1> <blockquote>

无论是发布订阅模式还是queue模式,nats都不能保证消息一定发送到订阅方,除非订阅者发送一个响应给发布者。
所以订阅者发送一个回执给发布者,就是请求响应模式。

</blockquote> <h3>这种模式有什么用?</h3> <blockquote>

nats要求订阅者一定要先完成订阅,发布消息后,订阅者才能收到消息,类似离线消息的模式nats不支持。就算先完成订阅,后发送消息,消息发送方也不知道是否有订阅者收到了消息,请求响应模式就是应对这种情况。

</blockquote> <h3>基本流程</h3> <blockquote>

A发送消息,B收到消息,发送回执给A。这就是request reply的基本流程。

</blockquote> <h3>基本实现原理</h3> <blockquote> <ul><li>A启用request模式发送消息(消息中包含了回执信息,replya主题),同步等待回执(有超时时间)。</li><li>B收到消息,在消息中取出回执信息=replay主题,对replay主题,<code>主动</code>发送普通消息(消息内容可自定义,比如server A上的service1收到msgid=xxxx的消息。)。</li><li>A在超时内收到消息,确认结束。</li><li>A在超时内未收到消息,超时结束。</li></ul></blockquote> <h3>注意</h3> <blockquote> <ul><li>因为A发送的消息中包装了回执测相关信息,订阅者B收到消息后,也要主动发送回执,所以请求响应模式,对双方都有影响。</li><li>A发送消息后,等待B的回执,需要给A设置超时时间,超时后,不在等待回执,直接结束,效果和不需要回执的消息发送一样,不在关心是否有订阅者收到消息。</li></ul></blockquote> <h2>两种模式</h2> <blockquote>

request reply有两种模式:

<ul><li>one to one 默认模式</li></ul><blockquote>

1条消息,N个订阅者,消息发送方,仅会收到一条<code>回执记录</code>(因为消息发送方收到回执消息后,就自动断开了对回执消息的订阅。),即使N个订阅都都收到了消息。注意:pub/sub和queue模式的不同

</blockquote> </blockquote> <blockquote> <ul><li>one to many 非默认模式,需要自己实现</li></ul><blockquote>

1条消息,N个订阅者,消息发送方,可以自己设定一个数量限制N,接受到N个回执消息后,断开对回执消息的订阅。

</blockquote> </blockquote> <h1>Server</h1> <pre><code class="lang-go hljs"><code class="go"><span class="hljs-keyword">package main <span class="hljs-keyword">import ( <span class="hljs-string">"github.com/nats-io/go-nats" <span class="hljs-string">"log" <span class="hljs-string">"flag" ) <span class="hljs-keyword">const ( <span class="hljs-comment">//url = "nats://192.168.3.125:4222" url = nats.DefaultURL ) <span class="hljs-keyword">var ( nc *nats.Conn encodeConn *nats.EncodedConn err error ) <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">init<span class="hljs-params">() { <span class="hljs-keyword">if nc, err = nats.Connect(url); checkErr(err) { <span class="hljs-comment">// <span class="hljs-keyword">if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER); checkErr(err) { } } } <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">main<span class="hljs-params">() { <span class="hljs-keyword">var ( servername = flag.String(<span class="hljs-string">"servername", <span class="hljs-string">"Y", <span class="hljs-string">"name for server") queueGroup = flag.String(<span class="hljs-string">"group", <span class="hljs-string">"", <span class="hljs-string">"group name for Subscribe") subj = flag.String(<span class="hljs-string">"subj", <span class="hljs-string">"yasenagat", <span class="hljs-string">"subject name") ) flag.Parse() mode := <span class="hljs-string">"queue" <span class="hljs-keyword">if *queueGroup == <span class="hljs-string">"" { mode = <span class="hljs-string">"pub/sub" } log.Printf(<span class="hljs-string">"Server[%v] Subscribe Subject[%v] in [%v]Mode", *servername, *subj, mode) startService(*subj, *servername <span class="hljs-string">" worker1", *queueGroup) startService(*subj, *servername <span class="hljs-string">" worker2", *queueGroup) startService(*subj, *servername <span class="hljs-string">" worker3", *queueGroup) nc.Flush() <span class="hljs-keyword">select {} } <span class="hljs-comment">//receive message <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">startService<span class="hljs-params">(subj, name, queue <span class="hljs-keyword">string) { <span class="hljs-keyword">go async(nc, subj, name, queue) } <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">async<span class="hljs-params">(nc *nats.Conn, subj, name, queue <span class="hljs-keyword">string) { replyMsg := name <span class="hljs-string">" Received a msg" <span class="hljs-keyword">if queue == <span class="hljs-string">"" { nc.Subscribe(subj, <span class="hljs-function"><span class="hljs-keyword">func<span class="hljs-params">(msg *nats.Msg) { nc.Publish(msg.Reply, []<span class="hljs-keyword">byte(replyMsg)) log.Println(name, <span class="hljs-string">"Received a message From Async : ", <span class="hljs-keyword">string(msg.Data)) }) } <span class="hljs-keyword">else { nc.QueueSubscribe(subj, queue, <span class="hljs-function"><span class="hljs-keyword">func<span class="hljs-params">(msg *nats.Msg) { nc.Publish(msg.Reply, []<span class="hljs-keyword">byte(replyMsg)) log.Println(name, <span class="hljs-string">"Received a message From Async : ", <span class="hljs-keyword">string(msg.Data)) }) } } <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">checkErr<span class="hljs-params">(err error) <span class="hljs-title">bool { <span class="hljs-keyword">if err != <span class="hljs-literal">nil { log.Println(err) <span class="hljs-keyword">return <span class="hljs-literal">false } <span class="hljs-keyword">return <span class="hljs-literal">true } </span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></code></code></pre> <h1>Client</h1> <pre><code class="lang-go hljs"><code class="go"><span class="hljs-keyword">package main <span class="hljs-keyword">import ( <span class="hljs-string">"github.com/nats-io/go-nats" <span class="hljs-string">"log" <span class="hljs-string">"github.com/pborman/uuid" <span class="hljs-string">"flag" <span class="hljs-string">"time" ) <span class="hljs-keyword">const ( <span class="hljs-comment">//url = "nats://192.168.3.125:4222" url = nats.DefaultURL ) <span class="hljs-keyword">var ( nc *nats.Conn encodeConn *nats.EncodedConn err error ) <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">init<span class="hljs-params">() { <span class="hljs-keyword">if nc, err = nats.Connect(url); checkErr(err, <span class="hljs-function"><span class="hljs-keyword">func<span class="hljs-params">() { }) { <span class="hljs-comment">// <span class="hljs-keyword">if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER); checkErr(err, <span class="hljs-function"><span class="hljs-keyword">func<span class="hljs-params">() { }) { } } } <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">main<span class="hljs-params">() { <span class="hljs-keyword">var ( subj = flag.String(<span class="hljs-string">"subj", <span class="hljs-string">"yasenagat", <span class="hljs-string">"subject name") ) flag.Parse() log.Println(*subj) startClient(*subj) time.Sleep(time.Second) } <span class="hljs-comment">//send message to server <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">startClient<span class="hljs-params">(subj <span class="hljs-keyword">string) { <span class="hljs-keyword">for i := <span class="hljs-number">0; i < <span class="hljs-number">3; i { id := uuid.New() log.Println(id) <span class="hljs-keyword">if msg, err := nc.Request(subj, []<span class="hljs-keyword">byte(id <span class="hljs-string">" hello"), time.Second); checkErr(err, <span class="hljs-function"><span class="hljs-keyword">func<span class="hljs-params">() { <span class="hljs-comment">// handle err }) { log.Println(<span class="hljs-keyword">string(msg.Data)) } } } <span class="hljs-function"><span class="hljs-keyword">func <span class="hljs-title">checkErr<span class="hljs-params">(err error, errFun <span class="hljs-keyword">func()) <span class="hljs-title">bool { <span class="hljs-keyword">if err != <span class="hljs-literal">nil { log.Println(err) errFun() <span class="hljs-keyword">return <span class="hljs-literal">false } <span class="hljs-keyword">return <span class="hljs-literal">true } </span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></code></code></pre> <blockquote>

pub/sub模式启动

</blockquote> <pre><code class="lang-shell hljs"><code class="shell"><span class="hljs-meta">$<span class="bash"> ./main 2018/08/18 18:54:10 Server[Y] Subscribe Subject[yasenagat] in [pub/sub]Mode 2018/08/18 18:54:26 Y worker2 Received a message From Async : b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello 2018/08/18 18:54:26 Y worker1 Received a message From Async : b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello 2018/08/18 18:54:26 Y worker3 Received a message From Async : b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello 2018/08/18 18:54:26 Y worker2 Received a message From Async : 2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello 2018/08/18 18:54:26 Y worker1 Received a message From Async : 2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello 2018/08/18 18:54:26 Y worker3 Received a message From Async : 2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello 2018/08/18 18:54:26 Y worker2 Received a message From Async : fe9f773a-129b-4919-9bc4-c8a4571fef6e hello 2018/08/18 18:54:26 Y worker1 Received a message From Async : fe9f773a-129b-4919-9bc4-c8a4571fef6e hello 2018/08/18 18:54:26 Y worker3 Received a message From Async : fe9f773a-129b-4919-9bc4-c8a4571fef6e hello </span></span></code></code></pre> <blockquote>

发送消息

</blockquote> <pre><code class="lang-shell hljs"><code class="shell"><span class="hljs-meta">$<span class="bash"> ./main 2018/08/18 18:54:26 yasenagat 2018/08/18 18:54:26 b035d7c2-e7e9-4337-bb8a-a23ec85fc31a 2018/08/18 18:54:26 Y worker3 Received a msg 2018/08/18 18:54:26 2d8dfe75-8fee-4b4c-8599-1824638dfa8c 2018/08/18 18:54:26 Y worker2 Received a msg 2018/08/18 18:54:26 fe9f773a-129b-4919-9bc4-c8a4571fef6e 2018/08/18 18:54:26 Y worker2 Received a msg </span></span></code></code></pre> <blockquote>

queue模式启动

</blockquote> <pre><code class="lang-shell hljs"><code class="shell"><span class="hljs-meta">$<span class="bash"> ./main -group=<span class="hljs-built_in">test 2018/08/18 19:14:31 Server[Y] Subscribe Subject[yasenagat] in [queue]Mode 2018/08/18 19:14:33 Y worker2 Received a message From Async : 4ecf2728-b3a7-4181-893a-aefde3bc8d2e hello Y worker2 Received a msg 2018/08/18 19:14:33 Y worker3 Received a message From Async : 4e7f1363-9a47-4705-b87a-4aaeb80164f0 hello Y worker3 Received a msg 2018/08/18 19:14:33 Y worker2 Received a message From Async : 38b1f74b-8a3b-46ba-a10e-62e50efbc127 hello Y worker2 Received a msg </span></span></span></code></code></pre> <blockquote>

发送消息

</blockquote> <pre><code class="lang-shell hljs"><code class="shell"><span class="hljs-meta">$<span class="bash"> ./main 2018/08/18 19:14:33 yasenagat 2018/08/18 19:14:33 4ecf2728-b3a7-4181-893a-aefde3bc8d2e 2018/08/18 19:14:33 Y worker2 Received a msg 2018/08/18 19:14:33 4e7f1363-9a47-4705-b87a-4aaeb80164f0 2018/08/18 19:14:33 Y worker3 Received a msg 2018/08/18 19:14:33 38b1f74b-8a3b-46ba-a10e-62e50efbc127 2018/08/18 19:14:33 Y worker2 Received a msg </span></span></code></code></pre> <h4>queue模式下,发送3条消息,3个订阅者有相同的queue,每条消息只有一个订阅者收到。</h4> <h4>pub/sub模式下,发送3条消息,3个订阅者都收到3条消息,一共9条。</h4> <h2>总结:</h2> <blockquote>

回执主要解决:订阅者是否收到消息的问题、有多少个订阅者收到消息的问题。(不是具体业务是否执行完成的回执!)
基于事件的架构模式可以构建于消息机制之上,依赖消息机制。异步调用的其中一种实现方式,就是基于事件模式。异步调用又是分布式系统中常见的任务处理方式。

</blockquote> <h2>业务模式</h2> <ul><li>业务A发送eventA给事件中心,<code>等待回执</code> </li><li>事件中心告知A收到了消息,开始对外发送广播</li><li>订阅者B订阅了eventA主题</li><li>事件中心对eventA主题发送广播,<code>等待回执</code> </li><li>B收到消息,告知事件中心,收到eventA,开始执行任务taskA</li><li>B异步执行完taskA,通知事件中心taskAComplete,<code>等待回执</code> </li><li>事件中心发送回执给B,对外发送广播,taskAComplete</li><li>........</li></ul><blockquote>

如果超时,未能收到回执,需要回执信息的确认方可以主动调用相关接口,查询任务执行状态,根据任务状态做后续的处理。

</blockquote>


作者:luckyase
链接:https://www.jianshu.com/p/89f245ec7365
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

转载于:https://www.cnblogs.com/gao88/p/10007752.html

到此这篇关于“golang nats[4] request reply模式”的文章就介绍到这了,更多文章或继续浏览下面的相关文章,希望大家以后多多支持JQ教程网!

您可能感兴趣的文章:
NATS—基础介绍
golang nats[4] request reply模式
Golang 实现 Redis(6): 实现 pipeline 模式的 redis 客户端
Golang号称高并发,但高并发时性能不高解决办法
[go]ipv6 ping测试
golang nats[3] queue模式
golang版的traceroute实现
golang微服务框架对比_Golang 微服务教程(一)
python调用api接口有几种方式
首个实时全球空中交通监控系统在北大西洋上空全面投入运行和试用

[关闭]
~ ~