yui 发表于 2017-12-9 12:35:53

Neutron Message Queue

  1.单个worker queue创建情况:



q-plugin: # neutron-sever topic of q-plugin

1.exchange => neutron
   queue=>q-plugin
   router_key=>q-plugin
2.exchange=>neutron
   queue=>q-plugin.newton-controller
   router_key=>q-plugin.newton-controller
3.exchange=>q-plugin_fanout
   queue=>q-plugin_fanout_707fee67cf9746a2aa121593c73e84b1
   router_key=>q-plugin
  2. Consumer 创建监听(executor_thread_pool_size = 64):



start_rpc_listeners(plugin.py) -> create_consumer(neutron/common/rpc.py) -> get_rpc_server(oslo_messaging/rpc/server.py) -> MessageHandlingServer(oslo_messaging/server.py)
                                  | # Create listener
                                 consume_in_threads(neutron/common/rpc.py) -> start(oslo_messaging/server.py:MessageHandlingServer)
                                 -> _runner(oslo_messaging/server.py:MessageHandlingServer) -> _listen(oslo_messaging/transport.py:Transport)
                                          | # Dispatch message to Endpoint
                                           __call__(oslo_messaging/rpc/dispatcher.py) -> __init__(oslo_messaging/rpc/dispatcher.py:DispatcherExecutorContext)
                                           -> run(oslo_messaging/rpc/dispatcher.py:DispatcherExecutorContext)-> _dispatch_and_reply(oslo_messaging/rpc/dispatcher.py)
                                           -> _dispatch(oslo_messaging/rpc/dispatcher.py:RPCDispatcher) -> _do_dispatch(oslo_messaging/rpc/dispatcher.py)
                                           -> reply(oslo_messaging/_drivers/amqpdriver.py:AMQPIncomingMessage) -> direct_send(oslo_messaging/_drivers/impl_rabbit.py:Connection)
                                 -> listen(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase) -> poll(oslo_messaging/_drivers/amqpdriver.py:AMQPListener)
                                 -> consume(oslo_messaging/_drivers/impl_rabbit.py:Connection)-> autoretry(kombu.connection.Connection)
                                 -> _consume(oslo_messaging/_drivers/impl_rabbit.py:Connection) -> drain_events(kombu.connection.Connection)
                                 -> consume(oslo_messaging/_drivers/impl_rabbit.py:Consumer)
                                          | # Dispatch message to MessageHandlingServer-> _callback(oslo_messaging/_drivers/impl_rabbit.py:Consumer)
                                           -> __call__(oslo_messaging/_drivers/amqpdriver.py:AMQPListener)
  2. Publisher 发送情况(rpc_response_timeout = 60):



report_state(neutron/common/rpc.py:PluginReportStateAPI) -> get_client(neutron/common/rpc.py) -> __init__(neutron/common/rpc.py:BackingOffClient)-> __init__(oslo_messaging/rpc/dispatcher.py:RPCClient)
                                                            |
                                                             prepare(neutron/common/rpc.py:BackingOffClient) -> prepare(oslo_messaging/rpc/dispatcher.py:RPCClient)
                                                             ->prepare(oslo_messaging/rpc/dispatcher.py:_CallContext)
                                                            |
                                                             call(oslo_messaging/rpc/dispatcher.py:_CallContext) -> _send(oslo_messaging/transport.py:Transport)
                                                             ->send(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase) -> _send
                                                                   -> _get_reply_q(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase)
                                                                   -> wait(oslo_messaging/_drivers/amqpdriver.py:ReplyWaiter) -> _process_reply
                                                             ->topic_send(oslo_messaging/_drivers/impl_rabbit.py:Connection) -> _ensure_publishing -> _publish
页: [1]
查看完整版本: Neutron Message Queue