openstack的mq机制,封装的很好,只需要通过调用几个高层函数就可以实现消费者、exchange、queue以及他们之间的bindings。具体的创建代码在service.py的start函数中。
1.结合nova-network服务的启动,分析openstack 的消费者相关的行为。
服务启动的入口在bin/nova-network中,核心代码只有三行:
server = service.Service.create(binary='nova-network') #create是一个类方法,该类的返回结果就实例化了一个Service对象,create方法进行了Service对象的初始化
service.serve(server) #启动该服务,服务启动后做的工作主要在这个方法中实现,该方法位于nova/service.py中
service.wait() #主线程等待
nova/service.py 的serve方法:
_launcher = None
def serve(*servers):
global _launcher #定义_launcher为全局变量
if not _launcher:
_launcher = Launcher() #实例化launcher,serveice操作的一些方法,对service方法的封装,实现了多个service的操作
for server in servers:
_launcher.launch_server(server) 启动server
nova/service.py的launch_server方法:
gt = eventlet.spawn(self.run_server, server) #创建一个绿色线程来执行run_server方法,并给该方法传入server对象
self._services.append(gt) #把返回的线程对象添加到_services数组中
ps:eventlet.spawn(func,*args,**kw)方法启动一个绿色线程(green thread)来执行func函数,args和kw为传入的参数,绿色线程用来处理与网络相关的一些工作,与普通线程相比,它的消耗非常低,每一个网络连接至少一个绿色线程;同时,绿色线程执行的是协同式调度,而非普通线程的抢占式调度,因此无需对共享的数据访问进行加锁。spawn会返回线程对象,通过该对象,也可以获取func执行的结果;spawn_n则没有返回值。
nova/service.py的run_server方法:
server.start()
server.wait()
nova/service.py的start方法:
def start(self):
vcs_string = version.version_string_with_vcs()
LOG.audit(_('Starting %(topic)s node (version %(vcs_string)s)'),
{'topic': self.topic, 'vcs_string': vcs_string})
utils.cleanup_file_locks() #lockfile是用来不同平台进程通信时的同步锁,该方法删除进程执行失败后清理这些lockfile
self.manager.init_host() #network/manager.py下init_host方法初始化一些网络设备,比如说创建网桥等等
self.model_disconnected = False
.......
self.conn = rpc.create_connection(new=True) #创建连接,默认是kombo实现
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, self, fanout=False) #创建以topic为路由键的消费者
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, self, fanout=False) #创建以node.topic为路由键的消费者,exchange方式为topic的
self.conn.create_consumer(self.topic, self, fanout=True) #创建一个fanout方式转发的消费者,广播的形式,消息转发速度最快
#在创建consumer时,declare_consumer方法会将创建的声明的consumer添加到consumers数组中(self.consumers.append(consumer)),指定了消费者的回调函数(ProxyCallback,解析
#消息);在下面的consume_in_thread中执行
#self.consume ()方法,consume()方法会用所有消费者去做消费消息动作,该动作在iterconsume方法中,_consume()方法做消费消息的动作:
#self.connection.drain_events(timeout=timeout) 会等待来自服务器的消息通知
# Consume from all consumers in a thread
self.conn.consume_in_thread() #在一个绿色线程中消费消息
#下面是两个定时器,报告服务的状态等信息
if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
self.timers.append(pulse)
if self.periodic_interval:
periodic = utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval, now=False)
self.timers.append(periodic)
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com