设为首页 收藏本站
查看: 1119|回复: 0

[经验分享] [Erlang 0089] RabbitMQ Exchange

[复制链接]

尚未签到

发表于 2015-9-10 12:23:14 | 显示全部楼层 |阅读模式
  
    之前提到了RabbitMQ是怎样维护Queue的data和metadata的.我们知道Queue在RabbitMQ对应Erlang的进程,那么Exchane是不是也是独立的Erlang进程呢?它的信息是如何维护的呢?
  

Exchange 本质上是什么
  
     印象中Vhost就像一个容器,Exchange Queue就像一个个零件,这些东西组装起来成为一个消息的Broker.真正实现的时候并不是每一个东西都会按照一一对应的方式设计实体,这里我想到两点:
    [1] 思考问题的时候恰当的隐喻能够帮我们快速理解问题,但是在某个时候要抛开隐喻,因为它和你要理解的东西毕竟是两个独立的东西;
    [2] 奥卡姆剃刀原则,如无必要勿增实体,在Erlang世界中创建进程的成本很低,往往会滥用这一点,把一些没有必要设计成独立进程的搞成了进程,Erlang的设计哲学是把独立的活动用进程表达.进程创建也是有限制的,最后一根稻草压死骆驼,雪崩的时候每一片雪花都不认为自己有责任,就是这个道理.
  
DSC0000.png
     Exchange在RabbitMQ就没有设计为进程而仅仅是单纯维护了exchange 名称以及相关的bingding规则.我们发送消息到Exchange本质上是:你连接到RabbitMQ的channel拿消息中的routing key和binding规则匹配完成消息的路由.换句话说,真正完成route逻辑的实体是channel,exchange只是一堆规则集,很容易在集群内部同步,所以没有queue面临的那些问题.下面是rabbit_channel的代码片段,我们可以窥见一斑:
    



handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
routing_key = RoutingKey,
mandatory   = Mandatory,
immediate   = Immediate},
Content, State = #ch{virtual_host    = VHostPath,
tx_status       = TxStatus,
confirm_enabled = ConfirmEnabled,
trace_state     = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
{MsgSeqNo, State1} =
case {TxStatus, ConfirmEnabled} of
{none, false} -> {undefined, State};
{_, _}        -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_trace_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
{noreply,
case TxStatus of
none        -> deliver_to_queues({Delivery, QNames}, State1);
in_progress -> TMQ = State1#ch.uncommitted_message_q,
NewTMQ = queue:in({Delivery, QNames}, TMQ),
State1#ch{uncommitted_message_q = NewTMQ}
end};
{error, Reason} ->
rabbit_misc:protocol_error(precondition_failed,
"invalid message: ~p", [Reason])
end;
  
    我们在RabbitMQ集群环境创建exchange实际上就是在集群内所有节点的exchange表增加一条数据.这样连接到所有节点都可以使用这个新exchange.
  
    要是发送到channel的消息还没有完成路由节点就当掉了怎么办?
  
    basic.public AMQP命令并没有返回消息的状态.这就意味节点当掉的时候,channel还会继续routing消息.producer会继续发布消息,这样就有丢消息的风险.
  
    解决方案有两个:[1] 使用AMQP transaction (AMQP事务):producer保持阻塞状态直到消息被路由到queue.[2] 使用publisher confirm 来跟踪节点当掉的时候哪些消息还没有确认.
    这两种方法都可以帮助我们检测消息是否到达目的地(比如节点当掉).
    下面我们看看exchange在erlang node里面是怎么维护的:
  





Exchange in Erlang node




  通过ets:i()查看节点的ETS表,能够看到一系列Rabbit前缀的表:






rabbit_durable_exchange rabbit_durable_exchange set   9      507      mnesia_monitor
rabbit_durable_queue rabbit_durable_queue set   2      335      mnesia_monitor
rabbit_durable_route rabbit_durable_route set   4      445      mnesia_monitor
rabbit_exchange rabbit_exchange   set   9      507      mnesia_monitor
rabbit_exchange_serial rabbit_exchange_serial set   0      283      mnesia_monitor
rabbit_listener rabbit_listener   bag   1      321      mnesia_monitor
rabbit_queue    rabbit_queue      set   2      335      mnesia_monitor
rabbit_registry rabbit_registry   set   7      353      rabbit_registry
rabbit_reverse_route rabbit_reverse_route ordered_set 4      239      mnesia_monitor
rabbit_route    rabbit_route      ordered_set 4      239      mnesia_monitor
rabbit_semi_durable_route rabbit_semi_durable_route ordered_set 4      239      mnesia_monitor
rabbit_topic_trie_binding rabbit_topic_trie_binding ordered_set 0      73       mnesia_monitor
rabbit_topic_trie_edge rabbit_topic_trie_edge ordered_set 0      73       mnesia_monitor
rabbit_topic_trie_node rabbit_topic_trie_node ordered_set 0      73       mnesia_monitor
rabbit_user     rabbit_user       set   1      302      mnesia_monitor
rabbit_user_permission rabbit_user_permission set   1      315      mnesia_monitor
rabbit_vhost    rabbit_vhost      set   1      294      mnesia_monitor
  




  下面我们看几个我们比较关注的ets表



rabbit_exchange :






(default@zen.com)37> ets:match(rabbit_exchange,'$1').
[[{exchange,{resource,<<"/">>,exchange,
<<"amq.rabbitmq.trace">>},
topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,
<<"amq.rabbitmq.log">>},
topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"qp_pic_exchange">>},
direct,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.match">>},
headers,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.headers">>},
headers,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.topic">>},
topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.direct">>},
direct,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.fanout">>},
fanout,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<>>},
direct,true,false,false,[],undefined}]]
  




rabbit_queue






(default@zen.com)32> ets:match(rabbit_queue,'$1').
[[#amqqueue{name = {resource,<<"/">>,queue,
<<"zen_qp_pic_queue">>},
durable = true,auto_delete = false,exclusive_owner = none,
arguments = [],pid = <0.396.0>,slave_pids = [],
mirror_nodes = undefined}],
[#amqqueue{name = {resource,<<"/">>,queue,
<<"qp_pic_queue2">>},
durable = true,auto_delete = false,exclusive_owner = none,
arguments = [],pid = <0.397.0>,slave_pids = [],
mirror_nodes = undefined}]]
(default@zen.com)33> process_info(pid(0,396,0)).
[{current_function,{erlang,hibernate,3}},
{initial_call,{proc_lib,init_p,5}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[#Port<0.2723>,<0.204.0>]},
{dictionary,[{{"/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif",
fhc_file},
{file,1,true}},
{{xtype_to_module,direct},rabbit_exchange_type_direct},
{credit_blocked,[]},
{{credit_from,<0.198.0>},1998},
{'$ancestors',[rabbit_amqqueue_sup,rabbit_sup,<0.110.0>]},
{{#Ref<0.0.0.4414>,fhc_handle},
{handle,{file_descriptor,prim_file,{#Port<0.2723>,18}},
0,false,0,infinity,[],true,
"/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif",
[write,binary|...],
[{...}],
true,...}},
{fhc_age_tree,{1,
{{1352,908329,782846},#Ref<0.0.0.4414>,nil,nil}}},
{guid,{{2942931266,2608231844,2036421255,3742291922},1}},
{'$initial_call',{gen,init_it,6}}]},
{trap_exit,true},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.109.0>},
{total_heap_size,1709},
{heap_size,1709},
{stack_size,0},
{reductions,8669},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,10}]},
{suspending,[]}]
  
   rabbit_route







(default@zen.com)35> ets:match(rabbit_route,'$1').
[[{route,{binding,{resource,<<"/">>,exchange,<<>>},
<<"qp_pic_queue2">>,
{resource,<<"/">>,queue,<<"qp_pic_queue2">>},
[]},
const}],
[{route,{binding,{resource,<<"/">>,exchange,<<>>},
<<"zen_qp_pic_queue">>,
{resource,<<"/">>,queue,<<"zen_qp_pic_queue">>},
[]},
const}],
[{route,{binding,{resource,<<"/">>,exchange,
<<"qp_pic_exchange">>},
<<"qp_pic2">>,
{resource,<<"/">>,queue,<<"qp_pic_queue2">>},
[]},
const}],
[{route,{binding,{resource,<<"/">>,exchange,
<<"qp_pic_exchange">>},
<<"qp_pic2">>,
{resource,<<"/">>,queue,<<"zen_qp_pic_queue">>},
[]},
const}]]
  
  
  technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook
  
  Using the topic exchange, the subscription/binding key specified in step 5 can be a sequence
  of dot-separated words and/or wildcards. AMQP wildcards are just:
#  This matches zero or more words
*  This matches exactly one word
  
So, for example:
#.ebook and *.*.ebook both match the first and the third sent messages
sport.# and sport.*.* both match the second and the third sent messages
# alone matches any message sent
  
  
  最后小图一张 Emma Watson 我们俩都是O型血 Petrificus Totalus!
DSC0001.jpg

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-111938-1-1.html 上篇帖子: Sharepoint应用EWSManagedAPI操作Exchange邮箱 下篇帖子: Exchange开发(六) ExchangeHelper类
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表