Rabbitmq集群高可用 RabbitMQ是用erlang开发的,集群非常方便,因为erlang天生就是一门分布式语言,但其本身并不支持负载均衡。 Rabbit模式大概分为以下三种:单一模式、普通模式、镜像模式 单一模式:最简单的情况,非集群模式。 及单实例服务。
普通模式:默认的集群模式。 queue创建之后,如果没有其它policy,则queue就会按照普通模式集群。对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构,但队列的元数据仅保存有一份,即创建该队列的rabbitmq节点(A节点),当A节点宕机,你可以去其B节点查看,./rabbitmqctl list_queues 发现该队列已经丢失,但声明的exchange还存在。
当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。 所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。 该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。 如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,队列数据就丢失了。
镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案。 该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。 该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。 所以在对可靠性要求较高的场合中适用,一个队列想做成镜像队列,需要先设置policy,然后客户端创建队列的时候,rabbitmq集群根据“队列名称”自动设置是普通集群模式或镜像队列。具体如下: 队列通过策略来使能镜像。策略能在任何时刻改变,rabbitmq队列也近可能的将队列随着策略变化而变化;非镜像队列和镜像队列之间是有区别的,前者缺乏额外的镜像基础设施,没有任何slave,因此会运行得更快。 为了使队列称为镜像队列,你将会创建一个策略来匹配队列,设置策略有两个键“ha-mode和 ha-params(可选)”。ha-params根据ha-mode设置不同的值,下面表格说明这些key的选项
了解集群中的基本概念: RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如前文所述,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。 一个rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制的,一个例外是,那些当前只属于创建它的节点的消息队列,尽管它们可见且可被所有节点读取。rabbitmq节点可以动态的加入到集群中,一个节点它可以加入到集群中,也可以从集群环集群会进行一个基本的负载均衡。
集群中有两种节点:
1 内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到disk)
2 磁盘节点:保存状态到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了
如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。 思路: 那么具体如何实现RabbitMQ高可用,我们先搭建一个普通集群模式,在这个模式基础上再配置镜像模式实现高可用,Rabbit集群前增加一个反向代理,生产者、消费者通过反向代理访问RabbitMQ集群。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
| rabbitmq 安装
1.配置epel 源 on node1-2
rpm -ivh http://download.fedoraproject.or ... ease-6-8.noarch.rpm
#http://mirrors.yun-idc.com/epel/ ... ease-5-4.noarch.rpm
wget -O /etc/yum.repos.d/epel-erlang.repo
#yum install erlang xmlto
#wget http://www.rabbitmq.com/releases ... -3.5.3-1.noarch.rpm
#rpm -ivh rabbitmq-server-3.5.3-1.noarch.rpm
#/etc/init.d/rabbitmq-server restart
# rabbitmqctl delete_user guest
#rabitmqctl add_user admin password
#rabbitmqctl set_user_tags admin administrator
#rabbitmqctl add_vhost web
#rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
#rabbitmq restart
#sudo rabbitmq-plugins enable rabbitmq_management
#node2
# rabbitmqctl delete_user guest
#rabitmqctl add_user admin password
#rabbitmqctl set_user_tags admin administrator
#rabbitmqctl add_vhost web
#rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
#rabbitmq restart
#sudo vim /var/lib/rabbitmq/.erlang.cookie #保持2台node 上的文件一直,权限一样
-r-------- 1 rabbitmq rabbitmq 21 8月 14 10:21 /var/lib/rabbitmq/.erlang.cookie
将node1和node2 组成集群
node2 上面执行如下命令
#rabbitmqctl stop_app
#rabbitmqctl join_cluster --disk rabbit@node1
#rabbitmqctl start_app
#rabbitmqctl cluster_status
Cluster status of node 'rabbit@node2' ...
[{nodes,[{disc,['rabbit@node1']},
{ram,['rabbit@node2']}]},
{running_nodes,['rabbit@node1','rabbit@node2']},
{cluster_name,<<"rabbit@node1">>},
{partitions,[]}]
#sudo rabbitmq-plugins enable rabbitmq_management
#sudo rabbitmqctl stop_app
sudo rabbitmqctl change_cluster_node_type ram
sudo rabbitmqctl start_app
二,配置keepalived 集群
node1
! Configuration File for keepalived
global_defs {
router_id LVS_TALARIS_RMQ_SVR
}
vrrp_instance VI_1 {
state BACKUP
interface eth0
virtual_router_id 47
priority 160
advert_int 1
nopreempt
# lvs_sync_daemon_interface eth0
dont_track_primary
garp_master_delay 5
authentication {
auth_type PASS
auth_pass jianzhong.xu@aa.com
}
virtual_ipaddress {
1.1.1.1
}
}
node2
! Configuration File for keepalived
global_defs {
router_id LVS_TALARIS_RMQ_SVR
}
vrrp_instance VI_1 {
state MASTER
interface eth0
virtual_router_id 47
priority 150
advert_int 1
nopreempt
# lvs_sync_daemon_interface eth0
dont_track_primary
garp_master_delay 5
authentication {
auth_type PASS
auth_pass qc_rmq2015@ele.me
}
virtual_ipaddress {
1.1.1.1
}
}
node1 haproxy:
global
maxconn 100000
log /dev/log local0 notice
defaults
mode tcp
log global
option redispatch
retries 3
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
listen stats :4966
mode http
stats enable
stats uri /
frontend rmq_lb
bind :27010
default_backend rmq_lb
backend rmq_lb
balance roundrobin
server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000
node2 haproxy:
global
maxconn 100000
log /dev/log local0 notice
defaults
mode tcp
log global
option redispatch
retries 3
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
listen stats :4966
mode http
stats enable
stats uri /
frontend rmq_lb
bind :27010
default_backend rmq_lb
backend rmq_lb
balance roundrobin
server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000
#node1 supervisor
[program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog
#node2 supervisor
[program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog
#supervisor monitor
[program:rmqmonitor]
command=ruby /opt/sbin/rmq.watcher.rb -h statsd
directory=/opt/
autostart=true
user=root
redirect_stderr=true
stdout_logfile=/data/log/rmqmonitor/rmqmonitor.log
#cat rmq.watcher.rb
#!/usr/bin/env ruby
#
# Author: Jeff Vier <jeff@jeffvier.com>
require 'rubygems'
require 'digest'
require 'find'
require 'fileutils'
require 'json'
require 'socket'
require 'resolv'
require 'optparse'
options = {
:prefix => Socket.gethostname,
:interval => 10,
:host => '127.0.0.1',
:port => 8125,
:queues => false
}
OptionParser.new do |opts|
opts.banner = "Usage: #{$0} [options]"
opts.on('-P', '--prefix [STATSD_PREFIX]', "metric prefix (default: #{options[:prefix]})") { |prefix| options[:prefix] = "#{prefix}" }
opts.on('-i', '--interval [SEC]',"reporting interval (default: #{options[:interval]})") { |interval| options[:interval] = interval }
opts.on('-h', '--host [HOST]', "statsd host (default: #{options[:host]})") { |host| options[:host] = host }
opts.on('-p', '--port [PORT]', "statsd port (default: #{options[:port]})") { |port| options[:port] = port }
opts.on('-q', '--[no-]queues', "report queue metrics (default: #{options[:queues]})") { |queues| options[:queues] = queues }
end.parse!
###############################################################
# Typical StatsD class, pasted to avoid an external dependency:
# Stolen from https://github.com/bvandenbos/statsd-client
class Statsd
Version = '0.0.8'
class << self
attr_accessor :host, :port
def host_ip_addr
@host_ip_addr ||= Resolv.getaddress(host)
end
def host=(h)
@host_ip_addr = nil
@host = h
end
# +stat+ to log timing for
# +time+ is the time to log in ms
def timing(stat, time = nil, sample_rate = 1)
if block_given?
start_time = Time.now.to_f
yield
time = ((Time.now.to_f - start_time) * 1000).floor
end
send_stats("#{stat}:#{time}|ms", sample_rate)
end
def gauge(stat, value, sample_rate = 1)
send_stats("#{stat}:#{value}|g", sample_rate)
end
# +stats+ can be a string or an array of strings
def increment(stats, sample_rate = 1)
update_counter stats, 1, sample_rate
end
# +stats+ can be a string or an array of strings
def decrement(stats, sample_rate = 1)
update_counter stats, -1, sample_rate
end
# +stats+ can be a string or array of strings
def update_counter(stats, delta = 1, sample_rate = 1)
stats = Array(stats)
send_stats(stats.map { |s| "#{s}:#{delta}|c" }, sample_rate)
end
private
def send_stats(data, sample_rate = 1)
data = Array(data)
sampled_data = []
# Apply sample rate if less than one
if sample_rate < 1
data.each do |d|
if rand <= sample_rate
sampled_data << "#{d}|@#{sample_rate}"
end
end
data = sampled_data
end
return if data.empty?
raise "host and port must be set" unless host && port
begin
sock = UDPSocket.new
data.each do |d|
sock.send(d, 0, host_ip_addr, port)
end
rescue => e
puts "UDPSocket error: #{e}"
ensure
sock.close
end
true
end
end
end
################################################################################
include FileUtils # allows use of FileUtils methods without the FileUtils:: prefix ie: mv_f(file, file2) or rm_rf(dir)
STDOUT.sync = true # don't buffer STDOUT
Statsd.host = options[:host]
Statsd.port = options[:port]
unless system 'which rabbitmqadmin'
raise "unable to locate the rabbitmqadmin command"
end
loop do
overview = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 show overview -f raw_json`)
prefix = "rabbitmq.#{options[:prefix]}.overview.object_totals"
Statsd.gauge("#{prefix}.channels", overview[0]['object_totals']['channels'])
Statsd.gauge("#{prefix}.connections", overview[0]['object_totals']['connections'])
Statsd.gauge("#{prefix}.consumers", overview[0]['object_totals']['consumers'])
Statsd.gauge("#{prefix}.exchanges", overview[0]['object_totals']['exchanges'])
Statsd.gauge("#{prefix}.messages", overview[0]['queue_totals']['messages'])
Statsd.gauge("#{prefix}.queues", overview[0]['object_totals']['queues'])
if options[:queues]
queues = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 list queues -f raw_json`)
queues.each do |queue|
if queue.key?('name')
prefix = "#{options[:prefix]}.queues.#{queue['name']}"
Statsd.gauge("#{prefix}.active_consumers", queue['active_consumers'])
Statsd.gauge("#{prefix}.consumers", queue['consumers'])
Statsd.gauge("#{prefix}.memory", queue['memory'])
Statsd.gauge("#{prefix}.messages", queue['messages'])
Statsd.gauge("#{prefix}.messages_ready", queue['messages_ready'])
Statsd.gauge("#{prefix}.messages_unacknowledged", queue['messages_unacknowledged'])
Statsd.gauge("#{prefix}.avg_egress_rate", queue['backing_queue_status']['avg_egress_rate']) if queue['backing_queue_status']
Statsd.gauge("#{prefix}.avg_ingress_rate", queue['backing_queue_status']['avg_ingress_rate']) if queue['backing_queue_status']
if queue.key?('message_stats')
Statsd.gauge("#{prefix}.ack_rate", queue['message_stats']['ack_details']['rate']) if queue['message_stats']['ack_details']
Statsd.gauge("#{prefix}.deliver_rate", queue['message_stats']['deliver_details']['rate']) if queue['message_stats']['deliver_details']
Statsd.gauge("#{prefix}.deliver_get_rate", queue['message_stats']['deliver_get_details']['rate']) if queue['message_stats']['deliver_get_details']
Statsd.gauge("#{prefix}.publish_rate", queue['message_stats']['publish_details']['rate']) if queue['message_stats']['publish_details']
end
end
end
end
sleep options[:interval]
end
# 客户端,
将keepalived VIP 配置成域名
然后客户端连接 域名:haproxy 端口
zabbix 监控rmq
UserParameter=rmq.messageTotal,/etc/zabbix/scripts/rmqmessage.py -u "$1" -p "$2" -H "$3"
cat rmqmessage.py
#!/usr/bin/env python
#coding:utf-8
import requests,re
from requests.auth import HTTPBasicAuth
from optparse import OptionParser
def RmqMessage():
usage="usage: %prog [options] arg"
parser=OptionParser(usage)
parser.add_option("-u", "--user",action="store",type="string",help="USERNAME")
parser.add_option("-p", "--passwd",action="store",type="string",help="PASSWORD")
parser.add_option("-H", "--hostname",action="store",type="string",help="HOSTNAME")
(options,args) = parser.parse_args()
user=options.user
passwd=options.passwd
hostname=options.hostname
url="http://%s.elenet.me:15672/api/overview" %(hostname)
if re.match(r"^\d+.",hostname):
url="http://%s:15672/api/overview" %(hostname)
try:
r = requests.get(url, auth=HTTPBasicAuth(user, passwd))
result=r.json()
QmessageTotal=result['queue_totals']['messages']
print QmessageTotal
return QmessageTotal
except Exception, e:
return -1
RmqMessage()
|
|