cy_88 发表于 2016-1-9 11:56:00

OpenStack Swift源码初探--proxy下的server.py

  OpenStack Swift作为开源的云存储工具,被越来越多的公司使用。为了记录和巩固学习swift的开源源码,所以进行一系列的源码开源学习笔记,供初学者快速学习和理解swift的内部功能。
  proxy下面的server.py模块是所有对account,container,object等对象进行管理操作的在swift的proxy端的总入口。在swift系统在接收到url请求后,先是经过middleware处理链分别验证处理后,再进入到proxy下面的server模块,进入 _call_方法调用后,把对应的请求分发给不同的controller处理,controller再调用各自的nodeStroage服务程序进行处理,返回各自的resp结果到server模块,最后通过mimmdleware处理链再反方向返回最终的请求处理结果
  
   1.首先server.py的整个结构如下图:包括4部分:一堆import引用,一个required_filters的字典,一个名为“Application(object)”的class ,一个app_factory(global_conf, **local_conf) 方法

   2.主要介绍“Application(object)”的class,其中包含了所有主要的功能方法

  
  2.1 _init_ 方法,Application类的初始化方法,主要就是初始化一些对象,包括:conf配置文件参数 的初始化,log日志初始化,memcache对象初始化,account_ring,container_ring, object_ring对象初始化等
  
  2.2  check_config(self) 方法,主要是检查配置文件proxy-server.conf中配置的“read_affinity” 和“sorting_method”属性值是否正确,该方法在 app_factory(global_conf, **local_conf):方法时调用  
  
  2.3 get_controller(self, path)方法,主要是根据传入的 urlPath解析并返回对应的control类和一个字典对象,其中字典对象的值根据传入url格式的不同返回不同的值
  
 view plaincopy



[*]def get_controller(self, path):  
[*]        """ 
[*]        Get the controller to handle a request. 
[*] 
[*]        :param path: path from request 
[*]        :returns: tuple of (controller class, path dictionary) 
[*] 
[*]        :raises: ValueError (thrown by split_path) if given invalid path 
[*]        """  
[*]        if path == '/info':  #url是/info 则返回InfoController和包括version,expose_info,disallowed_sections,admin_key的字典  
[*]            d = dict(version=None,  
[*]                     expose_info=self.expose_info,  
[*]                     disallowed_sections=self.disallowed_sections,  
[*]                     admin_key=self.admin_key)  
[*]            return InfoController, d  
[*]  
[*]        version, account, container, obj = split_path(path, 1, 4, True)  #以/拆分url为数列,并取对应的1到4位的数据返回给对应的变量  
[*]        d = dict(version=version,  
[*]                 account_name=account,  
[*]                 container_name=container,  
[*]                 object_name=obj)  
[*]        if obj and container and account: #根据解析出的account值,congtainer和object值得有无,确定适用的Controller是那种  
[*]            return ObjectController, d  
[*]        elif container and account:  
[*]            return ContainerController, d  
[*]        elif account and not container and not obj:  
[*]            return AccountController, d  
[*]        return None, d  

  
 
  
  2.4  __call__(self, env, start_response)方法,是server模块的实际对account、container、object等对象调用处理的功能入口。
  
 view plaincopy



[*]def __call__(self, env, start_response):  
[*]        """ 
[*]        WSGI entry point. 
[*]        Wraps env in swob.Request object and passes it down. 
[*] 
[*]        :param env: WSGI environment dictionary 
[*]        :param start_response: WSGI callable 
[*]        """  
[*]        try:  
[*]            if self.memcache is None: #首先判断是否memcache值存在,不存在再去获取一次  
[*]                self.memcache = cache_from_env(env)  
[*]            req = self.update_request(Request(env)) #判断header中是否有x-storage-token和x-auth-token  
[*]            return self.handle_request(req)(env, start_response) #调用handle_request方法并返回处理的结果resp对象  
[*]        except UnicodeError:  
[*]            err = HTTPPreconditionFailed(  
[*]                request=req, body='Invalid UTF8 or contains NULL')  
[*]            return err(env, start_response)  
[*]        except (Exception, Timeout):  
[*]            start_response('500 Server Error',  
[*]                           [('Content-Type', 'text/plain')])  
[*]            return ['Internal server error.\n']  

  
2.5 update_request(self, req)方法,根据请求中header里面的x-storage-token有而x-auth-token没有的情况,把x-storage-token的值赋予x-auth-token
  
  
2.6 handle_request(self, req)方法,server模块实际处理request请求的方法,熟悉servlet的同学可以把它理解成servlet的作用
  
 view plaincopy



[*]def handle_request(self, req):  
[*]        """ 
[*]        Entry point for proxy server. 
[*]        Should return a WSGI-style callable (such as swob.Response). 
[*] 
[*]        :param req: swob.Request object 
[*]        """  
[*]        try:  
[*]            self.logger.set_statsd_prefix('proxy-server') #在日志的开头加上‘proxy-server’,方便跟踪分析  
[*]            if req.content_length and req.content_length < 0: #检查header里面中的Content-Length是否有值,无值返回错误请求,并日志记录  
[*]                self.logger.increment('errors')  
[*]                return HTTPBadRequest(request=req,  
[*]                                      body='Invalid Content-Length')  
[*]  
[*]            try:  
[*]                if not check_utf8(req.path_info): #检查Pathde的编码是否不满足utf8,不满足返回错误请求,并日志记录  
[*]                    self.logger.increment('errors')  
[*]                    return HTTPPreconditionFailed(  
[*]                        request=req, body='Invalid UTF8 or contains NULL')  
[*]            except UnicodeError:  
[*]                self.logger.increment('errors')  
[*]                return HTTPPreconditionFailed(  
[*]                    request=req, body='Invalid UTF8 or contains NULL')  
[*]  
[*]            try:  
[*]                controller, path_parts = self.get_controller(req.path) #调用get_controller(self,path)方法返回正确的controller类和字典对象  
[*]                p = req.path_info  
[*]                if isinstance(p, unicode):  
[*]                    p = p.encode('utf-8') #path编码Unicode转换utf-8  
[*]            except ValueError:           #发生值异常,返回错误请求,并日志记录  
[*]                self.logger.increment('errors')  
[*]                return HTTPNotFound(request=req)  
[*]            if not controller:         #为找到对应处理的controller类时,返回错误请求,并日志记录  
[*]                self.logger.increment('errors')  
[*]                return HTTPPreconditionFailed(request=req, body='Bad URL')  
[*]            if self.deny_host_headers and \    #当proxy-server.conf中deny_host_headers有值,且请求的header中的host在deny_host_headers中,则返回错误请求,并日志记录  
[*]                    req.host.split(':')[0] in self.deny_host_headers:  
[*]                return HTTPForbidden(request=req, body='Invalid host header')  
[*]  
[*]            self.logger.set_statsd_prefix('proxy-server.' +  
[*]                                          controller.server_type.lower()) #在日志的开头加上‘proxy-server.controller类中的请求类型(eg:HEAD/GET/PUT)’,方便跟踪分析  
[*]            controller = controller(self, **path_parts)  #初始化实际的controller对象(AccountController、ContainerController、ObjectController、InfoController其中之一)  
[*]            if 'swift.trans_id' not in req.environ:    #如果没有trans_id在env中,则重新生成一个,有些类似于http请求中的seesionID的感觉,是一种UUID  
[*]                # if this wasn't set by an earlier middleware, set it now  
[*]                trans_id = generate_trans_id(self.trans_id_suffix)  
[*]                req.environ['swift.trans_id'] = trans_id  
[*]                self.logger.txn_id = trans_id  
[*]            req.headers['x-trans-id'] = req.environ['swift.trans_id']  
[*]            controller.trans_id = req.environ['swift.trans_id']  
[*]            self.logger.client_ip = get_remote_client(req)  #把请求中获取出请求端的IP信息,加入logger对象,方便后续日志查看分析  
[*]            try:  
[*]                handler = getattr(controller, req.method) #根据req.method方法获取对应controller对象中的方法(可能是多个,有的有public标签,有的没有)  
[*]                getattr(handler, 'publicly_accessible') #再根据public标签获取最终的处理方法。(在方法前面可以加 @public 和@delay_denial)  
[*]            except AttributeError:  
[*]                allowed_methods = getattr(controller, 'allowed_methods', set())  
[*]                return HTTPMethodNotAllowed(  
[*]                    request=req, headers={'Allow': ', '.join(allowed_methods)})  
[*]            if 'swift.authorize' in req.environ: #做鉴权操作  
[*]                # We call authorize before the handler, always. If authorized,  
[*]                # we remove the swift.authorize hook so isn't ever called  
[*]                # again. If not authorized, we return the denial unless the  
[*]                # controller's method indicates it'd like to gather more  
[*]                # information and try again later.  
[*]                resp = req.environ['swift.authorize'](req)  
[*]                if not resp:  
[*]                    # No resp means authorized, no delayed recheck required.  
[*]                    del req.environ['swift.authorize']  
[*]                else:  
[*]                    # Response indicates denial, but we might delay the denial  
[*]                    # and recheck later. If not delayed, return the error now.  
[*]                    if not getattr(handler, 'delay_denial', None):  
[*]                        return resp  
[*]            # Save off original request method (GET, POST, etc.) in case it  
[*]            # gets mutated during handling.  This way logging can display the  
[*]            # method the client actually sent.  
[*]            req.environ['swift.orig_req_method'] = req.method  
[*]            return handler(req)    #调用最终的method方法,并返回resp结果  
[*]        except HTTPException as error_response:  
[*]            return error_response  
[*]        except (Exception, Timeout):  
[*]            self.logger.exception(_('ERROR Unhandled exception in request'))  
[*]            return HTTPServerError(request=req)  

  
  2.7 sort_nodes(self, nodes)方法,对nodes对象进行排序处理,该方法在iter_nodes(self, ring, partition, node_iter=None)中调用
  
 view plaincopy



[*]def sort_nodes(self, nodes):  
[*]        ''''' 
[*]        Sorts nodes in-place (and returns the sorted list) according to 
[*]        the configured strategy. The default "sorting" is to randomly 
[*]        shuffle the nodes. If the "timing" strategy is chosen, the nodes 
[*]        are sorted according to the stored timing data. 
[*]        '''  
[*]        # In the case of timing sorting, shuffling ensures that close timings  
[*]        # (ie within the rounding resolution) won't prefer one over another.  
[*]        # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)  
[*]        shuffle(nodes)  
[*]        if self.sorting_method == 'timing':  #配置文件中排序方法为timing时,以时间排序  
[*]            now = time()  
[*]  
[*]            def key_func(node):  
[*]                timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))  
[*]                return timing if expires > now else -1.0  
[*]            nodes.sort(key=key_func)  
[*]        elif self.sorting_method == 'affinity': #配置文件中排序方法为affinity时,以自定义的亲和力规则排序  
[*]            nodes.sort(key=self.read_affinity_sort_key)  
[*]        return nodes  

  
  2.8 set_node_timing(self, node, timing)方法,提供给外部程序调用
  
  2.9    error_limited(self, node)方法,该方法在iter_nodes(self, ring, partition, node_iter=None)中调用
  
 view plaincopy



[*]def error_limited(self, node):  
[*]        """ 
[*]        Check if the node is currently error limited. 
[*] 
[*]        :param node: dictionary of node to check 
[*]        :returns: True if error limited, False otherwise 
[*]        """  
[*]        now = time()  
[*]        if 'errors' not in node:  #errors没在node中时返回false  
[*]            return False  
[*]        if 'last_error' in node and node['last_error'] < \   #last_error在node中有并且 last_error小于现在时间减去系统允许的时间间隔  
[*]                now - self.error_suppression_interval:  
[*]            del node['last_error'] #node去掉last_error  
[*]            if 'errors' in node: #errors在node中时返回 去掉errors,且返回false  
[*]                del node['errors']  
[*]            return False  
[*]        limited = node['errors'] > self.error_suppression_limit  #errors在node中中的个数多与系统允许的个数,是返回true,且做日志记录  
[*]        if limited:  
[*]            self.logger.debug(  
[*]                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)  
[*]        return limited  

  
  
  2.10  error_limit(self, node, msg)方法,提供给外部程序调用,用于给node直接增加errors到比系统允许的次数+1,并记录last_error时间,和做日志记录
  
  2.11  error_limit(self, node, msg)方法,提供给外部程序调用,用于给node增加errors次数,并记录last_error时间,和做日志记录
  
  2.12  iter_nodes(self, ring, partition, node_iter=None)方法,提供给外部程序调用,用于对nodes做排序后生成的nodes迭代器
  
  2.13  exception_occurred(self, node, typ, additional_info)方法,提供给外部程序调用,用于当node发生异常了,进行日志记录
  2.14  modify_wsgi_pipeline(self, pipe)方法,提供给外部程序调用,用于系统启动时,初始化pipeline,并做日志记录
  
 view plaincopy



[*]def modify_wsgi_pipeline(self, pipe):  
[*]        """ 
[*]        Called during WSGI pipeline creation. Modifies the WSGI pipeline 
[*]        context to ensure that mandatory middleware is present in the pipeline. 
[*] 
[*]        :param pipe: A PipelineWrapper object 
[*]        """  
[*]        pipeline_was_modified = False  
[*]        for filter_spec in reversed(required_filters):  #当required_filters字典中定义了需要重新排序的app时,进行pipeline的重新排序处理  
[*]            filter_name = filter_spec['name']  
[*]            if filter_name not in pipe:  
[*]                afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)  
[*]                insert_at = 0  
[*]                for after in afters:  
[*]                    try:  
[*]                        insert_at = max(insert_at, pipe.index(after) + 1)  
[*]                    except ValueError:  # not in pipeline; ignore it  
[*]                        pass  
[*]                self.logger.info(  
[*]                    'Adding required filter %s to pipeline at position %d' %  
[*]                    (filter_name, insert_at))  
[*]                ctx = pipe.create_filter(filter_name)  
[*]                pipe.insert_filter(ctx, index=insert_at)  
[*]                pipeline_was_modified = True  
[*]  
[*]        if pipeline_was_modified:    
[*]            self.logger.info("Pipeline was modified. New pipeline is \"%s\".",  
[*]                             pipe)  
[*]        else:  
[*]            self.logger.debug("Pipeline is \"%s\"", pipe)  

  
 
  以下源码为2014、3、12的最新的Proxy的server.py源码,只加了部分代码注释:
  
 view plaincopy



[*]# Copyright (c) 2010-2012 OpenStack Foundation  
[*]#  
[*]# Licensed under the Apache License, Version 2.0 (the "License");  
[*]# you may not use this file except in compliance with the License.  
[*]# You may obtain a copy of the License at  
[*]#  
[*]#    http://www.apache.org/licenses/LICENSE-2.0  
[*]#  
[*]# Unless required by applicable law or agreed to in writing, software  
[*]# distributed under the License is distributed on an "AS IS" BASIS,  
[*]# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or  
[*]# implied.  
[*]# See the License for the specific language governing permissions and  
[*]# limitations under the License.  
[*]  
[*]import mimetypes  
[*]import os  
[*]import socket  
[*]from swift import gettext_ as _  
[*]from random import shuffle  
[*]from time import time  
[*]import itertools  
[*]  
[*]from eventlet import Timeout  
[*]  
[*]from swift import __canonical_version__ as swift_version  
[*]from swift.common import constraints  
[*]from swift.common.ring import Ring  
[*]from swift.common.utils import cache_from_env, get_logger, \  
[*]    get_remote_client, split_path, config_true_value, generate_trans_id, \  
[*]    affinity_key_function, affinity_locality_predicate, list_from_csv, \  
[*]    register_swift_info  
[*]from swift.common.constraints import check_utf8  
[*]from swift.proxy.controllers import AccountController, ObjectController, \  
[*]    ContainerController, InfoController  
[*]from swift.common.swob import HTTPBadRequest, HTTPForbidden, \  
[*]    HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \  
[*]    HTTPServerError, HTTPException, Request  
[*]  
[*]  
[*]# List of entry points for mandatory middlewares.  
[*]#  
[*]# Fields:  
[*]#  
[*]# "name" (required) is the entry point name from setup.py.  
[*]#  
[*]# "after_fn" (optional) a function that takes a PipelineWrapper object as its  
[*]# single argument and returns a list of middlewares that this middleware  
[*]# should come after. Any middlewares in the returned list that are not present  
[*]# in the pipeline will be ignored, so you can safely name optional middlewares  
[*]# to come after. For example, ["catch_errors", "bulk"] would install this  
[*]# middleware after catch_errors and bulk if both were present, but if bulk  
[*]# were absent, would just install it after catch_errors.  
[*]  
[*]required_filters = [  
[*]    {'name': 'catch_errors'},  
[*]    {'name': 'gatekeeper',  
[*]     'after_fn': lambda pipe: (['catch_errors']  
[*]                               if pipe.startswith("catch_errors")  
[*]                               else [])},  
[*]    {'name': 'dlo', 'after_fn': lambda _junk: ['catch_errors', 'gatekeeper',  
[*]                                               'proxy_logging']}]  
[*]  
[*]  
[*]class Application(object):  
[*]    """WSGI application for the proxy server."""  
[*]  
[*]    def __init__(self, conf, memcache=None, logger=None, account_ring=None,  
[*]                 container_ring=None, object_ring=None):  
[*]        if conf is None:  
[*]            conf = {}  
[*]        if logger is None:  
[*]            self.logger = get_logger(conf, log_route='proxy-server')  
[*]        else:  
[*]            self.logger = logger  
[*]  
[*]        swift_dir = conf.get('swift_dir', '/etc/swift')  
[*]        self.node_timeout = int(conf.get('node_timeout', 10))  
[*]        self.recoverable_node_timeout = int(  
[*]            conf.get('recoverable_node_timeout', self.node_timeout))  
[*]        self.conn_timeout = float(conf.get('conn_timeout', 0.5))  
[*]        self.client_timeout = int(conf.get('client_timeout', 60))  
[*]        self.put_queue_depth = int(conf.get('put_queue_depth', 10))  
[*]        self.object_chunk_size = int(conf.get('object_chunk_size', 65536))  
[*]        self.client_chunk_size = int(conf.get('client_chunk_size', 65536))  
[*]        self.trans_id_suffix = conf.get('trans_id_suffix', '')  
[*]        self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))  
[*]        self.error_suppression_interval = \  
[*]            int(conf.get('error_suppression_interval', 60))  
[*]        self.error_suppression_limit = \  
[*]            int(conf.get('error_suppression_limit', 10))  
[*]        self.recheck_container_existence = \  
[*]            int(conf.get('recheck_container_existence', 60))  
[*]        self.recheck_account_existence = \  
[*]            int(conf.get('recheck_account_existence', 60))  
[*]        self.allow_account_management = \  
[*]            config_true_value(conf.get('allow_account_management', 'no'))  
[*]        self.object_post_as_copy = \  
[*]            config_true_value(conf.get('object_post_as_copy', 'true'))  
[*]        self.object_ring = object_ring or Ring(swift_dir, ring_name='object')  
[*]        self.container_ring = container_ring or Ring(swift_dir,  
[*]                                                     ring_name='container')  
[*]        self.account_ring = account_ring or Ring(swift_dir,  
[*]                                                 ring_name='account')  
[*]        self.memcache = memcache  
[*]        mimetypes.init(mimetypes.knownfiles +  
[*]                       'mime.types')])  
[*]        self.account_autocreate = \  
[*]            config_true_value(conf.get('account_autocreate', 'no'))  
[*]        self.expiring_objects_account = \  
[*]            (conf.get('auto_create_account_prefix') or '.') + \  
[*]            (conf.get('expiring_objects_account_name') or 'expiring_objects')  
[*]        self.expiring_objects_container_divisor = \  
[*]            int(conf.get('expiring_objects_container_divisor') or 86400)  
[*]        self.max_containers_per_account = \  
[*]            int(conf.get('max_containers_per_account') or 0)  
[*]        self.max_containers_whitelist = [  
[*]            a.strip()  
[*]            for a in conf.get('max_containers_whitelist', '').split(',')  
[*]            if a.strip()]  
[*]        self.deny_host_headers = [  
[*]            host.strip() for host in  
[*]            conf.get('deny_host_headers', '').split(',') if host.strip()]  
[*]        self.rate_limit_after_segment = \  
[*]            int(conf.get('rate_limit_after_segment', 10))  
[*]        self.rate_limit_segments_per_sec = \  
[*]            int(conf.get('rate_limit_segments_per_sec', 1))  
[*]        self.log_handoffs = config_true_value(conf.get('log_handoffs', 'true'))  
[*]        self.cors_allow_origin = [  
[*]            a.strip()  
[*]            for a in conf.get('cors_allow_origin', '').split(',')  
[*]            if a.strip()]  
[*]        self.strict_cors_mode = config_true_value(  
[*]            conf.get('strict_cors_mode', 't'))  
[*]        self.node_timings = {}  
[*]        self.timing_expiry = int(conf.get('timing_expiry', 300))  
[*]        self.sorting_method = conf.get('sorting_method', 'shuffle').lower()  
[*]        self.max_large_object_get_time = float(  
[*]            conf.get('max_large_object_get_time', '86400'))  
[*]        value = conf.get('request_node_count', '2 * replicas').lower().split()  
[*]        if len(value) == 1:  
[*]            value = int(value[0])  
[*]            self.request_node_count = lambda replicas: value  
[*]        elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':  
[*]            value = int(value[0])  
[*]            self.request_node_count = lambda replicas: value * replicas  
[*]        else:  
[*]            raise ValueError(  
[*]                'Invalid request_node_count value: %r' % ''.join(value))  
[*]        try:  
[*]            self._read_affinity = read_affinity = conf.get('read_affinity', '')  
[*]            self.read_affinity_sort_key = affinity_key_function(read_affinity)  
[*]        except ValueError as err:  
[*]            # make the message a little more useful  
[*]            raise ValueError("Invalid read_affinity value: %r (%s)" %  
[*]                             (read_affinity, err.message))  
[*]        try:  
[*]            write_affinity = conf.get('write_affinity', '')  
[*]            self.write_affinity_is_local_fn \  
[*]                = affinity_locality_predicate(write_affinity)  
[*]        except ValueError as err:  
[*]            # make the message a little more useful  
[*]            raise ValueError("Invalid write_affinity value: %r (%s)" %  
[*]                             (write_affinity, err.message))  
[*]        value = conf.get('write_affinity_node_count',  
[*]                         '2 * replicas').lower().split()  
[*]        if len(value) == 1:  
[*]            value = int(value[0])  
[*]            self.write_affinity_node_count = lambda replicas: value  
[*]        elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':  
[*]            value = int(value[0])  
[*]            self.write_affinity_node_count = lambda replicas: value * replicas  
[*]        else:  
[*]            raise ValueError(  
[*]                'Invalid write_affinity_node_count value: %r' % ''.join(value))  
[*]        # swift_owner_headers are stripped by the account and container  
[*]        # controllers; we should extend header stripping to object controller  
[*]        # when a privileged object header is implemented.  
[*]        swift_owner_headers = conf.get(  
[*]            'swift_owner_headers',  
[*]            'x-container-read, x-container-write, '  
[*]            'x-container-sync-key, x-container-sync-to, '  
[*]            'x-account-meta-temp-url-key, x-account-meta-temp-url-key-2, '  
[*]            'x-account-access-control')  
[*]        self.swift_owner_headers = [  
[*]            name.strip().title()  
[*]            for name in swift_owner_headers.split(',') if name.strip()]  
[*]        # Initialization was successful, so now apply the client chunk size  
[*]        # parameter as the default read / write buffer size for the network  
[*]        # sockets.  
[*]        #  
[*]        # NOTE WELL: This is a class setting, so until we get set this on a  
[*]        # per-connection basis, this affects reading and writing on ALL  
[*]        # sockets, those between the proxy servers and external clients, and  
[*]        # those between the proxy servers and the other internal servers.  
[*]        #  
[*]        # ** Because it affects the client as well, currently, we use the  
[*]        # client chunk size as the govenor and not the object chunk size.  
[*]        socket._fileobject.default_bufsize = self.client_chunk_size  
[*]        self.expose_info = config_true_value(  
[*]            conf.get('expose_info', 'yes'))  
[*]        self.disallowed_sections = list_from_csv(  
[*]            conf.get('disallowed_sections'))  
[*]        self.admin_key = conf.get('admin_key', None)  
[*]        register_swift_info(  
[*]            version=swift_version,  
[*]            strict_cors_mode=self.strict_cors_mode,  
[*]            **constraints.EFFECTIVE_CONSTRAINTS)  
[*]  
[*]    def check_config(self):  
[*]        """ 
[*]        Check the configuration for possible errors 
[*]        """  
[*]        if self._read_affinity and self.sorting_method != 'affinity':  
[*]            self.logger.warn("sorting_method is set to '%s', not 'affinity'; "  
[*]                             "read_affinity setting will have no effect." %  
[*]                             self.sorting_method)  
[*]  
[*]    def get_controller(self, path):  
[*]        """ 
[*]        Get the controller to handle a request. 
[*] 
[*]        :param path: path from request 
[*]        :returns: tuple of (controller class, path dictionary) 
[*] 
[*]        :raises: ValueError (thrown by split_path) if given invalid path 
[*]        """  
[*]        if path == '/info':  #url是/info 则返回InfoController和包括version,expose_info,disallowed_sections,admin_key的字典  
[*]            d = dict(version=None,  
[*]                     expose_info=self.expose_info,  
[*]                     disallowed_sections=self.disallowed_sections,  
[*]                     admin_key=self.admin_key)  
[*]            return InfoController, d  
[*]  
[*]        version, account, container, obj = split_path(path, 1, 4, True)  #以/拆分url为数列,并取对应的1到4位的数据返回给对应的变量  
[*]        d = dict(version=version,  
[*]                 account_name=account,  
[*]                 container_name=container,  
[*]                 object_name=obj)  
[*]        if obj and container and account: #根据解析出的account值,congtainer和object值得有无,确定适用的Controller是那种  
[*]            return ObjectController, d  
[*]        elif container and account:  
[*]            return ContainerController, d  
[*]        elif account and not container and not obj:  
[*]            return AccountController, d  
[*]        return None, d  
[*]  
[*]    def __call__(self, env, start_response):  
[*]        """ 
[*]        WSGI entry point. 
[*]        Wraps env in swob.Request object and passes it down. 
[*] 
[*]        :param env: WSGI environment dictionary 
[*]        :param start_response: WSGI callable 
[*]        """  
[*]        try:  
[*]            if self.memcache is None: #首先判断是否memcache值存在,不存在再去获取一次  
[*]                self.memcache = cache_from_env(env)  
[*]            req = self.update_request(Request(env)) #判断header中是否有x-storage-token和x-auth-token  
[*]            return self.handle_request(req)(env, start_response) #调用handle_request方法并返回处理的结果resp对象  
[*]        except UnicodeError:  
[*]            err = HTTPPreconditionFailed(  
[*]                request=req, body='Invalid UTF8 or contains NULL')  
[*]            return err(env, start_response)  
[*]        except (Exception, Timeout):  
[*]            start_response('500 Server Error',  
[*]                           [('Content-Type', 'text/plain')])  
[*]            return ['Internal server error.\n']  
[*]  
[*]    def update_request(self, req):  
[*]        if 'x-storage-token' in req.headers and \  
[*]                'x-auth-token' not in req.headers:  
[*]            req.headers['x-auth-token'] = req.headers['x-storage-token']  
[*]        return req  
[*]  
[*]    def handle_request(self, req):  
[*]        """ 
[*]        Entry point for proxy server. 
[*]        Should return a WSGI-style callable (such as swob.Response). 
[*] 
[*]        :param req: swob.Request object 
[*]        """  
[*]        try:  
[*]            self.logger.set_statsd_prefix('proxy-server') #在日志的开头加上‘proxy-server’,方便跟踪分析  
[*]            if req.content_length and req.content_length < 0: #检查header里面中的Content-Length是否有值,无值返回错误请求,并日志记录  
[*]                self.logger.increment('errors')  
[*]                return HTTPBadRequest(request=req,  
[*]                                      body='Invalid Content-Length')  
[*]  
[*]            try:  
[*]                if not check_utf8(req.path_info): #检查Pathde的编码是否不满足utf8,不满足返回错误请求,并日志记录  
[*]                    self.logger.increment('errors')  
[*]                    return HTTPPreconditionFailed(  
[*]                        request=req, body='Invalid UTF8 or contains NULL')  
[*]            except UnicodeError:  
[*]                self.logger.increment('errors')  
[*]                return HTTPPreconditionFailed(  
[*]                    request=req, body='Invalid UTF8 or contains NULL')  
[*]  
[*]            try:  
[*]                controller, path_parts = self.get_controller(req.path) #调用get_controller(self,path)方法返回正确的controller类和字典对象  
[*]                p = req.path_info  
[*]                if isinstance(p, unicode):  
[*]                    p = p.encode('utf-8') #path编码Unicode转换utf-8  
[*]            except ValueError:           #发生值异常,返回错误请求,并日志记录  
[*]                self.logger.increment('errors')  
[*]                return HTTPNotFound(request=req)  
[*]            if not controller:         #为找到对应处理的controller类时,返回错误请求,并日志记录  
[*]                self.logger.increment('errors')  
[*]                return HTTPPreconditionFailed(request=req, body='Bad URL')  
[*]            if self.deny_host_headers and \    #当proxy-server.conf中deny_host_headers有值,且请求的header中的host在deny_host_headers中,则返回错误请求,并日志记录  
[*]                    req.host.split(':')[0] in self.deny_host_headers:  
[*]                return HTTPForbidden(request=req, body='Invalid host header')  
[*]  
[*]            self.logger.set_statsd_prefix('proxy-server.' +  
[*]                                          controller.server_type.lower()) #在日志的开头加上‘proxy-server.controller类中的请求类型(eg:HEAD/GET/PUT)’,方便跟踪分析  
[*]            controller = controller(self, **path_parts)  #初始化实际的controller对象(AccountController、ContainerController、ObjectController、InfoController其中之一)  
[*]            if 'swift.trans_id' not in req.environ:    #如果没有trans_id在env中,则重新生成一个,有些类似于http请求中的seesionID的感觉,是一种UUID  
[*]                # if this wasn't set by an earlier middleware, set it now  
[*]                trans_id = generate_trans_id(self.trans_id_suffix)  
[*]                req.environ['swift.trans_id'] = trans_id  
[*]                self.logger.txn_id = trans_id  
[*]            req.headers['x-trans-id'] = req.environ['swift.trans_id']  
[*]            controller.trans_id = req.environ['swift.trans_id']  
[*]            self.logger.client_ip = get_remote_client(req)  #把请求中获取出请求端的IP信息,加入logger对象,方便后续日志查看分析  
[*]            try:  
[*]                handler = getattr(controller, req.method) #根据req.method方法获取对应controller对象中的方法(可能是多个,有的有public标签,有的没有)  
[*]                getattr(handler, 'publicly_accessible') #再根据public标签获取最终的处理方法。(在方法前面可以加 @public 和@delay_denial)  
[*]            except AttributeError:  
[*]                allowed_methods = getattr(controller, 'allowed_methods', set())  
[*]                return HTTPMethodNotAllowed(  
[*]                    request=req, headers={'Allow': ', '.join(allowed_methods)})  
[*]            if 'swift.authorize' in req.environ: #做鉴权操作  
[*]                # We call authorize before the handler, always. If authorized,  
[*]                # we remove the swift.authorize hook so isn't ever called  
[*]                # again. If not authorized, we return the denial unless the  
[*]                # controller's method indicates it'd like to gather more  
[*]                # information and try again later.  
[*]                resp = req.environ['swift.authorize'](req)  
[*]                if not resp:  
[*]                    # No resp means authorized, no delayed recheck required.  
[*]                    del req.environ['swift.authorize']  
[*]                else:  
[*]                    # Response indicates denial, but we might delay the denial  
[*]                    # and recheck later. If not delayed, return the error now.  
[*]                    if not getattr(handler, 'delay_denial', None):  
[*]                        return resp  
[*]            # Save off original request method (GET, POST, etc.) in case it  
[*]            # gets mutated during handling.  This way logging can display the  
[*]            # method the client actually sent.  
[*]            req.environ['swift.orig_req_method'] = req.method  
[*]            return handler(req)    #调用最终的method方法,并返回resp结果  
[*]        except HTTPException as error_response:  
[*]            return error_response  
[*]        except (Exception, Timeout):  
[*]            self.logger.exception(_('ERROR Unhandled exception in request'))  
[*]            return HTTPServerError(request=req)  
[*]  
[*]    def sort_nodes(self, nodes):  
[*]        ''''' 
[*]        Sorts nodes in-place (and returns the sorted list) according to 
[*]        the configured strategy. The default "sorting" is to randomly 
[*]        shuffle the nodes. If the "timing" strategy is chosen, the nodes 
[*]        are sorted according to the stored timing data. 
[*]        '''  
[*]        # In the case of timing sorting, shuffling ensures that close timings  
[*]        # (ie within the rounding resolution) won't prefer one over another.  
[*]        # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)  
[*]        shuffle(nodes)  
[*]        if self.sorting_method == 'timing':  #配置文件中排序方法为timing时,以时间排序  
[*]            now = time()  
[*]  
[*]            def key_func(node):  
[*]                timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))  
[*]                return timing if expires > now else -1.0  
[*]            nodes.sort(key=key_func)  
[*]        elif self.sorting_method == 'affinity': #配置文件中排序方法为affinity时,以自定义的亲和力规则排序  
[*]            nodes.sort(key=self.read_affinity_sort_key)  
[*]        return nodes  
[*]  
[*]    def set_node_timing(self, node, timing):  
[*]        if self.sorting_method != 'timing':  
[*]            return  
[*]        now = time()  
[*]        timing = round(timing, 3)  # sort timings to the millisecond  
[*]        self.node_timings'ip']] = (timing, now + self.timing_expiry)  
[*]  
[*]    def error_limited(self, node):  
[*]        """ 
[*]        Check if the node is currently error limited. 
[*] 
[*]        :param node: dictionary of node to check 
[*]        :returns: True if error limited, False otherwise 
[*]        """  
[*]        now = time()  
[*]        if 'errors' not in node:  #errors没在node中时返回false  
[*]            return False  
[*]        if 'last_error' in node and node['last_error'] < \   #last_error在node中有并且 last_error小于现在时间减去系统允许的时间间隔  
[*]                now - self.error_suppression_interval:  
[*]            del node['last_error'] #node去掉last_error  
[*]            if 'errors' in node: #errors在node中时返回 去掉errors,且返回false  
[*]                del node['errors']  
[*]            return False  
[*]        limited = node['errors'] > self.error_suppression_limit  #errors在node中中的个数多与系统允许的个数,是返回true,且做日志记录  
[*]        if limited:  
[*]            self.logger.debug(  
[*]                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)  
[*]        return limited  
[*]  
[*]    def error_limit(self, node, msg):  
[*]        """ 
[*]        Mark a node as error limited. This immediately pretends the 
[*]        node received enough errors to trigger error suppression. Use 
[*]        this for errors like Insufficient Storage. For other errors 
[*]        use :func:`error_occurred`. 
[*] 
[*]        :param node: dictionary of node to error limit 
[*]        :param msg: error message 
[*]        """  
[*]        node['errors'] = self.error_suppression_limit + 1  
[*]        node['last_error'] = time()  
[*]        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),  
[*]                          {'msg': msg, 'ip': node['ip'],  
[*]                          'port': node['port'], 'device': node['device']})  
[*]  
[*]    def error_occurred(self, node, msg):  
[*]        """ 
[*]        Handle logging, and handling of errors. 
[*] 
[*]        :param node: dictionary of node to handle errors for 
[*]        :param msg: error message 
[*]        """  
[*]        node['errors'] = node.get('errors', 0) + 1  
[*]        node['last_error'] = time()  
[*]        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),  
[*]                          {'msg': msg, 'ip': node['ip'],  
[*]                          'port': node['port'], 'device': node['device']})  
[*]  
[*]    def iter_nodes(self, ring, partition, node_iter=None):  
[*]        """ 
[*]        Yields nodes for a ring partition, skipping over error 
[*]        limited nodes and stopping at the configurable number of 
[*]        nodes. If a node yielded subsequently gets error limited, an 
[*]        extra node will be yielded to take its place. 
[*] 
[*]        Note that if you're going to iterate over this concurrently from 
[*]        multiple greenthreads, you'll want to use a 
[*]        swift.common.utils.GreenthreadSafeIterator to serialize access. 
[*]        Otherwise, you may get ValueErrors from concurrent access. (You also 
[*]        may not, depending on how logging is configured, the vagaries of 
[*]        socket IO and eventlet, and the phase of the moon.) 
[*] 
[*]        :param ring: ring to get yield nodes from 
[*]        :param partition: ring partition to yield nodes for 
[*]        :param node_iter: optional iterable of nodes to try. Useful if you 
[*]            want to filter or reorder the nodes. 
[*]        """  
[*]        part_nodes = ring.get_part_nodes(partition)  
[*]        if node_iter is None:  
[*]            node_iter = itertools.chain(part_nodes,  
[*]                                        ring.get_more_nodes(partition))  
[*]        num_primary_nodes = len(part_nodes)  
[*]  
[*]        # Use of list() here forcibly yanks the first N nodes (the primary  
[*]        # nodes) from node_iter, so the rest of its values are handoffs.  
[*]        primary_nodes = self.sort_nodes(  
[*]            list(itertools.islice(node_iter, num_primary_nodes)))  
[*]        handoff_nodes = node_iter  
[*]        nodes_left = self.request_node_count(len(primary_nodes))  
[*]  
[*]        for node in primary_nodes:  
[*]            if not self.error_limited(node):  
[*]                yield node  
[*]                if not self.error_limited(node):  
[*]                    nodes_left -= 1  
[*]                    if nodes_left <= 0:  
[*]                        return  
[*]        handoffs = 0  
[*]        for node in handoff_nodes:  
[*]            if not self.error_limited(node):  
[*]                handoffs += 1  
[*]                if self.log_handoffs:  
[*]                    self.logger.increment('handoff_count')  
[*]                    self.logger.warning(  
[*]                        'Handoff requested (%d)' % handoffs)  
[*]                    if handoffs == len(primary_nodes):  
[*]                        self.logger.increment('handoff_all_count')  
[*]                yield node  
[*]                if not self.error_limited(node):  
[*]                    nodes_left -= 1  
[*]                    if nodes_left <= 0:  
[*]                        return  
[*]  
[*]    def exception_occurred(self, node, typ, additional_info):  
[*]        """ 
[*]        Handle logging of generic exceptions. 
[*] 
[*]        :param node: dictionary of node to log the error for 
[*]        :param typ: server type 
[*]        :param additional_info: additional information to log 
[*]        """  
[*]        self.logger.exception(  
[*]            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '  
[*]              '%(info)s'),  
[*]            {'type': typ, 'ip': node['ip'], 'port': node['port'],  
[*]             'device': node['device'], 'info': additional_info})  
[*]  
[*]    def modify_wsgi_pipeline(self, pipe):  
[*]        """ 
[*]        Called during WSGI pipeline creation. Modifies the WSGI pipeline 
[*]        context to ensure that mandatory middleware is present in the pipeline. 
[*] 
[*]        :param pipe: A PipelineWrapper object 
[*]        """  
[*]        pipeline_was_modified = False  
[*]        for filter_spec in reversed(required_filters):  #当required_filters字典中定义了需要重新排序的app时,进行pipeline的重新排序处理  
[*]            filter_name = filter_spec['name']  
[*]            if filter_name not in pipe:  
[*]                afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)  
[*]                insert_at = 0  
[*]                for after in afters:  
[*]                    try:  
[*]                        insert_at = max(insert_at, pipe.index(after) + 1)  
[*]                    except ValueError:  # not in pipeline; ignore it  
[*]                        pass  
[*]                self.logger.info(  
[*]                    'Adding required filter %s to pipeline at position %d' %  
[*]                    (filter_name, insert_at))  
[*]                ctx = pipe.create_filter(filter_name)  
[*]                pipe.insert_filter(ctx, index=insert_at)  
[*]                pipeline_was_modified = True  
[*]  
[*]        if pipeline_was_modified:    
[*]            self.logger.info("Pipeline was modified. New pipeline is \"%s\".",  
[*]                             pipe)  
[*]        else:  
[*]            self.logger.debug("Pipeline is \"%s\"", pipe)  
[*]  
[*]  
[*]def app_factory(global_conf, **local_conf):  
[*]    """paste.deploy app factory for creating WSGI proxy apps."""  
[*]    conf = global_conf.copy()  
[*]    conf.update(local_conf)  
[*]    app = Application(conf)  
[*]    app.check_config()  
[*]    return app  
页: [1]
查看完整版本: OpenStack Swift源码初探--proxy下的server.py