wfkjxy posted on 2015-10-11 10:33:57

An analysis of how OpenStack creates an instance


Overview

Launching a new instance involves many components of OpenStack Nova:

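[*] API server: handles client requests and forwards them to the Cloud Controller
[*] Cloud Controller: connects the compute nodes, the network controller, the API server and the scheduler
[*] Scheduler: selects a host to carry out the command
[*] Compute worker: starts and stops instances, attaches and detaches volumes, and so on
[*] Network controller: manages network resources, assigns fixed IPs and configures VLANs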





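1. The API server sends a message to the Cloud Controller.
2. Authentication makes sure the user has permission, and the Cloud Controller then sends the message to the Scheduler.
3. The Scheduler casts a message to the selected host, asking it to start an instance.
4. The compute worker (on the selected host) receives the message.
5-8. The compute worker needs a fixed IP to launch the instance, so it sends a message to the network controller.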
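The hand-offs in steps 1-8 can be traced as an ordered list of (sender, receiver, method) hops. The sketch below is a toy under stated assumptions. The component names are illustrative strings. The "compute.<host>" receiver name is an assumed per-host queue naming. Nothing is actually sent over a message bus.

```python
def launch_flow(host, user_authorized=True):
    """Return the ordered (sender, receiver, method) hops for one launch.

    Assumptions: component names are illustrative strings and `host` is
    whatever the scheduler picked; no real messages are sent.
    """
    hops = [("api", "cloud_controller", "run_instance")]           # step 1
    if not user_authorized:                                        # step 2
        raise PermissionError("user is not allowed to launch instances")
    hops.append(("cloud_controller", "scheduler", "run_instance"))
    hops.append(("scheduler", f"compute.{host}", "run_instance"))  # steps 3-4
    # Steps 5-8: the compute worker asks the network controller for an IP.
    hops.append((f"compute.{host}", "network", "allocate_for_instance"))
    return hops
```

A failed permission check stops the flow before anything reaches the scheduler. That matches step 2 coming before the scheduler is involved.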


The details are explained below.




API


1. From the dashboard web interface
2. From the command line, e.g. euca-add-keypair and euca-run-instances


The user's request reaches nova-api in one of two ways.
First, through the OpenStack API, via the create method of class Controller(object) in nova/api/servers.py:

    def create(self, req, body):
        """Creates a new server for a given user."""
        if 'server' in body:
            body['server']['key_name'] = self._get_key_name(req, body)

        extra_values = None
        extra_values, instances = self.helper.create_instance(
            req, body, self.compute_api.create)
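The create() excerpt reads the request body's "server" entry and writes a key_name back into it. The sketch below shows a request body of that shape for an OpenStack Compute API v2 style POST /servers. It assumes the usual name/imageRef/flavorRef fields. The image and flavor values in the example are placeholders.

```python
import json


def build_create_server_body(name, image_ref, flavor_ref, key_name=None):
    """Build a JSON body shaped like {"server": {...}} for POST /servers.

    key_name is optional; it is the field the create() excerpt stores in
    body['server']['key_name'].
    """
    server = {"name": name, "imageRef": image_ref, "flavorRef": flavor_ref}
    if key_name is not None:
        server["key_name"] = key_name
    return json.dumps({"server": server})
```

Calling build_create_server_body("vm1", "<image-uuid>", "1", key_name="mykey") produces a body whose "server" object carries the key pair name.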


Second, through the EC2 API, via the run_instances method of class CloudController in nova/api/cloud.py:

    def run_instances(self, context, **kwargs):
        ...
        (instances, resv_id) = self.compute_api.create(context,
            instance_type=instance_types.get_instance_type_by_name(
                kwargs.get('instance_type', None)),
        ...
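In the EC2 path, kwargs.get('instance_type', None) passes None when the request names no type, so the lookup must handle that case. The sketch below assumes the lookup falls back to a default flavor in that case. The flavor table, its values and the "m1.small" default are illustrative only and do not come from this article.

```python
# Hypothetical flavor table; names echo classic Nova flavors, values are illustrative.
INSTANCE_TYPES = {
    "m1.tiny": {"memory_mb": 512, "vcpus": 1},
    "m1.small": {"memory_mb": 2048, "vcpus": 1},
    "m1.medium": {"memory_mb": 4096, "vcpus": 2},
}
DEFAULT_INSTANCE_TYPE = "m1.small"  # assumption: default flavor name


def get_instance_type_by_name(name):
    """Return the flavor for `name`, using the default when name is None."""
    if name is None:
        name = DEFAULT_INSTANCE_TYPE
    try:
        return dict(INSTANCE_TYPES[name], name=name)
    except KeyError:
        raise ValueError(f"unknown instance type: {name!r}") from None
```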


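Both paths end up in the Compute API's create(), which:

[*] checks whether the number of instances of this type has reached its limit
[*] creates a security group if one does not exist
[*] generates MAC addresses and hostnames
[*] sends a message to the scheduler to run the instance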
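The preparatory steps of create() can be sketched as small helpers. Everything here is hypothetical and only illustrates the ideas:

- The function names are invented for this sketch.
- The MAC prefix 02:16:3e is an assumed locally administered prefix.
- The "server" hostname fallback is an invented rule.

```python
import random
import re


def check_quota(in_use, requested, limit):
    """Raise if `requested` more instances would push usage past `limit`."""
    if in_use + requested > limit:
        raise RuntimeError(f"quota exceeded: {in_use} + {requested} > {limit}")


def ensure_security_group(groups, name="default"):
    """Add `name` to the set `groups` if missing; return True if created."""
    created = name not in groups
    groups.add(name)
    return created


def generate_mac_address(rng=random):
    """Random MAC with a fixed prefix; the 02:16:3e prefix is an assumption."""
    octets = [0x02, 0x16, 0x3e] + [rng.randint(0x00, 0xFF) for _ in range(3)]
    return ":".join(f"{octet:02x}" for octet in octets)


def default_hostname(display_name):
    """Lower-case the name and collapse anything not DNS-safe into '-'."""
    hostname = re.sub(r"[^a-z0-9-]+", "-", display_name.lower()).strip("-")
    return hostname or "server"
```

For example, default_hostname("My Web Server!") gives "my-web-server".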



CAST

When maxCount is 1 (the default), RPC.cast is called to send the run-instance message to the scheduler.









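In OpenStack, messages are sent with RPC.cast and distributed by RabbitMQ. The sender (the Compute API) publishes the message to a topic exchange (the scheduler topic), and the consumer (the Scheduler worker) takes it from its queue. A cast does not wait for a return value.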
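The cast pattern has two halves. The sender enqueues a {"method": ..., "args": ...} dict on a topic and returns immediately. The consumer later pops it and turns it back into a method call on its manager object. The sketch below is an in-memory stand-in, not RabbitMQ and not Nova's rpc module. TopicExchange and dispatch are hypothetical names, and it assumes the consumer resolves the method with getattr.

```python
from collections import defaultdict, deque


class TopicExchange:
    """In-memory stand-in for a RabbitMQ topic exchange (illustration only).

    Routing here is by exact topic name; real AMQP topic exchanges also
    support wildcard binding patterns such as 'compute.*'.
    """

    def __init__(self):
        self.queues = defaultdict(deque)

    def cast(self, topic, message):
        """Fire-and-forget: enqueue the message and return nothing."""
        self.queues[topic].append(message)

    def consume(self, topic):
        """Pop the next message for `topic`, or None if the queue is empty."""
        q = self.queues[topic]
        return q.popleft() if q else None


def dispatch(manager, context, message):
    """Call manager.<message['method']>(context, **message['args'])."""
    name = message["method"]
    # Refuse private names so a message cannot reach internal helpers.
    method = None if name.startswith("_") else getattr(manager, name, None)
    if method is None:
        raise AttributeError(f"no such RPC method: {name}")
    return method(context, **message["args"])
```

The caller of cast() never sees the value dispatch() returns. That is the sense in which a cast "does not need a return value".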


    def _schedule_run_instance(self,
            ...
        return rpc_method(context,
                          FLAGS.scheduler_topic,
                          {"method": "run_instance",
                           "args": {"topic": FLAGS.compute_topic,
                                    "request_spec": request_spec,
                                    "admin_password": admin_password,
                                    "injected_files": injected_files,
                                    "requested_networks": requested_networks,
                                    "is_first_time": True,
                                    "filter_properties": filter_properties}})

Scheduler


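The scheduler receives the message and picks a destination host according to the configured scheduling policy. For example, the zone scheduler chooses a host in a specific available zone. Finally, it casts a message to that host.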
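Host selection can be pictured as filter-then-choose. First drop hosts outside the requested zone or without enough free RAM, then pick one of the survivors. This is a hypothetical sketch. Preferring the host with the most free RAM is an assumed weighting, not necessarily what any particular Nova scheduler does.

```python
from dataclasses import dataclass


@dataclass
class HostState:
    name: str
    zone: str
    free_ram_mb: int


def select_host(hosts, zone, ram_mb):
    """Keep hosts in `zone` with enough free RAM, then pick the roomiest."""
    candidates = [h for h in hosts
                  if h.zone == zone and h.free_ram_mb >= ram_mb]
    if not candidates:
        raise LookupError(f"no valid host in zone {zone!r} for {ram_mb} MB")
    return max(candidates, key=lambda h: h.free_ram_mb)
```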


    def cast_to_compute_host(context, host, method, update_db=True, **kwargs):
        """Cast request to a compute host queue"""

        if update_db:
            # fall back on the id if the uuid is not present
            instance_id = kwargs.get('instance_id', None)
            instance_uuid = kwargs.get('instance_uuid', instance_id)
            if instance_uuid is not None:
                now = utils.utcnow()
                db.instance_update(context, instance_uuid,
                                   {'host': host, 'scheduled_at': now})
        rpc.cast(context,
                 db.queue_get_for(context, 'compute', host),
                 {"method": method, "args": kwargs})
        LOG.debug(_("Casted '%(method)s' to compute '%(host)s'") % locals())
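The cast_to_compute_host excerpt does two things. It records the chosen host and the scheduling time on the instance record. It then addresses the message to a per-host compute queue. The sketch below reproduces that with plain dicts and returns the (queue, message) pair instead of publishing it. The dict "database", the name build_compute_cast and the "compute.<host>" queue-name format are assumptions.

```python
from datetime import datetime, timezone


def build_compute_cast(instances, host, method, update_db=True, **kwargs):
    """Plain-dict version of the excerpt; returns (queue_name, message).

    `instances` maps uuid -> instance record and stands in for the database.
    """
    if update_db:
        # Fall back on the id if the uuid is not present, as in the excerpt.
        instance_uuid = kwargs.get("instance_uuid", kwargs.get("instance_id"))
        if instance_uuid is not None:
            instances[instance_uuid].update(
                host=host, scheduled_at=datetime.now(timezone.utc))
    queue_name = f"compute.{host}"  # assumption: "<topic>.<host>" naming
    return queue_name, {"method": method, "args": kwargs}
```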


Compute

The compute worker process receives the message and runs the following method (nova/compute/manager.py):





    def _run_instance(self, context, instance_uuid,
                      requested_networks=None,
                      injected_files=[],
                      admin_password=None,
                      is_first_time=False,
                      **kwargs):
        """Launch a new instance with specified options."""
        context = context.elevated()
        try:
            instance = self.db.instance_get_by_uuid(context, instance_uuid)
            self._check_instance_not_already_created(context, instance)
            image_meta = self._check_image_size(context, instance)
            self._start_building(context, instance)
            self._notify_about_instance_usage(instance, "create.start")
            network_info = self._allocate_network(context, instance,
                                                  requested_networks)
            try:
                block_device_info = self._prep_block_device(context, instance)
                instance = self._spawn(context, instance, image_meta,
                                       network_info, block_device_info,
                                       injected_files, admin_password)
        ...


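[*] checks that the instance has not already been created
[*] allocates a fixed IP address
[*] sets up the VLAN and bridge if they are not configured yet
[*] finally, spawns the instance through the virtualization driver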
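The steps above amount to a small state machine. The worker refuses an instance that already has a state, marks it as building, allocates networking, hands off to the driver, and ends in either an active or an error state. The sketch below assumes the state names "building", "active" and "error". allocate_network and spawn are hypothetical callables supplied by the caller.

```python
class BuildError(Exception):
    """Raised when an instance cannot (or should not) be built."""


def run_instance(instance, allocate_network, spawn):
    """Drive one instance dict through the build steps (sketch only)."""
    if instance.get("vm_state") is not None:
        raise BuildError(f"instance {instance['uuid']} already created")
    instance["vm_state"] = "building"
    try:
        network_info = allocate_network(instance)  # fixed IP, VLAN/bridge
        spawn(instance, network_info)              # hand off to the driver
    except Exception:
        instance["vm_state"] = "error"             # record the failure
        raise
    instance["vm_state"] = "active"
    return instance
```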





Network controller

In _run_instance, the line

    network_info = self._allocate_network(context, instance,
                                          requested_networks)

calls the allocate_for_instance method of the network API:



    def allocate_for_instance(self, context, instance, **kwargs):
        """Allocates all network structures for an instance.

        :returns: network info as from get_instance_nw_info() below
        """
        args = kwargs
        args['instance_id'] = instance['id']
        args['instance_uuid'] = instance['uuid']
        args['project_id'] = instance['project_id']
        args['host'] = instance['host']
        args['rxtx_factor'] = instance['instance_type']['rxtx_factor']

        nw_info = rpc.call(context, FLAGS.network_topic,
                           {'method': 'allocate_for_instance',
                            'args': args})

The main difference between RPC.call and RPC.cast is that call waits for a response.
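The call/cast difference can be shown with in-process queues and one worker thread. cast() enqueues and returns at once. call() attaches a reply id and blocks until the worker puts the result on a reply queue for that id. This is a toy, not Nova's rpc module. ToyRpc, the handler signature and the reply-id scheme are all hypothetical.

```python
import queue
import threading
import uuid


class ToyRpc:
    """Illustrative call/cast over in-process queues.

    Note: this sketch does not propagate handler exceptions back to callers.
    """

    def __init__(self, handler):
        self.requests = queue.Queue()
        self.replies = {}  # msg_id -> queue.Queue holding the caller's answer
        self.handler = handler
        threading.Thread(target=self._serve, daemon=True).start()

    def _serve(self):
        while True:
            msg_id, method, args = self.requests.get()
            result = self.handler(method, args)
            if msg_id is not None:  # only call() expects a reply
                self.replies[msg_id].put(result)

    def cast(self, method, args):
        """Enqueue and return immediately; no result comes back."""
        self.requests.put((None, method, args))

    def call(self, method, args, timeout=5.0):
        """Enqueue with a reply id and block until the result arrives."""
        msg_id = uuid.uuid4().hex
        self.replies[msg_id] = queue.Queue()
        self.requests.put((msg_id, method, args))
        try:
            return self.replies[msg_id].get(timeout=timeout)  # may raise queue.Empty
        finally:
            del self.replies[msg_id]
```

Blocking is what allocate_for_instance needs, because the compute worker cannot continue without the returned network info.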







Spawn instance

Next is how the virtualization driver spawns the instance. Here we use libvirt (nova/virt/libvirt/connection.py):





    def spawn(self, context, instance, image_meta, network_info,
              block_device_info=None):
        xml = self.to_xml(instance, network_info, image_meta, False,
                          block_device_info=block_device_info)
        self.firewall_driver.setup_basic_filtering(instance, network_info)
        self.firewall_driver.prepare_instance_filter(instance, network_info)
        self._create_image(context, instance, xml, network_info=network_info,
                           block_device_info=block_device_info)

        self._create_new_domain(xml)
        LOG.debug(_("Instance is running"), instance=instance)
        self._enable_hairpin(instance)
        self.firewall_driver.apply_instance_filter(instance, network_info)

        def _wait_for_boot():
            """Called at an interval until the VM is running."""
            try:
                state = self.get_info(instance)['state']
            except exception.NotFound:
                LOG.error(_("During reboot, instance disappeared."),
                          instance=instance)
                raise utils.LoopingCallDone

            if state == power_state.RUNNING:
                LOG.info(_("Instance spawned successfully."),
                         instance=instance)
                raise utils.LoopingCallDone

        timer = utils.LoopingCall(_wait_for_boot)
        return timer.start(interval=0.5, now=True)
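In spawn(), _wait_for_boot is polled every 0.5 s and stops the loop by raising LoopingCallDone. The sketch below is a simplified, synchronous version of that pattern. The original LoopingCall is, as far as I know, run in the background via eventlet, whereas this one blocks. run_looping_call and max_iterations are hypothetical.

```python
import time


class LoopingCallDone(Exception):
    """Raised by the polled function to stop the loop."""


def run_looping_call(func, interval, max_iterations=100):
    """Call func now, then every `interval` s, until it raises LoopingCallDone.

    Returns how many calls were made; gives up after max_iterations.
    """
    for attempt in range(1, max_iterations + 1):
        try:
            func()
        except LoopingCallDone:
            return attempt
        time.sleep(interval)
    raise TimeoutError(f"gave up after {max_iterations} polls")
```

A fake state source that reports "building" twice and then "running" stops the loop on the third call.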

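[*] generates the libvirt XML, then creates the instance from that XML
[*] prepares the network filters; the default firewall driver is iptables

[*] creates the image (details will be covered in a later post):

    def _create_image(self, context, instance, libvirt_xml, suffix='',
                      disk_images=None, network_info=None,
                      block_device_info=None):
        ...

[*] finally, spawn() calls the driver's createXML() (through _create_new_domain(xml)) to start the domain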
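to_xml() produces a libvirt domain definition, and the domain is then started from that XML. The sketch below builds a minimal KVM domain with the standard library only. Its output could then be handed to libvirt, for example through the Python binding's virConnect.createXML(), which this sketch deliberately does not call. The function name, the "br100" bridge default, the qcow2 disk and the sample paths are assumptions.

```python
import xml.etree.ElementTree as ET


def to_domain_xml(name, memory_mb, vcpus, disk_path, mac, bridge="br100"):
    """Build a minimal libvirt KVM domain definition (illustrative subset)."""
    dom = ET.Element("domain", type="kvm")
    ET.SubElement(dom, "name").text = name
    ET.SubElement(dom, "memory", unit="KiB").text = str(memory_mb * 1024)
    ET.SubElement(dom, "vcpu").text = str(vcpus)
    os_el = ET.SubElement(dom, "os")
    ET.SubElement(os_el, "type").text = "hvm"
    devices = ET.SubElement(dom, "devices")
    # Root disk backed by a qcow2 image file (assumed format).
    disk = ET.SubElement(devices, "disk", type="file", device="disk")
    ET.SubElement(disk, "driver", name="qemu", type="qcow2")
    ET.SubElement(disk, "source", file=disk_path)
    ET.SubElement(disk, "target", dev="vda", bus="virtio")
    # NIC attached to a host bridge, using the MAC chosen earlier.
    nic = ET.SubElement(devices, "interface", type="bridge")
    ET.SubElement(nic, "mac", address=mac)
    ET.SubElement(nic, "source", bridge=bridge)
    ET.SubElement(nic, "model", type="virtio")
    return ET.tostring(dom, encoding="unicode")
```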
  

Copyright notice: this is the blogger's original article and may not be reproduced without the blogger's permission.