java中RabbitMQ集群使用方法简单介绍
RabbitMQ是一个很受欢迎的消息中间件,通过它可以很方便地实现异构子系统之间的通讯,还可以将不同子系统之间进行解耦。它用erlang开发,基本上是实现了AMQP 1.0标准的消息协议。
了解RabbitMQ首先要了解以下一些概念:Message,Producer、Exchange、Queue、Consumer
Message是一些简单的字符串, Producer(Publisher)是实际发布消息的角色
Queue,是实际存放消息的地方。顾名思义,消息从Queue一端放入,另一段由Consumer(Subscriber)取出,如果有多Consumer,每个consumer各自取出不同的消息进行处理。
当Producer发布消息的时候,首先是发布到Exchange,然后RabbitMQ根据Exchange的类型和逻辑来判断应该发送到哪个Queue中。所以Queue必须bind到特定的Exchange上才能获取到消息,绑定的时候可以提供一个routing_key来判断选择什么消息,Publisher在发出信息的时候就可以指定不同的routing_key来选择如何分发消息。
当使用最基本的队列模式的时候,可以不指定exchange,这时候会使用默认exchange来进行消息的发送。
Exchange和Queue都有自己的名字,多个Publisher可以发布到同一个Exchange,多个Consumer也可以订阅到同一个Queue。
RabbitMQ支持的Exchange方式有:
direct 直接投递
fanout 广播投递
topic 可以按照一个topic名字的模式进行匹配routing_key,例如topic.*可以匹配topic.paragraph和topic.paragraph.word,而topic.#
rpc Producer可以等待Consumer处理消息结束并把结果返回给Producer
Ruby下可以使用基于EventMachine的异步客户端amqp,或者是同步模式的bunny和carrot。
用bunny,以topic订阅为例:
publisher部分:
<table width="620" align="center" border="0" cellpadding="1" cellspacing="1" style="background:#FB7"> <tr> <td width="464" height="27" bgcolor="#FFE7CE"> 代码如下</td> <td width="109" align="center" bgcolor="#FFE7CE" style="cursor:pointer;" onclick="doCopy('copy6405')">复制代码</td> </tr> <tr> <td height="auto" colspan="2" valign="top" bgcolor="#FFFFFF" style="padding:10px;" class="copyclass" id=copy6405>#!/usr/bin/env ruby# encoding: utf-8
require "bunny"
conn = Bunny.new
conn.start
ch = conn.create_channel
x = ch.topic("topic_logs")
severity = ARGV.shift || "anonymous.info"
msg = ARGV.empty? ? "Hello World!" : ARGV.join(" ")
x.publish(msg, :routing_key => severity)
puts " [x] Sent #{severity}:#{msg}"
conn.close
consumer部分
<table width="620" align="center" border="0" cellpadding="1" cellspacing="1" style="background:#FB7"> <tr> <td width="464" height="27" bgcolor="#FFE7CE"> 代码如下</td> <td width="109" align="center" bgcolor="#FFE7CE" style="cursor:pointer;" onclick="doCopy('copy2991')">复制代码</td> </tr> <tr> <td height="auto" colspan="2" valign="top" bgcolor="#FFFFFF" style="padding:10px;" class="copyclass" id=copy2991>#!/usr/bin/env ruby# encoding: utf-8
require "bunny"
if ARGV.empty?
abort "Usage: #{$0} [binding key]"
end
conn = Bunny.new
conn.start
ch = conn.create_channel
x = ch.topic("topic_logs")
q = ch.queue("", :exclusive => true)
ARGV.each do |severity|
q.bind(x, :routing_key => severity)
end
puts " [*] Waiting for logs. To exit press CTRL C"
begin
q.subscribe(:block => true) do |delivery_info, properties, body|
puts " [x] #{delivery_info.routing_key}:#{body}"
end
rescue Interrupt => _
ch.close
conn.close
end
RabbitMQ集群和High Scalability
由于RabbitMQ实现的是AMQP,它非常强调一致性,而AMQP本身就是一种适用于金融行业的消息协议。根据CAP原理,一致性、高可用性和分区容忍性只能选两项,RabbitMQ提供了三种配置选项:
ignore:默认配置,发生网络分区时不作处理,当认为网络是可靠时选用该配置
autoheal:各分区协商后重启客户端连接最少的分区节点,恢复集群(CAP 中保证 AP,有状态丢失)
pause_minority:分区发生后判断自己所在分区内节点是否超过集群总节点数一半,如果没有超过则暂停这些节点(保证 CP,总节点数为奇数个)
由于使用Erlang开发,RabbitMQ可以非常方便的搭建集群,可以随时加入节点,这些节点之间是互相等同的,可以在客户端随机选择节点,或者使用诸如haproxy等进行负载均衡反向代理,以达到水平扩展的目的。
您可能感兴趣的文章:
java中RabbitMQ集群使用方法简单介绍
什么是RabbitMQ?RabbitMQ的简单介绍
如何保证消息队列的高可用?
Qutrunk-具有GUI的开源REST / gRPC接口
Windows .NET Server 2003 中的 Microsoft 群集服务 (MSCS)
redis集群是什么?
Centos安装rabbitMQ
Mongodb集群分片与集群简单实例
php安装amqp扩展(windows)
php和java哪个更早