设为首页 收藏本站
查看: 460|回复: 0

[经验分享] RabbitMQ 安装和配置

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-9 12:32:47 | 显示全部楼层 |阅读模式

Rabbitmq集群高可用

RabbitMQ是用erlang开发的,集群非常方便,因为erlang天生就是一门分布式语言,但其本身并不支持负载均衡。

Rabbit模式大概分为以下三种:单一模式、普通模式、镜像模式

单一模式:最简单的情况,非集群模式。

及单实例服务。


普通模式:默认的集群模式。

queue创建之后,如果没有其它policy,则queue就会按照普通模式集群。对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构,但队列的元数据仅保存有一份,即创建该队列的rabbitmq节点(A节点),当A节点宕机,你可以去其B节点查看,./rabbitmqctl list_queues 发现该队列已经丢失,但声明的exchange还存在。


当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。

所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。

该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。

如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,队列数据就丢失了。


镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。

该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。

所以在对可靠性要求较高的场合中适用,一个队列想做成镜像队列,需要先设置policy,然后客户端创建队列的时候,rabbitmq集群根据“队列名称”自动设置是普通集群模式或镜像队列。具体如下:

队列通过策略来使能镜像。策略能在任何时刻改变,rabbitmq队列也近可能的将队列随着策略变化而变化;非镜像队列和镜像队列之间是有区别的,前者缺乏额外的镜像基础设施,没有任何slave,因此会运行得更快。

为了使队列称为镜像队列,你将会创建一个策略来匹配队列,设置策略有两个键“ha-mode和 ha-params(可选)”。ha-params根据ha-mode设置不同的值,下面表格说明这些key的选项



了解集群中的基本概念:

RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如前文所述,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。

一个rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制的,一个例外是,那些当前只属于创建它的节点的消息队列,尽管它们可见且可被所有节点读取。rabbitmq节点可以动态的加入到集群中,一个节点它可以加入到集群中,也可以从集群环集群会进行一个基本的负载均衡。
集群中有两种节点:
1 内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到disk)
2 磁盘节点:保存状态到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了
如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。

思路:

那么具体如何实现RabbitMQ高可用,我们先搭建一个普通集群模式,在这个模式基础上再配置镜像模式实现高可用,Rabbit集群前增加一个反向代理,生产者、消费者通过反向代理访问RabbitMQ集群。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
rabbitmq 安装

1.配置epel 源 on node1-2
rpm -ivh http://download.fedoraproject.or ... ease-6-8.noarch.rpm           
#http://mirrors.yun-idc.com/epel/ ... ease-5-4.noarch.rpm           
wget -O /etc/yum.repos.d/epel-erlang.repo

  
#yum install erlang xmlto
  
#wget http://www.rabbitmq.com/releases ... -3.5.3-1.noarch.rpm  
#rpm -ivh rabbitmq-server-3.5.3-1.noarch.rpm
  
  
#/etc/init.d/rabbitmq-server restart
  
# rabbitmqctl delete_user guest
#rabitmqctl add_user admin password
#rabbitmqctl set_user_tags admin administrator
#rabbitmqctl add_vhost web
#rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
#rabbitmq restart
#sudo rabbitmq-plugins enable rabbitmq_management
  
#node2
  # rabbitmqctl delete_user guest
#rabitmqctl add_user admin password
#rabbitmqctl set_user_tags admin administrator
#rabbitmqctl add_vhost web
#rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
#rabbitmq restart
#sudo vim /var/lib/rabbitmq/.erlang.cookie #保持2台node 上的文件一直,权限一样
-r-------- 1 rabbitmq rabbitmq 21 8月  14 10:21 /var/lib/rabbitmq/.erlang.cookie
  
将node1和node2 组成集群
node2 上面执行如下命令
#rabbitmqctl stop_app
#rabbitmqctl join_cluster --disk rabbit@node1  
#rabbitmqctl start_app
#rabbitmqctl cluster_status
  
Cluster status of node 'rabbit@node2' ...
[{nodes,[{disc,['rabbit@node1']},
         {ram,['rabbit@node2']}]},
{running_nodes,['rabbit@node1','rabbit@node2']},
{cluster_name,<<"rabbit@node1">>},
{partitions,[]}]
  
  
#sudo rabbitmq-plugins enable rabbitmq_management
  
#sudo rabbitmqctl stop_app
  sudo rabbitmqctl change_cluster_node_type ram
  sudo rabbitmqctl start_app
  
  
二,配置keepalived 集群
  
node1
! Configuration File for keepalived

global_defs {
   router_id LVS_TALARIS_RMQ_SVR
}

vrrp_instance VI_1 {
    state BACKUP
    interface eth0
    virtual_router_id 47
    priority 160
    advert_int 1
    nopreempt
#    lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay  5
    authentication {
        auth_type PASS
        auth_pass jianzhong.xu@aa.com
    }
    virtual_ipaddress {
        1.1.1.1
    }
}

node2
! Configuration File for keepalived

global_defs {
   router_id LVS_TALARIS_RMQ_SVR
}

vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 47
    priority 150
    advert_int 1
    nopreempt
#    lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay  5
    authentication {
        auth_type PASS
        auth_pass qc_rmq2015@ele.me
    }
    virtual_ipaddress {
        1.1.1.1
    }
}


node1 haproxy:
global
    maxconn 100000
    log /dev/log local0 notice


defaults
    mode tcp
    log global

    option redispatch
    retries 3

    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms


listen stats :4966
    mode http
    stats enable
    stats uri /

frontend rmq_lb
    bind :27010
    default_backend rmq_lb

backend rmq_lb
  balance roundrobin
  server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
  server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000
   
   
  node2 haproxy:
   
  global
    maxconn 100000
    log /dev/log local0 notice


defaults
    mode tcp
    log global

    option redispatch
    retries 3

    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms


listen stats :4966
    mode http
    stats enable
    stats uri /


frontend rmq_lb
    bind :27010
    default_backend rmq_lb

backend rmq_lb
  balance roundrobin
  server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
  server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000
   
   
   
  #node1 supervisor
  [program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog

#node2 supervisor
[program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog

#supervisor monitor
[program:rmqmonitor]
command=ruby /opt/sbin/rmq.watcher.rb  -h statsd
directory=/opt/
autostart=true
user=root
redirect_stderr=true
stdout_logfile=/data/log/rmqmonitor/rmqmonitor.log



#cat rmq.watcher.rb
#!/usr/bin/env ruby
#
# Author: Jeff Vier <jeff@jeffvier.com>

require 'rubygems'
require 'digest'
require 'find'
require 'fileutils'
require 'json'
require 'socket'
require 'resolv'

require 'optparse'

options = {
  :prefix => Socket.gethostname,
  :interval => 10,
  :host => '127.0.0.1',
  :port => 8125,
  :queues => false
}
OptionParser.new do |opts|
  opts.banner = "Usage: #{$0} [options]"

  opts.on('-P', '--prefix [STATSD_PREFIX]', "metric prefix (default: #{options[:prefix]})") { |prefix|   options[:prefix] = "#{prefix}" }
  opts.on('-i', '--interval [SEC]',"reporting interval (default: #{options[:interval]})")   { |interval| options[:interval] = interval }
  opts.on('-h', '--host [HOST]',   "statsd host (default: #{options[:host]})")              { |host|     options[:host] = host }
  opts.on('-p', '--port [PORT]',   "statsd port (default: #{options[:port]})")              { |port|     options[:port] = port }
  opts.on('-q', '--[no-]queues',   "report queue metrics (default: #{options[:queues]})")   { |queues|   options[:queues] = queues }
end.parse!

###############################################################
# Typical StatsD class, pasted to avoid an external dependency:
# Stolen from https://github.com/bvandenbos/statsd-client

class Statsd

  Version = '0.0.8'

  class << self

    attr_accessor :host, :port

    def host_ip_addr
      @host_ip_addr ||= Resolv.getaddress(host)
    end

    def host=(h)
      @host_ip_addr = nil
      @host = h
    end

    # +stat+ to log timing for
    # +time+ is the time to log in ms
    def timing(stat, time = nil, sample_rate = 1)
      if block_given?
        start_time = Time.now.to_f
        yield
        time = ((Time.now.to_f - start_time) * 1000).floor
      end
      send_stats("#{stat}:#{time}|ms", sample_rate)
    end

    def gauge(stat, value, sample_rate = 1)
      send_stats("#{stat}:#{value}|g", sample_rate)
    end

    # +stats+ can be a string or an array of strings
    def increment(stats, sample_rate = 1)
      update_counter stats, 1, sample_rate
    end

    # +stats+ can be a string or an array of strings
    def decrement(stats, sample_rate = 1)
      update_counter stats, -1, sample_rate
    end

    # +stats+ can be a string or array of strings
    def update_counter(stats, delta = 1, sample_rate = 1)
      stats = Array(stats)
      send_stats(stats.map { |s| "#{s}:#{delta}|c" }, sample_rate)
    end

    private

    def send_stats(data, sample_rate = 1)
      data = Array(data)
      sampled_data = []

      # Apply sample rate if less than one
      if sample_rate < 1
        data.each do |d|
          if rand <= sample_rate
            sampled_data << "#{d}|@#{sample_rate}"
          end
        end
        data = sampled_data
      end

      return if data.empty?

      raise "host and port must be set" unless host && port

      begin
        sock = UDPSocket.new
        data.each do |d|
          sock.send(d, 0, host_ip_addr, port)
        end
      rescue => e
        puts "UDPSocket error: #{e}"
      ensure
        sock.close
      end
      true
    end
  end
end

################################################################################

include FileUtils # allows use of FileUtils methods without the FileUtils:: prefix ie: mv_f(file, file2) or rm_rf(dir)

STDOUT.sync = true # don't buffer STDOUT
Statsd.host = options[:host]
Statsd.port = options[:port]

unless system 'which rabbitmqadmin'
  raise "unable to locate the rabbitmqadmin command"
end

loop do
  overview = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 show overview -f raw_json`)
  prefix = "rabbitmq.#{options[:prefix]}.overview.object_totals"
  Statsd.gauge("#{prefix}.channels", overview[0]['object_totals']['channels'])
  Statsd.gauge("#{prefix}.connections", overview[0]['object_totals']['connections'])
  Statsd.gauge("#{prefix}.consumers", overview[0]['object_totals']['consumers'])
  Statsd.gauge("#{prefix}.exchanges", overview[0]['object_totals']['exchanges'])
  Statsd.gauge("#{prefix}.messages", overview[0]['queue_totals']['messages'])
  Statsd.gauge("#{prefix}.queues", overview[0]['object_totals']['queues'])

  if options[:queues]
    queues = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 list queues -f raw_json`)
    queues.each do |queue|
      if queue.key?('name')
        prefix = "#{options[:prefix]}.queues.#{queue['name']}"
        Statsd.gauge("#{prefix}.active_consumers", queue['active_consumers'])
        Statsd.gauge("#{prefix}.consumers", queue['consumers'])
        Statsd.gauge("#{prefix}.memory", queue['memory'])
        Statsd.gauge("#{prefix}.messages", queue['messages'])
        Statsd.gauge("#{prefix}.messages_ready", queue['messages_ready'])
        Statsd.gauge("#{prefix}.messages_unacknowledged", queue['messages_unacknowledged'])
        Statsd.gauge("#{prefix}.avg_egress_rate", queue['backing_queue_status']['avg_egress_rate'])   if queue['backing_queue_status']
        Statsd.gauge("#{prefix}.avg_ingress_rate", queue['backing_queue_status']['avg_ingress_rate']) if queue['backing_queue_status']
        if queue.key?('message_stats')
          Statsd.gauge("#{prefix}.ack_rate", queue['message_stats']['ack_details']['rate'])                  if queue['message_stats']['ack_details']
          Statsd.gauge("#{prefix}.deliver_rate", queue['message_stats']['deliver_details']['rate'])          if queue['message_stats']['deliver_details']
          Statsd.gauge("#{prefix}.deliver_get_rate", queue['message_stats']['deliver_get_details']['rate'])  if queue['message_stats']['deliver_get_details']
          Statsd.gauge("#{prefix}.publish_rate", queue['message_stats']['publish_details']['rate'])          if queue['message_stats']['publish_details']
        end
      end
    end
  end

  sleep options[:interval]
end




# 客户端,
将keepalived VIP 配置成域名
然后客户端连接 域名:haproxy 端口


zabbix 监控rmq
UserParameter=rmq.messageTotal
  • ,/etc/zabbix/scripts/rmqmessage.py -u "$1" -p "$2" -H "$3"

    cat rmqmessage.py

    #!/usr/bin/env python
    #coding:utf-8
    import requests,re
    from requests.auth import HTTPBasicAuth
    from optparse import OptionParser


    def RmqMessage():
       usage="usage: %prog [options] arg"
       parser=OptionParser(usage)
       parser.add_option("-u", "--user",action="store",type="string",help="USERNAME")
       parser.add_option("-p", "--passwd",action="store",type="string",help="PASSWORD")
       parser.add_option("-H", "--hostname",action="store",type="string",help="HOSTNAME")
       (options,args) = parser.parse_args()
       user=options.user
       passwd=options.passwd

       hostname=options.hostname
       url="http://%s.elenet.me:15672/api/overview" %(hostname)
       if re.match(r"^\d+.",hostname):
         url="http://%s:15672/api/overview" %(hostname)
       try:
        r = requests.get(url, auth=HTTPBasicAuth(user, passwd))
        result=r.json()
        QmessageTotal=result['queue_totals']['messages']
        print QmessageTotal
        return QmessageTotal
       except Exception, e:
        return -1
    RmqMessage()



  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-137024-1-1.html 上篇帖子: Centos6.6简易版网络配置NAT 下篇帖子: 修改RHEL7/centos7网卡为eth0
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表