Neutron Message Queue
1.单个worker queue创建情况:q-plugin: # neutron-sever topic of q-plugin
1.exchange => neutron
queue=>q-plugin
router_key=>q-plugin
2.exchange=>neutron
queue=>q-plugin.newton-controller
router_key=>q-plugin.newton-controller
3.exchange=>q-plugin_fanout
queue=>q-plugin_fanout_707fee67cf9746a2aa121593c73e84b1
router_key=>q-plugin
2. Consumer 创建监听(executor_thread_pool_size = 64):
start_rpc_listeners(plugin.py) -> create_consumer(neutron/common/rpc.py) -> get_rpc_server(oslo_messaging/rpc/server.py) -> MessageHandlingServer(oslo_messaging/server.py)
| # Create listener
consume_in_threads(neutron/common/rpc.py) -> start(oslo_messaging/server.py:MessageHandlingServer)
-> _runner(oslo_messaging/server.py:MessageHandlingServer) -> _listen(oslo_messaging/transport.py:Transport)
| # Dispatch message to Endpoint
__call__(oslo_messaging/rpc/dispatcher.py) -> __init__(oslo_messaging/rpc/dispatcher.py:DispatcherExecutorContext)
-> run(oslo_messaging/rpc/dispatcher.py:DispatcherExecutorContext)-> _dispatch_and_reply(oslo_messaging/rpc/dispatcher.py)
-> _dispatch(oslo_messaging/rpc/dispatcher.py:RPCDispatcher) -> _do_dispatch(oslo_messaging/rpc/dispatcher.py)
-> reply(oslo_messaging/_drivers/amqpdriver.py:AMQPIncomingMessage) -> direct_send(oslo_messaging/_drivers/impl_rabbit.py:Connection)
-> listen(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase) -> poll(oslo_messaging/_drivers/amqpdriver.py:AMQPListener)
-> consume(oslo_messaging/_drivers/impl_rabbit.py:Connection)-> autoretry(kombu.connection.Connection)
-> _consume(oslo_messaging/_drivers/impl_rabbit.py:Connection) -> drain_events(kombu.connection.Connection)
-> consume(oslo_messaging/_drivers/impl_rabbit.py:Consumer)
| # Dispatch message to MessageHandlingServer-> _callback(oslo_messaging/_drivers/impl_rabbit.py:Consumer)
-> __call__(oslo_messaging/_drivers/amqpdriver.py:AMQPListener)
2. Publisher 发送情况(rpc_response_timeout = 60):
report_state(neutron/common/rpc.py:PluginReportStateAPI) -> get_client(neutron/common/rpc.py) -> __init__(neutron/common/rpc.py:BackingOffClient)-> __init__(oslo_messaging/rpc/dispatcher.py:RPCClient)
|
prepare(neutron/common/rpc.py:BackingOffClient) -> prepare(oslo_messaging/rpc/dispatcher.py:RPCClient)
->prepare(oslo_messaging/rpc/dispatcher.py:_CallContext)
|
call(oslo_messaging/rpc/dispatcher.py:_CallContext) -> _send(oslo_messaging/transport.py:Transport)
->send(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase) -> _send
-> _get_reply_q(oslo_messaging/_drivers/amqpdriver.py:AMQPDriverBase)
-> wait(oslo_messaging/_drivers/amqpdriver.py:ReplyWaiter) -> _process_reply
->topic_send(oslo_messaging/_drivers/impl_rabbit.py:Connection) -> _ensure_publishing -> _publish
页:
[1]