
A Simple pub/sub Model in Golang

Published: 2021-12-28   Editor: jiaochengji.com

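I have long been curious about how pub/sub works in Redis, so I wrote a simple prototype of it in Golang. The original post demonstrates the result with a GIF at this point.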

<h2>Overall model</h2>

A pub/sub model has two paths:

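<ul><li>subscribe: the client connects on its own initiative and subscribes to a topic; the server records the client's state at that point.</li><li>publish: the server pushes data to subscribers by topic, using the client connection state that was recorded at subscribe time.</li></ul><h2>
Implementation</h2>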
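Before the TCP version, the two paths described above can be sketched entirely in-process. This is a minimal illustration, not the article's implementation: `Broker`, `Subscribe`, and `Publish` are hypothetical names, and a buffered Go channel stands in for the client connection that the real server records.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-process stand-in for the server:
// it remembers subscribers per topic and pushes to them on publish.
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe records the subscriber's state (here, a buffered channel)
// under the topic and returns it so the caller can receive messages.
func (b *Broker) Subscribe(topic string) <-chan string {
	ch := make(chan string, 8)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish pushes message to every subscriber recorded for topic and
// returns how many accepted it. A subscriber whose buffer is full is
// skipped rather than blocking the publisher.
func (b *Broker) Publish(topic, message string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- message:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	b := NewBroker()
	first := b.Subscribe("hello")
	second := b.Subscribe("hello")
	n := b.Publish("hello", "PUBLISH MESSAGE")
	// prints: 2 PUBLISH MESSAGE PUBLISH MESSAGE
	fmt.Println(n, <-first, <-second)
}
```

The TCP implementation that follows has the same shape: a map keyed by topic, written on subscribe and read on publish, with a `net.Conn` in place of the channel.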

channel.go

<pre><code class="lang-go hljs">package pubsub

import (
	"bufio"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
)

// Channel is one topic together with the clients subscribed to it.
type Channel struct {
	Name    string
	clients map[int]*Client
	sync.RWMutex
	waitGroup    sync.WaitGroup
	messageCount uint64
	exitFlag     int32
}

func NewChannel(channelName string) *Channel {
	return &Channel{
		Name:    channelName,
		clients: make(map[int]*Client),
	}
}

// AddClient registers the client and reports whether it was already subscribed.
// The check and the insert share one write lock so they cannot interleave.
func (this *Channel) AddClient(client *Client) bool {
	this.Lock()
	defer this.Unlock()
	_, found := this.clients[client.Id]
	if !found {
		this.clients[client.Id] = client
	}
	return found
}

// DeleteClient removes the client and returns how many subscribers remain.
func (this *Channel) DeleteClient(client *Client) int {
	this.Lock()
	defer this.Unlock()
	delete(this.clients, client.Id)
	return len(this.clients)
}

// Notify pushes message to every subscribed client.
// The caller must have called waitGroup.Add(1) beforehand.
func (this *Channel) Notify(message string) bool {
	defer this.waitGroup.Done()
	this.RLock()
	defer this.RUnlock()
	for clientid, client := range this.clients {
		this.PutMessage(clientid, message)
		go handleResponse(client.conn, message)
	}
	return true
}

func handleResponse(conn net.Conn, message string) {
	// todo alive check
	fmt.Println("handleResponse: ", message)
	writer := bufio.NewWriter(conn)
	writer.WriteString("[CONSUME MESSAGE] " + message)
	writer.Flush()
}

func (this *Channel) Wait() {
	//this.waitGroup.Wait()
}

// Exit marks the channel as exiting; only the first call has any effect.
func (this *Channel) Exit() {
	if !atomic.CompareAndSwapInt32(&this.exitFlag, 0, 1) {
		return
	}
	this.Wait()
}

func (this *Channel) Exiting() bool {
	return atomic.LoadInt32(&this.exitFlag) == 1
}

// PutMessage logs the delivery and bumps the message counter.
// It takes no lock: Notify already holds the read lock, and the counter is atomic.
func (this *Channel) PutMessage(clientid int, message string) {
	if this.Exiting() {
		return
	}
	fmt.Println("{", this.Name, "}[", clientid, "] ", message)
	atomic.AddUint64(&this.messageCount, 1)
}
</code></pre>
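`AddClient` is a check-then-insert operation, so both steps need to happen under a single write lock; otherwise two goroutines could both pass the check for the same client. The stand-alone sketch below exercises that from several goroutines. The `Client` and `Channel` types here are trimmed-down copies kept only so the example compiles on its own, and the `mu` field name is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Client and Channel are reduced versions of the article's types.
type Client struct{ Id int }

type Channel struct {
	mu      sync.Mutex
	clients map[int]*Client
}

func NewChannel() *Channel {
	return &Channel{clients: make(map[int]*Client)}
}

// AddClient reports whether the client was already registered, as in the
// article. The lookup and the insert happen inside one critical section.
func (c *Channel) AddClient(client *Client) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, found := c.clients[client.Id]
	if !found {
		c.clients[client.Id] = client
	}
	return found
}

// DeleteClient removes the client and returns the remaining count,
// again without releasing the lock between the two steps.
func (c *Channel) DeleteClient(client *Client) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.clients, client.Id)
	return len(c.clients)
}

func main() {
	ch := NewChannel()
	found := make([]bool, 16)
	var wg sync.WaitGroup
	for i := range found {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// every goroutine tries to register the same client id
			found[i] = ch.AddClient(&Client{Id: 42})
		}(i)
	}
	wg.Wait()

	newlyAdded := 0
	for _, f := range found {
		if !f {
			newlyAdded++
		}
	}
	fmt.Println("newly added:", newlyAdded) // exactly one goroutine wins
	fmt.Println("remaining:", ch.DeleteClient(&Client{Id: 42}))
}
```

A plain `sync.Mutex` is enough here because every operation writes; a `sync.RWMutex` only pays off when read-only paths such as `Notify` dominate.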

server.go

<pre><code class="lang-go hljs">package pubsub

import (
	"bufio"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
)

type Client struct {
	Id   int
	Ip   string
	conn net.Conn
}

type Server struct {
	// map[Channel.Name]*Channel: one channel can be subscribed to by many clients
	Bucket map[string]*Channel
	sync.RWMutex
}

func NewServer() *Server {
	return &Server{
		Bucket: make(map[string]*Channel),
	}
}

func (this *Server) Run(host string, port int) error {
	address := host + ":" + strconv.Itoa(port)
	listener, err := net.Listen("tcp", address)
	if err != nil {
		return err
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			continue
		}
		go this.HandleRequest(conn)
	}
}

func (this *Server) HandleRequest(conn net.Conn) {
	//defer conn.Close()
	// One reader and one writer per connection: a fresh bufio.Reader on
	// every iteration would discard bytes that were already buffered.
	reader := bufio.NewReader(conn)
	writer := bufio.NewWriter(conn)
	for {
		line, _, err := reader.ReadLine()
		if err != nil {
			// the client went away; stop instead of spinning on a dead connection
			return
		}
		content := strings.TrimSpace(string(line))
		fmt.Printf("request string: [%s]\n", content)
		if content == "subscribe" {
			address := conn.RemoteAddr().String()
			// SplitHostPort also copes with IPv6 addresses such as [::1]:54321
			ip, port, _ := net.SplitHostPort(address)
			clientid, _ := strconv.Atoi(port)
			client := &Client{
				Id:   clientid,
				Ip:   ip,
				conn: conn,
			}
			topic := "hello"
			this.Subscribe(client, topic)
			writer.WriteString(address + "\n")
		} else if content == "publish" {
			message := "PUBLISH MESSAGE"
			topic := "hello"
			this.PublishMessage(topic, message)
		} else if content == "quit" {
			break
		} else {
			fmt.Println("common chat " + content)
			writer.WriteString(content + "\n")
		}
		// flush once per request so replies actually reach the client
		writer.Flush()
	}
}

func (this *Server) Subscribe(client *Client, channelName string) {
	// Look up and create under one write lock so two first-time
	// subscribers cannot each create their own channel.
	this.Lock()
	channel, found := this.Bucket[channelName]
	if !found {
		// create a new channel for this topic
		channel = NewChannel(channelName)
		this.Bucket[channelName] = channel
	}
	this.Unlock()
	channel.AddClient(client)
}

func (this *Server) Unsubscribe(client *Client, channelName string) {
	this.RLock()
	channel, found := this.Bucket[channelName]
	this.RUnlock()
	if found {
		remain := channel.DeleteClient(client)
		if remain == 0 {
			channel.Exit()
			this.Lock()
			delete(this.Bucket, channelName)
			this.Unlock()
		}
	}
}

func (this *Server) PublishMessage(channelName, message string) (bool, string) {
	this.RLock()
	defer this.RUnlock()
	channel, found := this.Bucket[channelName]
	if !found {
		return false, "channelName does not exist"
	}
	channel.waitGroup.Add(1)
	go channel.Notify(message)
	channel.waitGroup.Wait()
	return true, ""
}
</code></pre>
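`HandleRequest` matches the whole line against fixed words and hardcodes the topic `"hello"` and the message text. One possible next step is to let the client supply them on the same line. The sketch below is only an illustration: the `Command` type, the `parseCommand` helper, and the line format `subscribe <topic>` / `publish <topic> <message>` are assumptions, not part of the original code.

```go
package main

import (
	"fmt"
	"strings"
)

// Command is a hypothetical parsed request line.
type Command struct {
	Verb    string
	Topic   string
	Message string
}

// parseCommand splits a line such as "publish hello some text" into
// verb, topic, and message. The line format is an assumption of this sketch.
func parseCommand(line string) (Command, error) {
	fields := strings.Fields(line)
	if len(fields) == 0 {
		return Command{}, fmt.Errorf("empty command")
	}
	cmd := Command{Verb: strings.ToLower(fields[0])}
	switch cmd.Verb {
	case "quit":
		return cmd, nil
	case "subscribe":
		if len(fields) != 2 {
			return Command{}, fmt.Errorf("usage: subscribe <topic>")
		}
		cmd.Topic = fields[1]
		return cmd, nil
	case "publish":
		if len(fields) < 3 {
			return Command{}, fmt.Errorf("usage: publish <topic> <message>")
		}
		cmd.Topic = fields[1]
		// everything after the topic is the message; runs of spaces collapse to one
		cmd.Message = strings.Join(fields[2:], " ")
		return cmd, nil
	default:
		return Command{}, fmt.Errorf("unknown command %q", fields[0])
	}
}

func main() {
	lines := []string{"subscribe hello", "publish hello PUBLISH MESSAGE", "quit", "bogus"}
	for _, line := range lines {
		cmd, err := parseCommand(line)
		fmt.Printf("%q -> %+v, err=%v\n", line, cmd, err)
	}
}
```

Inside the request loop, the parsed `Topic` and `Message` would then replace the hardcoded values passed to the subscribe and publish calls.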

main.go

<pre><code class="lang-go hljs">package main

import (
	"fmt"
	"log"

	"github.com/guoruibiao/pubsub"
)

func main() {
	server := pubsub.NewServer()
	fmt.Println("server running...")
	err := server.Run("localhost", 8080)
	if err != nil {
		log.Fatal(err)
	}
}
</code></pre> <h2>
Test steps</h2>

First, start the server:

<pre><code>$ go run main.go
server running...
</code></pre>

Next, test with client connections. The server speaks plain TCP, so netcat works as a client and there is no need to write a separate Golang client.

Open a first terminal and type subscribe:

<pre><code>nc localhost 8080
subscribe
</code></pre>

Open a second terminal and type subscribe:

<pre><code>nc localhost 8080
subscribe
</code></pre>

Open a third terminal and type publish:

<pre><code>nc localhost 8080
publish
</code></pre>

The first and second terminals now receive the publish data pushed by the server; see the GIF at the top of the post for details.

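<h2>
TODO</h2> <ul><li class="task-list-item"> Add the flag package so that arbitrary data can be sent from the command line.</li><li class="task-list-item"> Check whether a client is still alive, to avoid writing to a TCP connection that has already been closed.</li><li class="task-list-item"> Use Go channels to reduce the scattered calls on net.Conn objects.</li></ul><h2>References</h2>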

A simple publish/subscribe mechanism implementation (Golang)
