设为首页 收藏本站
查看: 823|回复: 0

[经验分享] OpenStack源码分析之cinder

[复制链接]

尚未签到

发表于 2018-6-1 12:03:27 | 显示全部楼层 |阅读模式
  转载至:http://architecture2.riaos.com/?p=3080665
  版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处!
  
  cat /usr/lib/python2.6/site-packages/cinder-2013.1.g3-py2.6.egg/EGG-INFO/scripts/cinder-volume
  “”"Starter script for Cinder Volume.”"”
import eventlet
eventlet.monkey_patch()
  import os
import sys
  # If ../cinder/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python…
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), os.pardir, os.pardir))
                                                                     
if os.path.exists(os.path.join(possible_topdir, ‘cinder’, ‘__init__.py’)):
    sys.path.insert(0, possible_topdir)
  from cinder import flags
from cinder.openstack.common import log as logging
from cinder import service
from cinder import utils
  FLAGS = flags.FLAGS
  if __name__ == ‘__main__’:
    flags.parse_args(sys.argv)
    logging.setup(“cinder”)
    utils.monkey_patch()
    launcher = service.ProcessLauncher()
    if FLAGS.enabled_backends:
        for backend in FLAGS.enabled_backends:
            host = “%s@%s” % (FLAGS.host, backend)
            server = service.Service.create(host=host, service_name=backend)                                    
            launcher.launch_server(server)
    else:
server = service.Service.create(binary=’cinder-volume’)  具体看下面的《cinder/service.Service.create方法分析》
        launcher.launch_server(server)                           实际调用的是Service的start方法

    launcher.wait()
  《cinder/service.Service.create方法分析》
  cinder/service.py
  class Service(object):
    “”"Service object for binaries running on hosts.
  A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager and reports
    it state to the database services table.”"”
  def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 service_name=None, *args, **kwargs):
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)    加载cinder.volume.manager.VolumeManager类
        self.manager = manager_class(host=self.host,                       初始化manager,具体过程看下面的《cinder/volume/manager.VolumeManager类初始化》
  service_name=service_name,
                                     *args, **kwargs)
        self.report_interval = report_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        super(Service, self).__init__(*args, **kwargs)
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []
  def start(self):
        version_string = version.version_string()
        LOG.audit(_(‘Starting %(topic)s node (version %(version_string)s)’),
                  {‘topic’: self.topic, ‘version_string’: version_string})
        self.manager.init_host()              调用的是cinder/volume/manager.VolumeManager.init_host
        self.model_disconnected = False
        ctxt = context.get_admin_context()     获取admin相关的context类容,返回一个class RequestContext对象(cinder/service.py)
        try:
            service_ref = db.service_get_by_args(ctxt, self.host,self.binary)  获取一个service服务引用对象                                             
            self.service_id = service_ref['id']  获取本服务的id
        except exception.NotFound:
            self._create_service_ref(ctxt)
  self.conn = rpc.create_connection(new=True)   创建一个rpc的connection对象,具体看《cinder.openstack.common.rpc.create_connection方法》
  实际上最终返回的是一个cinder/openstack/common/rpc/impl_qpid.Connection类的对象
  LOG.debug(_(“Creating Consumer connection for Service %s”) %
                  self.topic)
  rpc_dispatcher = self.manager.create_rpc_dispatcher()  创建一个rpc_dispatcher,cincer/manager.Manager.create_rpc_dispatcher
        # Share this same connection for these Consumers
        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)  具体看《cinder/openstack/common/rpc/impl_qpid.Connection类》
  node_topic = ‘%s.%s’ % (self.topic, self.host)
        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
  self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
  # Consume from all consumers in a thread
        self.conn.consume_in_thread()     具体看《cinder/openstack/common/rpc/impl_qpid.Connection类》
  if self.report_interval:
            pulse = utils.LoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)
  if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None
  periodic = utils.LoopingCall(self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
  def _create_service_ref(self, context):
        zone = FLAGS.storage_availability_zone
        service_ref = db.service_create(context,
                                        {‘host’: self.host,
                                         ‘binary’: self.binary,
                                         ‘topic’: self.topic,
                                         ‘report_count’: 0,
                                         ‘availability_zone’: zone})
        self.service_id = service_ref['id']
  def __getattr__(self, key):
        manager = self.__dict__.get(‘manager’, None)
        return getattr(manager, key)
  @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None):
        “”"Instantiates class and passes back application object.
  :param host: defaults to FLAGS.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name – ‘cinder-’ part
        :param manager: defaults to FLAGS.<topic>_manager
        :param report_interval: defaults to FLAGS.report_interval
        :param periodic_interval: defaults to FLAGS.periodic_interval
        :param periodic_fuzzy_delay: defaults to FLAGS.periodic_fuzzy_delay
  “”"
        if not host:
            host = FLAGS.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary     此处topic为cinder-volume
        if not manager:
            subtopic = topic.rpartition(‘cinder-’)[2]
            manager = FLAGS.get(‘%s_manager’ % subtopic, None)   此处构造的字符串为volume_manager,从flags.py中获取的配置项值为cinder.volume.manager.VolumeManager
        if report_interval is None:
            report_interval = FLAGS.report_interval
        if periodic_interval is None:
            periodic_interval = FLAGS.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = FLAGS.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,          调用本类初始化接口__init__
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name)
  return service_obj
  《cinder/volume/manager.VolumeManager类初始化》
  cinder/volume/manager.py
  class VolumeManager(manager.SchedulerDependentManager):      基类manager.SchedulerDependentManager来自cinder/manager.py
    “”"Manages attachable block storage devices.”"”
  RPC_API_VERSION = ’1.4′
  def __init__(self, volume_driver=None, service_name=None,
                 *args, **kwargs):
        “”"Load the driver from the one specified in args, or from flags.”"”
        self.configuration = Configuration(volume_manager_opts, config_group=service_name)
                                          
        if not volume_driver:
            volume_driver = self.configuration.volume_driver     根据volume_manager_opts配置可知,默认的volume_driver值为cinder.volume.drivers.lvm.LVMISCSIDriver
        if volume_driver in MAPPING:
            LOG.warn(_(“Driver path %s is deprecated, update your “
                       “configuration to the new path.”), volume_driver)
            volume_driver = MAPPING[volume_driver]
        self.driver = importutils.import_object(                 加载volume驱动模块cinder.volume.drivers.lvm.LVMISCSIDriver
                                        volume_driver,
                                        configuration=self.configuration)
        # update_service_capabilities needs service_name to be volume
        super(VolumeManager, self).__init__(service_name=’volume’,*args, **kwargs)    调用基类初始化,具体看下面的《cinder/manager.SchedulerDependentManager类初始化》
  self.driver.db = self.db
  def init_host(self):
        “”"Do any initialization that needs to be run if this is a
           standalone service.”"”
  ctxt = context.get_admin_context()
        self.driver.do_setup(ctxt)                具体看下面的《cinder.volume.drivers.lvm.LVMISCSIDriver.do_setup方法》
        self.driver.check_for_setup_error
()
  volumes = self.db.volume_get_all_by_host(ctxt, self.host)
        LOG.debug(_(“Re-exporting %s volumes”), len(volumes))
        for volume in volumes:
            if volume['status'] in ['available', 'in-use']:
                self.driver.ensure_export(ctxt, volume)
            else:
                LOG.info(_(“volume %s: skipping export”), volume['name'])
  LOG.debug(_(‘Resuming any in progress delete operations’))
        for volume in volumes:
            if volume['status'] == ‘deleting’:
                LOG.info(_(‘Resuming delete on volume: %s’) % volume['id'])
                self.delete_volume(ctxt, volume['id'])
  # collect and publish service capabilities
        self.publish_service_capabilities(ctxt)
  以创建一个卷为示例,看下面的具体分析《cinder/volume/manager.VolumeManager.create_volume方法调用》
        def create_volume(self, context, volume_id, request_spec=None,   
                      filter_properties=None, allow_reschedule=True,
                      snapshot_id=None, image_id=None, source_volid=None):
  def delete_volume(self, context, volume_id):
  ………….
  def create_snapshot(self, context, volume_id, snapshot_id):
  ………….
  def delete_snapshot(self, context, snapshot_id):
  ………….
  def attach_volume(self, context, volume_id, instance_uuid, mountpoint):
  ………….
  def detach_volume(self, context, volume_id):
  ………….
  《cinder/manager.SchedulerDependentManager类初始化》
  cincer/manager.py
  class SchedulerDependentManager(Manager):      基类cinder/manager.Manager
    “”"Periodically send capability updates to the Scheduler services.
  Services that need to update the Scheduler of their capabilities
    should derive from this class. Otherwise they can derive from
    manager.Manager directly. Updates are only sent after
    update_service_capabilities is called with non-None values.
  “”"
  def __init__(self, host=None, db_driver=None, service_name=’undefined’):  此处传入的service_name为volume
        self.last_capabilities = None
        self.service_name = service_name  
        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()        调度器的rpcapi,具体看《cinder/scheduler/rpcapi.SchedulerAPI类》
  super(SchedulerDependentManager, self).__init__(host, db_driver)      调用基类Manager的初始化方法,具体看《cinder/manager.Manager类初始化》
  def update_service_capabilities(self, capabilities):
        “”"Remember these capabilities to send on next periodic update.”"”
        self.last_capabilities = capabilities
  @periodic_task
    def _publish_service_capabilities(self, context):
        “”"Pass data back to the scheduler at a periodic interval.”"”
        if self.last_capabilities:
            LOG.debug(_(‘Notifying Schedulers of capabilities …’))
            self.scheduler_rpcapi.update_service_capabilities(
  context,
                self.service_name,
                self.host,
                self.last_capabilities)
  
  《cinder/manager.Manager类初始化》
  cincer/manager.py
  class Manager(base.Base):
    __metaclass__ = ManagerMeta
  # Set RPC API version to 1.0 by default.
    RPC_API_VERSION = ’1.0′
  def __init__(self, host=None, db_driver=None):
        if not host:
            host = FLAGS.host
        self.host = host
        super(Manager, self).__init__(db_driver)        DB的驱动是在基类base.Base中设置的
  创建一个rpc_dispatcher,这个非常重要,所有的服务请求消息都是通过该rpc_dispatcher分发到manager所管理的功能接口中去的,
  具体看《cinder/openstack/common/rpc/dispatcher.RpcDispatcher类》
  def create_rpc_dispatcher(self):
  return rpc_dispatcher.RpcDispatcher([self])
  def periodic_tasks(self, context, raise_on_error=False):
        “”"Tasks to be run at a periodic interval.”"”
        for task_name, task in self._periodic_tasks:
            full_task_name = ‘.’.join([self.__class__.__name__, task_name])
  ticks_to_skip = self._ticks_to_skip[task_name]
            if ticks_to_skip > 0:
                LOG.debug(_(“Skipping %(full_task_name)s, %(ticks_to_skip)s”
                            ” ticks left until next run”), locals())
                self._ticks_to_skip[task_name] -= 1
                continue
  self._ticks_to_skip[task_name] = task._ticks_between_runs
            LOG.debug(_(“Running periodic task %(full_task_name)s”), locals())
  try:
                task(self, context)
            except Exception as e:
                if raise_on_error:
                    raise
                LOG.exception(_(“Error during %(full_task_name)s: %(e)s”),
                              locals())
  def init_host(self):
        pass
  def service_version(self, context):
        return version.version_string()
  def service_config(self, context):
        config = {}
        for key in FLAGS:
            config[key] = FLAGS.get(key, None)
        return config
  《cinder/openstack/common/rpc/dispatcher.RpcDispatcher类》
  cinder/openstack/common/rpc/dispatcher.py
  class RpcDispatcher(object):
    “”"Dispatch rpc messages according to the requested API version.
  This class can be used as the top level ‘manager’ for a service.  It
    contains a list of underlying managers that have an API_VERSION attribute.
    “”"
  def __init__(self, callbacks):        在本流程中传入的callbacks是一个VolumeManager对象
  “”"Initialize the rpc dispatcher.
  :param callbacks: List of proxy objects that are an instance
                          of a class with rpc methods exposed.  Each proxy
                          object should have an RPC_API_VERSION attribute.
        “”"
        self.callbacks = callbacks
        super(RpcDispatcher, self).__init__()
  @staticmethod
    def _is_compatible(mversion, version):
        “”"Determine whether versions are compatible.
  :param mversion: The API version implemented by a callback.
        :param version: The API version requested by an incoming message.
        “”"
        version_parts = version.split(‘.’)
        mversion_parts = mversion.split(‘.’)
        if int(version_parts[0]) != int(mversion_parts[0]):  # Major
            return False
        if int(version_parts[1]) > int(mversion_parts[1]):  # Minor
            return False
        return True
  def dispatch(self, ctxt, version, method, **kwargs):
        “”"Dispatch a message based on a requested version.
        “”"
        if not version:
            version = ’1.0′
  had_compatible = False
        for proxyobj in self.callbacks:
            if hasattr(proxyobj, ‘RPC_API_VERSION’):
                rpc_api_version = proxyobj.RPC_API_VERSION
            else:
                rpc_api_version = ’1.0′
            is_compatible = self._is_compatible(rpc_api_version, version)
            had_compatible = had_compatible or is_compatible
            if not hasattr(proxyobj, method):                     检查在VolumeManager是否有某一个方法,例如create_volume、delete_volume…….
  continue
            if is_compatible:
                return getattr(proxyobj, method)(ctxt, **kwargs)  调用该方法,例如create_volume、delete_volume等
  if had_compatible:
            raise AttributeError(“No such RPC function ‘%s’” % method)
        else:
            raise rpc_common.UnsupportedRpcVersion(version=version)
  《cinder/volume/manager.VolumeManager.create_volume方法调用》
  cinder/volume/manager.py
  class VolumeManager(manager.SchedulerDependentManager):
  def create_volume(self, context, volume_id, request_spec=None,
                      filter_properties=None, allow_reschedule=True,
                      snapshot_id=None, image_id=None, source_volid=None):
        “”"Creates and exports the volume.”"”
        context = context.elevated()
        if filter_properties is None:
            filter_properties = {}
        volume_ref = self.db.volume_get(context, volume_id)   产生一个volume_ref对象
        self._notify_about_volume_usage(context, volume_ref, “create.start”)
        LOG.info(_(“volume %s: creating”), volume_ref['name'])
  # NOTE(vish): so we don’t have to get volume from db again
        #             before passing it to the driver.
        volume_ref['host'] = self.host
  status = ‘available’
        model_update = False
        image_meta = None
  try:
            vol_name = volume_ref['name']
            vol_size = volume_ref['size']
            LOG.debug(_(“volume %(vol_name)s: creating lv of”
                        ” size %(vol_size)sG”) % locals())
            snapshot_ref = None
            sourcevol_ref = None
            image_service = None
            image_location = None
            image_meta = None
  if snapshot_id is not None:
                snapshot_ref = self.db.snapshot_get(context, snapshot_id)
            elif source_volid is not None:
                sourcevol_ref = self.db.volume_get(context, source_volid)
            elif image_id is not None:
                # create the volume from an image
                image_service, image_id =
                    glance.get_remote_image_service(context,
                                                    image_id)
                image_location = image_service.get_location(context, image_id)
                image_meta = image_service.show(context, image_id)
  try:
                model_update, cloned = self._create_volume(context,     创建volume
                                                           volume_ref,
                                                           snapshot_ref,
                                                           sourcevol_ref,
                                                           image_service,
                                                           image_id,
                                                           image_location)
            except Exception:
                # restore source volume status before reschedule
                if sourcevol_ref is not None:
                    self.db.volume_update(context, sourcevol_ref['id'],
                                          {‘status’: sourcevol_ref['status']})
                exc_info = sys.exc_info()
                # try to re-schedule volume:
                self._reschedule_or_reraise(context, volume_id, exc_info,
                                            snapshot_id, image_id,
                                            request_spec, filter_properties,
                                            allow_reschedule)
  if model_update:
                volume_ref = self.db.volume_update(
                    context, volume_ref['id'], model_update)
            if sourcevol_ref is not None:
                self.db.volume_glance_metadata_copy_from_volume_to_volume(
  context,
                    source_volid,
                    volume_id)
  LOG.debug(_(“volume %s: creating export”), volume_ref['name'])
            model_update = self.driver.create_export(context, volume_ref)     // 导出创建的volume[lun]到系统下为一个设备
            if model_update:
                self.db.volume_update(context, volume_ref['id'], model_update)
  except Exception:
            with excutils.save_and_reraise_exception():
                self.db.volume_update(context,
                                      volume_ref['id'], {‘status’: ‘error’})
  if snapshot_id:
            # Copy any Glance metadata from the original volume
            self.db.volume_glance_metadata_copy_to_volume(context,
                                                          volume_ref['id'],
                                                          snapshot_id)
  if image_id and not cloned:
            if image_meta:
                # Copy all of the Glance image properties to the
                # volume_glance_metadata table for future reference.
                self.db.volume_glance_metadata_create(context,
                                                      volume_ref['id'],
                                                      ‘image_id’, image_id)
  name = image_meta.get(‘name’, None)
                if name:
                    self.db.volume_glance_metadata_create(context,
                                                          volume_ref['id'],
                                                          ‘image_name’, name)
                image_properties = image_meta.get(‘properties’, {})
                for key, value in image_properties.items():
                    self.db.volume_glance_metadata_create(context,
                                                          volume_ref['id'],
                                                          key, value)
  now = timeutils.utcnow()
        self.db.volume_update(context,
                              volume_ref['id'], {‘status’: status,
                                                 ‘launched_at’: now})
        LOG.debug(_(“volume %s: created successfully”), volume_ref['name'])
        self._reset_stats()
  self._notify_about_volume_usage(context, volume_ref, “create.end”)
        return volume_ref['id']
  def _create_volume(self, context, volume_ref, snapshot_ref,
  srcvol_ref, image_service, image_id, image_location):
        cloned = None
        model_update = False
  if all(x is None for x in(snapshot_ref, image_id, srcvol_ref)):
            model_update = self.driver.create_volume(volume_ref)   
  此处的driver就是cinder.volume.drivers.lvm.LVMISCSIDriver对象,而create_volume方法是继承自class LVMVolumeDriver类,所以具体的可以查看该类中的create_volume实现
  elif snapshot_ref is not None:
            model_update = self.driver.create_volume_from_snapshot(
                volume_ref,
                snapshot_ref)
        elif srcvol_ref is not None:
            model_update = self.driver.create_cloned_volume(volume_ref,
                                                            srcvol_ref)
        else:
            # create the volume from an image
            cloned = self.driver.clone_image(volume_ref, image_location)
            if not cloned:
                model_update = self.driver.create_volume(volume_ref)
  updates = dict(model_update or dict(), status=’downloading’)
                volume_ref = self.db.volume_update(context,
                                                   volume_ref['id'],
                                                   updates)
  self._copy_image_to_volume(context,
                                           volume_ref,
                                           image_service,
                                           image_id)
  return model_update, cloned
  这里另外说明一下关于 attach_volume和detach_volume方法,这两个方法均是来自LVMVolumeDriver基类的cinder/volume/driver.VolumeDriver,其中并没有在LVMVolumeDriver中被重载,而是直接调用了基类的这两个方法,也就是直接pass,实际上attach_volume和detach_volume方法是在nova-compute服务中完成的,因为一次attach操作会出发两个api调用,一个是针对cinder的,一个是发送到AMQP的
  《cinder.volume.drivers.lvm.LVMISCSIDriver.do_setup方法》
  cinder/volume/dirvers/lvm.py
  class LVMISCSIDriver(LVMVolumeDriver, driver.ISCSIDriver):  基类LVMVolumeDriver与ISCSIDriver
  def __init__(self, *args, **kwargs):
        self.tgtadm = iscsi.get_target_admin()                获取tgtadm对象,具体看《cinder.volume.iscsi.get_target_admin函数
  super(LVMISCSIDriver, self).__init__(*args, **kwargs)   调用基类的初始化方法,具体看《cinder.volume.drivers.lvm.LVMVolumeDriver类》《cinder.volume.driver.ISCSIDriver类》
  《cinder.volume.drivers.lvm.LVMISCSIDriver.do_setup方法》
  cinder/volume/dirvers/lvm.py
  class LVMVolumeDriver(driver.VolumeDriver):               
    “”"Executes commands relating to Volumes.”"”
    def __init__(self, *args, **kwargs):
        super(LVMVolumeDriver, self).__init__(*args, **kwargs) 调用基类的初始化方法,具体看《cinder.volume.driver.VolumeDriver类》
        self.configuration.append_config_values(volume_opts)
  def check_for_setup_error(self):
  …………….
  《cinder.volume.iscsi.get_target_admin函数
  cinder/volume/iscsi.py
  def get_target_admin():
    if FLAGS.iscsi_helper == ‘tgtadm’:
        return TgtAdm()                 根据配置我们使用的是tgtadm作为vg的管理工具,返回一个TgtAdm的对象
    elif FLAGS.iscsi_helper == ‘fake’:
        return FakeIscsiHelper()
    elif FLAGS.iscsi_helper == ‘lioadm’:
        return LioAdm()
    else:
        return IetAdm()
  class TgtAdm(TargetAdmin):
    “”"iSCSI target administration using tgtadm.”"”
  def __init__(self, execute=utils.execute):
        super(TgtAdm, self).__init__(‘tgtadm’, execute)  调用基类TargetAdmin的初始化方法
  
    def _get_target(self, iqn):
        (out, err) = self._execute(‘tgt-admin’, ‘–show’, run_as_root=True)
        lines = out.split(‘n’)
        for line in lines:
            if iqn in line:
                parsed = line.split()
                tid = parsed[1]
                return tid[:-1]
  return None
  def create_iscsi_target(self, name, tid, lun, path,
                            chap_auth=None, **kwargs):
        # Note(jdg) tid and lun aren’t used by TgtAdm but remain for
        # compatibility
  utils.ensure_tree(FLAGS.volumes_dir)
  vol_id = name.split(‘:’)[1]
        if chap_auth is None:
            volume_conf = “”"
                <target %s>
                    backing-store %s
                </target>
            “”" % (name, path)
        else:
            volume_conf = “”"
                <target %s>
                    backing-store %s
  %s
                </target>
            “”" % (name, path, chap_auth)
  LOG.info(_(‘Creating volume: %s’) % vol_id)
        volumes_dir = FLAGS.volumes_dir
        volume_path = os.path.join(volumes_dir, vol_id)
  f = open(volume_path, ‘w+’)
        f.write(volume_conf)
        f.close()
  old_persist_file = None
        old_name = kwargs.get(‘old_name’, None)
        if old_name is not None:
            old_persist_file = os.path.join(volumes_dir, old_name)
  try:
            (out, err) = self._execute(‘tgt-admin’,
                                       ‘–update’,
                                       name,
                                       run_as_root=True)
        except exception.ProcessExecutionError, e:
            LOG.error(_(“Failed to create iscsi target for volume “
                        “id:%(vol_id)s.”) % locals())
  #Don’t forget to remove the persistent file we created
            os.unlink(volume_path)
            raise exception.ISCSITargetCreateFailed(volume_id=vol_id)
  iqn = ‘%s%s’ % (FLAGS.iscsi_target_prefix, vol_id)
        tid = self._get_target(iqn)
        if tid is None:
            LOG.error(_(“Failed to create iscsi target for volume “
                        “id:%(vol_id)s. Please ensure your tgtd config file “
  “contains ‘include %(volumes_dir)s/*’”) % locals())
            raise exception.NotFound()
  if old_persist_file is not None and os.path.exists(old_persist_file):
            os.unlink(old_persist_file)
  return tid
  def remove_iscsi_target(self, tid, lun, vol_id, **kwargs):
        LOG.info(_(‘Removing volume: %s’) % vol_id)
        vol_uuid_file = FLAGS.volume_name_template % vol_id
        volume_path = os.path.join(FLAGS.volumes_dir, vol_uuid_file)
        if os.path.isfile(volume_path):
            iqn = ‘%s%s’ % (FLAGS.iscsi_target_prefix,
                            vol_uuid_file)
        else:
            raise exception.ISCSITargetRemoveFailed(volume_id=vol_id)
  try:
            self._execute(‘tgt-admin’,
                          ‘–delete’,
                          iqn,
                          run_as_root=True)
        except exception.ProcessExecutionError, e:
            LOG.error(_(“Failed to delete iscsi target for volume “
                        “id:%(vol_id)s.”) % locals())
            raise exception.ISCSITargetRemoveFailed(volume_id=vol_id)
  os.unlink(volume_path)
  def show_target(self, tid, iqn=None, **kwargs):
        if iqn is None:
            raise exception.InvalidParameterValue(
                err=_(‘valid iqn needed for show_target’))
  tid = self._get_target(iqn)
  if tid is None:
            raise exception.NotFound()
  class TargetAdmin(object):
    “”"iSCSI target administration.
  Base class for iSCSI target admin helpers.
    “”"
  def __init__(self, cmd, execute):
        self._cmd = cmd
        self.set_execute(execute)  设置shell执行对象
  def set_execute(self, execute):
        “”"Set the function to be used to execute commands.”"”
        self._execute = execute
  def _run(self, *args, **kwargs):
        self._execute(self._cmd, *args, run_as_root=True, **kwargs)
  def create_iscsi_target(self, name, tid, lun, path,
                            chap_auth=None, **kwargs):
        “”"Create a iSCSI target and logical unit”"”
        raise NotImplementedError()
  def remove_iscsi_target(self, tid, lun, vol_id, **kwargs):
        “”"Remove a iSCSI target and logical unit”"”
        raise NotImplementedError()
  def _new_target(self, name, tid, **kwargs):
        “”"Create a new iSCSI target.”"”
        raise NotImplementedError()
  def _delete_target(self, tid, **kwargs):
        “”"Delete a target.”"”
        raise NotImplementedError()
  def show_target(self, tid, iqn=None, **kwargs):
        “”"Query the given target ID.”"”
        raise NotImplementedError()
  def _new_logicalunit(self, tid, lun, path, **kwargs):
        “”"Create a new LUN on a target using the supplied path.”"”
        raise NotImplementedError()
  def _delete_logicalunit(self, tid, lun, **kwargs):
        “”"Delete a logical unit from a target.”"”
        raise NotImplementedError()
  《cinder.volume.driver.VolumeDriver类》
  cinder/volume/driver.py
  class LVMVolumeDriver(driver.VolumeDriver):
  def _create_volume(self, volume_name, sizestr):
        cmd = ['lvcreate', '-L', sizestr, '-n', volume_name, self.configuration.volume_group]
        if self.configuration.lvm_mirrors:
            cmd += ['-m', self.configuration.lvm_mirrors, '--nosync']
            terras = int(sizestr[:-1]) / 1024.0
            if terras >= 1.5:
                rsize = int(2 ** math.ceil(math.log(terras) / math.log(2)))
                # NOTE(vish): Next power of two for region size. See:
                #             http://red.ht/U2BPOD
                cmd += ['-R', str(rsize)]
  self._try_execute(*cmd, run_as_root=True)  执行一个创建卷的命令
  def create_volume(self, volume):
        “”"Creates a logical volume. Can optionally return a Dictionary of
        changes to the volume object to be persisted.”"”
        self._create_volume(volume['name'], self._sizestr(volume['size']))
  
  《cinder.volume.driver.ISCSIDriver类》
  
  
  
  
  《cinder.openstack.common.rpc.create_connection方法》
  cinder/openstack/common/rpc.py
  def create_connection(new=True):
  return _get_impl().create_connection(cfg.CONF, new=new)
  
  def _get_impl():
  if _RPCIMPL is None:
        try:
            _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
  在这里我们rpc_backend=cinder.openstack.common.rpc.impl_qpid
  因此使用的rpc连接服务是用impl_qpid完成的,具体看《cinder/openstack/common/rpc/impl_qpid.Connection类》
  《cinder/openstack/common/rpc/impl_qpid.Connection类》
  cinder/openstack/common/rpc/impl_qpid.py
  def create_connection(conf, new=True):
    “”"Create a connection”"”
    return rpc_amqp.create_connection(conf, new,rpc_amqp.get_connection_pool(conf, Connection))  传入Connection类,具体看《openstack/common/rpc/amqp.create_connection函数》
  class Connection(object):
  pool = None
   
    def __init__(self, conf, server_params=None):
        self.session = None
        self.consumers = {}
        self.consumer_thread = None
  self.conf = conf
  if server_params is None:
            server_params = {}
  default_params = dict(hostname=self.conf.qpid_hostname,
                              port=self.conf.qpid_port,
                              username=self.conf.qpid_username,
  password=self.conf.qpid_password)
  params = server_params
        for key in default_params.keys():
            params.setdefault(key, default_params[key])
  self.broker = params['hostname'] + “:” + str(params['port'])
        # Create the connection – this does not open the connection
        self.connection = qpid.messaging.Connection(self.broker)
  # Check if flags are set and if so set them for the connection
        # before we call open
        self.connection.username = params['username']
        self.connection.password = params['password']
        self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
        self.connection.reconnect = self.conf.qpid_reconnect
        if self.conf.qpid_reconnect_timeout:
  self.connection.reconnect_timeout = (
                self.conf.qpid_reconnect_timeout)
        if self.conf.qpid_reconnect_limit:
            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
        if self.conf.qpid_reconnect_interval_max:
            self.connection.reconnect_interval_max = (
                self.conf.qpid_reconnect_interval_max)
  if self.conf.qpid_reconnect_interval_min:
            self.connection.reconnect_interval_min = (
                self.conf.qpid_reconnect_interval_min)
        if self.conf.qpid_reconnect_interval:
            self.connection.reconnect_interval = (
                self.conf.qpid_reconnect_interval)
        self.connection.heartbeat = self.conf.qpid_heartbeat
        self.connection.protocol = self.conf.qpid_protocol
  self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
  # Open is part of reconnect -
        # NOTE(WGH) not sure we need this with the reconnect flags
        self.reconnect()
  def _register_consumer(self, consumer):
        self.consumers[str(consumer.get_receiver())] = consumer
  def _lookup_consumer(self, receiver):
        return self.consumers[str(receiver)]
  def reconnect(self):
  …………………….
  LOG.info(_(‘Connected to AMQP server on %s’), self.broker)
  …………………….
  def iterconsume(self, limit=None, timeout=None):
        “”"Return an iterator that will consume from all queues/consumers”"”
           
        def _error_callback(exc):
            if isinstance(exc, qpid.messaging.exceptions.Empty):
                LOG.exception(_(‘Timed out waiting for RPC response: %s’) %
                              str(exc))
                raise rpc_common.Timeout()
            else:
                LOG.exception(_(‘Failed to consume message from queue: %s’) %
                              str(exc))
               
        def _consume():
            nxt_receiver = self.session.next_receiver(timeout=timeout)
            try:
                self._lookup_consumer(nxt_receiver).consume()   
  _lookup_consumer查找的是在create_consumer时注册的consumer,既TopicConsumer和FanoutConsumer,这二者的基类是ConsumerBase,
  所以此处后面的consume()就是调用的ConsumerBase的consume 方法,具体看下该类的该方法。
  except Exception:
                LOG.exception(_(“Error processing message.  Skipping it.”))
  for iteration in itertools.count(0):
            if limit and iteration >= limit:
                raise StopIteration
            yield self.ensure(_error_callback, _consume)
  def consume(self, limit=None):
  it = self.iterconsume(limit=limit)
        while True:
            try:
                it.next()
  ……….
  def consume_in_thread(self):
        “”"Consumer from all queues/consumers in a greenthread”"”
        def _consumer_thread():
            try:
                self.consume()
  def create_consumer(self, topic, proxy, fanout=False):
  “”"Create a consumer that calls a method in a proxy object”"”
        proxy_cb = rpc_amqp.ProxyCallback(
            self.conf, proxy,
            rpc_amqp.get_connection_pool(self.conf, Connection))
  if fanout:
            consumer = FanoutConsumer(self.conf,
self.session, topic, proxy_cb)
  else:
            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
  self._register_consumer(consumer)
  return consumer
  class TopicConsumer(ConsumerBase):  基类为ConsumerBase
  class FanoutConsumer(ConsumerBase):  基类为ConsumerBase
  
  class ConsumerBase(object):
    “”"Consumer base class.”"”
  def __init__(self, session, callback, node_name, node_opts,
                 link_name, link_opts):
  self.callback = callback
        self.receiver = None
        self.session = None
  addr_opts = {
            “create”: “always”,
            “node”: {
                “type”: “topic”,
                “x-declare”: {
                    “durable”: True,
                    “auto-delete”: True,
                },
            },
            “link”: {
                “name”: link_name,
                “durable”: True,
                “x-declare”: {
                    “durable”: False,
                    “auto-delete”: True,
                    “exclusive”: False,
                },
            },
        }
        addr_opts["node"]["x-declare"].update(node_opts)
        addr_opts["link"]["x-declare"].update(link_opts)
  self.address = “%s ; %s” % (node_name, jsonutils.dumps(addr_opts))
  self.reconnect(session)
  def reconnect(self, session):
        “”"Re-declare the receiver after a qpid reconnect”"”
        self.session = session
        self.receiver = session.receiver(self.address)
        self.receiver.capacity = 1
  def consume(self):
        “”"Fetch the message and pass it to the callback object”"”
        message = self.receiver.fetch()
        try:
            print ‘****** cinder:openstack:common:rpc:impl_qpid:ConsumerBase:consume: message.content: ‘, message.content
            self.callback(message.content)  此处的callback实际上是一个rpc_dispatcher,
        except Exception:
            LOG.exception(_(“Failed to process message… skipping it.”))
        finally:
            self.session.acknowledge(message)
  def get_receiver(self):
        return self.receiver
  《cinder/openstack/common/rpc/amqp.create_connection函数》
  cinder/openstack/common/rpc/amqp.py
  def get_connection_pool(conf, connection_cls):
    with _pool_create_sem:
        # Make sure only one thread tries to create the connection pool.
        if not connection_cls.pool:
            connection_cls.pool = Pool(conf, connection_cls)
    return connection_cls.pool
  
  def create_connection(conf, new, connection_pool):
    “”"Create a connection”"”
    return ConnectionContext(conf, connection_pool, pooled=not new)
  class ConnectionContext(rpc_common.Connection):
  def __init__(self, conf, connection_pool, pooled=True, server_params=None):
        “”"Create a new connection, or get one from the pool”"”
        self.connection = None
        self.conf = conf
        self.connection_pool = connection_pool
        if pooled:
            self.connection = connection_pool.get()
        else:
            self.connection = connection_pool.connection_cls(conf,server_params=server_params)         
        self.pooled = pooled
  下面的三个加粗的调用实际就是调用的cinder/openstack/common/rpc/impl_qpid.Connection类的方法
  def create_consumer(self, topic, proxy, fanout=False):
        self.connection.create_consumer(topic, proxy, fanout)
  def create_worker(self, topic, proxy, pool_name):
        self.connection.create_worker(topic, proxy, pool_name)
  def consume_in_thread(self):
        self.connection.consume_in_thread()
  This entry was posted in 系统架构 by admin. Bookmark the permalink.
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-499514-1-1.html 上篇帖子: openstack安装报错及处理 下篇帖子: 一步一步跟着官方文档安装部署Openstack(icehouse)三
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表