设为首页 收藏本站
查看: 886|回复: 0

[经验分享] OpenStack Swift源码初探--proxy下的server.py

[复制链接]

尚未签到

发表于 2016-1-9 11:56:00 | 显示全部楼层 |阅读模式
  OpenStack Swift作为开源的云存储工具,被越来越多的公司使用。为了记录和巩固学习swift的开源源码,所以进行一系列的源码开源学习笔记,供初学者快速学习和理解swift的内部功能。
  proxy下面的server.py模块是所有对account,container,object等对象进行管理操作的在swift的proxy端的总入口。在swift系统在接收到url请求后,先是经过middleware处理链分别验证处理后,再进入到proxy下面的server模块,进入 _call_方法调用后,把对应的请求分发给不同的controller处理,controller再调用各自的nodeStroage服务程序进行处理,返回各自的resp结果到server模块,最后通过mimmdleware处理链再反方向返回最终的请求处理结果
  
   1.首先server.py的整个结构如下图:包括4部分:一堆import引用,一个required_filters的字典,一个名为“Application(object)”的class ,一个app_factory(global_conf, **local_conf) 方法
DSC0000.jpg
   2.主要介绍“Application(object)”的class,其中包含了所有主要的功能方法
DSC0001.jpg
  
  2.1 _init_ 方法,Application类的初始化方法,主要就是初始化一些对象,包括:conf配置文件参数 的初始化,log日志初始化,memcache对象初始化,account_ring,container_ring, object_ring对象初始化等
  
  2.2  check_config(self) 方法,主要是检查配置文件proxy-server.conf中配置的“read_affinity” 和“sorting_method”属性值是否正确,该方法在 app_factory(global_conf, **local_conf):方法时调用  
  
  2.3 get_controller(self, path)方法,主要是根据传入的 urlPath解析并返回对应的control类和一个字典对象,其中字典对象的值根据传入url格式的不同返回不同的值
  
[python] view plaincopy



  • def get_controller(self, path):  
  •         """ 
  •         Get the controller to handle a request. 
  •  
  •         :param path: path from request 
  •         :returns: tuple of (controller class, path dictionary) 
  •  
  •         :raises: ValueError (thrown by split_path) if given invalid path 
  •         """  
  •         if path == '/info':  #url是/info 则返回InfoController和包括version,expose_info,disallowed_sections,admin_key的字典  
  •             d = dict(version=None,  
  •                      expose_info=self.expose_info,  
  •                      disallowed_sections=self.disallowed_sections,  
  •                      admin_key=self.admin_key)  
  •             return InfoController, d  
  •   
  •         version, account, container, obj = split_path(path, 14True)  #以/拆分url为数列,并取对应的1到4位的数据返回给对应的变量  
  •         d = dict(version=version,  
  •                  account_name=account,  
  •                  container_name=container,  
  •                  object_name=obj)  
  •         if obj and container and account: #根据解析出的account值,congtainer和object值得有无,确定适用的Controller是那种  
  •             return ObjectController, d  
  •         elif container and account:  
  •             return ContainerController, d  
  •         elif account and not container and not obj:  
  •             return AccountController, d  
  •         return None, d  

  
 
  
  2.4  __call__(self, env, start_response)方法,是server模块的实际对account、container、object等对象调用处理的功能入口。
  
[python] view plaincopy



  • def __call__(self, env, start_response):  
  •         """ 
  •         WSGI entry point. 
  •         Wraps env in swob.Request object and passes it down. 
  •  
  •         :param env: WSGI environment dictionary 
  •         :param start_response: WSGI callable 
  •         """  
  •         try:  
  •             if self.memcache is None#首先判断是否memcache值存在,不存在再去获取一次  
  •                 self.memcache = cache_from_env(env)  
  •             req = self.update_request(Request(env)) #判断header中是否有x-storage-token和x-auth-token  
  •             return self.handle_request(req)(env, start_response) #调用handle_request方法并返回处理的结果resp对象  
  •         except UnicodeError:  
  •             err = HTTPPreconditionFailed(  
  •                 request=req, body='Invalid UTF8 or contains NULL')  
  •             return err(env, start_response)  
  •         except (Exception, Timeout):  
  •             start_response('500 Server Error',  
  •                            [('Content-Type''text/plain')])  
  •             return ['Internal server error.\n']  

  
2.5 update_request(self, req)方法,根据请求中header里面的x-storage-token有而x-auth-token没有的情况,把x-storage-token的值赋予x-auth-token
  
  
2.6 handle_request(self, req)方法,server模块实际处理request请求的方法,熟悉servlet的同学可以把它理解成servlet的作用
  
[python] view plaincopy



  • def handle_request(self, req):  
  •         """ 
  •         Entry point for proxy server. 
  •         Should return a WSGI-style callable (such as swob.Response). 
  •  
  •         :param req: swob.Request object 
  •         """  
  •         try:  
  •             self.logger.set_statsd_prefix('proxy-server'#在日志的开头加上‘proxy-server’,方便跟踪分析  
  •             if req.content_length and req.content_length < 0#检查header里面中的Content-Length是否有值,无值返回错误请求,并日志记录  
  •                 self.logger.increment('errors')  
  •                 return HTTPBadRequest(request=req,  
  •                                       body='Invalid Content-Length')  
  •   
  •             try:  
  •                 if not check_utf8(req.path_info): #检查Pathde的编码是否不满足utf8,不满足返回错误请求,并日志记录  
  •                     self.logger.increment('errors')  
  •                     return HTTPPreconditionFailed(  
  •                         request=req, body='Invalid UTF8 or contains NULL')  
  •             except UnicodeError:  
  •                 self.logger.increment('errors')  
  •                 return HTTPPreconditionFailed(  
  •                     request=req, body='Invalid UTF8 or contains NULL')  
  •   
  •             try:  
  •                 controller, path_parts = self.get_controller(req.path) #调用get_controller(self,path)方法返回正确的controller类和字典对象  
  •                 p = req.path_info  
  •                 if isinstance(p, unicode):  
  •                     p = p.encode('utf-8'#path编码Unicode转换utf-8  
  •             except ValueError:           #发生值异常,返回错误请求,并日志记录  
  •                 self.logger.increment('errors')  
  •                 return HTTPNotFound(request=req)  
  •             if not controller:         #为找到对应处理的controller类时,返回错误请求,并日志记录  
  •                 self.logger.increment('errors')  
  •                 return HTTPPreconditionFailed(request=req, body='Bad URL')  
  •             if self.deny_host_headers and \    #当proxy-server.conf中deny_host_headers有值,且请求的header中的host在deny_host_headers中,则返回错误请求,并日志记录  
  •                     req.host.split(':')[0in self.deny_host_headers:  
  •                 return HTTPForbidden(request=req, body='Invalid host header')  
  •   
  •             self.logger.set_statsd_prefix('proxy-server.' +  
  •                                           controller.server_type.lower()) #在日志的开头加上‘proxy-server.controller类中的请求类型(eg:HEAD/GET/PUT)’,方便跟踪分析  
  •             controller = controller(self, **path_parts)  #初始化实际的controller对象(AccountController、ContainerController、ObjectController、InfoController其中之一)  
  •             if 'swift.trans_id' not in req.environ:    #如果没有trans_id在env中,则重新生成一个,有些类似于http请求中的seesionID的感觉,是一种UUID  
  •                 # if this wasn't set by an earlier middleware, set it now  
  •                 trans_id = generate_trans_id(self.trans_id_suffix)  
  •                 req.environ['swift.trans_id'] = trans_id  
  •                 self.logger.txn_id = trans_id  
  •             req.headers['x-trans-id'] = req.environ['swift.trans_id']  
  •             controller.trans_id = req.environ['swift.trans_id']  
  •             self.logger.client_ip = get_remote_client(req)  #把请求中获取出请求端的IP信息,加入logger对象,方便后续日志查看分析  
  •             try:  
  •                 handler = getattr(controller, req.method) #根据req.method方法获取对应controller对象中的方法(可能是多个,有的有public标签,有的没有)  
  •                 getattr(handler, 'publicly_accessible'#再根据public标签获取最终的处理方法。(在方法前面可以加 @public 和@delay_denial)  
  •             except AttributeError:  
  •                 allowed_methods = getattr(controller, 'allowed_methods', set())  
  •                 return HTTPMethodNotAllowed(  
  •                     request=req, headers={'Allow'', '.join(allowed_methods)})  
  •             if 'swift.authorize' in req.environ: #做鉴权操作  
  •                 # We call authorize before the handler, always. If authorized,  
  •                 # we remove the swift.authorize hook so isn't ever called  
  •                 # again. If not authorized, we return the denial unless the  
  •                 # controller's method indicates it'd like to gather more  
  •                 # information and try again later.  
  •                 resp = req.environ['swift.authorize'](req)  
  •                 if not resp:  
  •                     # No resp means authorized, no delayed recheck required.  
  •                     del req.environ['swift.authorize']  
  •                 else:  
  •                     # Response indicates denial, but we might delay the denial  
  •                     # and recheck later. If not delayed, return the error now.  
  •                     if not getattr(handler, 'delay_denial'None):  
  •                         return resp  
  •             # Save off original request method (GET, POST, etc.) in case it  
  •             # gets mutated during handling.  This way logging can display the  
  •             # method the client actually sent.  
  •             req.environ['swift.orig_req_method'] = req.method  
  •             return handler(req)    #调用最终的method方法,并返回resp结果  
  •         except HTTPException as error_response:  
  •             return error_response  
  •         except (Exception, Timeout):  
  •             self.logger.exception(_('ERROR Unhandled exception in request'))  
  •             return HTTPServerError(request=req)  

  
  2.7 sort_nodes(self, nodes)方法,对nodes对象进行排序处理,该方法在iter_nodes(self, ring, partition, node_iter=None)中调用
  
[python] view plaincopy



  • def sort_nodes(self, nodes):  
  •         ''''' 
  •         Sorts nodes in-place (and returns the sorted list) according to 
  •         the configured strategy. The default "sorting" is to randomly 
  •         shuffle the nodes. If the "timing" strategy is chosen, the nodes 
  •         are sorted according to the stored timing data. 
  •         '''  
  •         # In the case of timing sorting, shuffling ensures that close timings  
  •         # (ie within the rounding resolution) won't prefer one over another.  
  •         # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)  
  •         shuffle(nodes)  
  •         if self.sorting_method == 'timing':  #配置文件中排序方法为timing时,以时间排序  
  •             now = time()  
  •   
  •             def key_func(node):  
  •                 timing, expires = self.node_timings.get(node['ip'], (-1.00))  
  •                 return timing if expires > now else -1.0  
  •             nodes.sort(key=key_func)  
  •         elif self.sorting_method == 'affinity'#配置文件中排序方法为affinity时,以自定义的亲和力规则排序  
  •             nodes.sort(key=self.read_affinity_sort_key)  
  •         return nodes  

  
  2.8 set_node_timing(self, node, timing)方法,提供给外部程序调用
  
  2.9    error_limited(self, node)方法,该方法在iter_nodes(self, ring, partition, node_iter=None)中调用
  
[python] view plaincopy



  • def error_limited(self, node):  
  •         """ 
  •         Check if the node is currently error limited. 
  •  
  •         :param node: dictionary of node to check 
  •         :returns: True if error limited, False otherwise 
  •         """  
  •         now = time()  
  •         if 'errors' not in node:  #errors没在node中时返回false  
  •             return False  
  •         if 'last_error' in node and node['last_error'] < \   #last_error在node中有并且 last_error小于现在时间减去系统允许的时间间隔  
  •                 now - self.error_suppression_interval:  
  •             del node['last_error'#node去掉last_error  
  •             if 'errors' in node: #errors在node中时返回 去掉errors,且返回false  
  •                 del node['errors']  
  •             return False  
  •         limited = node['errors'] > self.error_suppression_limit  #errors在node中中的个数多与系统允许的个数,是返回true,且做日志记录  
  •         if limited:  
  •             self.logger.debug(  
  •                 _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)  
  •         return limited  

  
  
  2.10  error_limit(self, node, msg)方法,提供给外部程序调用,用于给node直接增加errors到比系统允许的次数+1,并记录last_error时间,和做日志记录
  
  2.11  error_limit(self, node, msg)方法,提供给外部程序调用,用于给node增加errors次数,并记录last_error时间,和做日志记录
  
  2.12  iter_nodes(self, ring, partition, node_iter=None)方法,提供给外部程序调用,用于对nodes做排序后生成的nodes迭代器
  
  2.13  exception_occurred(self, node, typ, additional_info)方法,提供给外部程序调用,用于当node发生异常了,进行日志记录
  2.14  modify_wsgi_pipeline(self, pipe)方法,提供给外部程序调用,用于系统启动时,初始化pipeline,并做日志记录
  
[python] view plaincopy



  • def modify_wsgi_pipeline(self, pipe):  
  •         """ 
  •         Called during WSGI pipeline creation. Modifies the WSGI pipeline 
  •         context to ensure that mandatory middleware is present in the pipeline. 
  •  
  •         :param pipe: A PipelineWrapper object 
  •         """  
  •         pipeline_was_modified = False  
  •         for filter_spec in reversed(required_filters):  #当required_filters字典中定义了需要重新排序的app时,进行pipeline的重新排序处理  
  •             filter_name = filter_spec['name']  
  •             if filter_name not in pipe:  
  •                 afters = filter_spec.get('after_fn'lambda _junk: [])(pipe)  
  •                 insert_at = 0  
  •                 for after in afters:  
  •                     try:  
  •                         insert_at = max(insert_at, pipe.index(after) + 1)  
  •                     except ValueError:  # not in pipeline; ignore it  
  •                         pass  
  •                 self.logger.info(  
  •                     'Adding required filter %s to pipeline at position %d' %  
  •                     (filter_name, insert_at))  
  •                 ctx = pipe.create_filter(filter_name)  
  •                 pipe.insert_filter(ctx, index=insert_at)  
  •                 pipeline_was_modified = True  
  •   
  •         if pipeline_was_modified:    
  •             self.logger.info("Pipeline was modified. New pipeline is \"%s\".",  
  •                              pipe)  
  •         else:  
  •             self.logger.debug("Pipeline is \"%s\"", pipe)  

  
 
  以下源码为2014、3、12的最新的Proxy的server.py源码,只加了部分代码注释:
  
[python] view plaincopy



  • # Copyright (c) 2010-2012 OpenStack Foundation  
  • #  
  • # Licensed under the Apache License, Version 2.0 (the "License");  
  • # you may not use this file except in compliance with the License.  
  • # You may obtain a copy of the License at  
  • #  
  • #    http://www.apache.org/licenses/LICENSE-2.0  
  • #  
  • # Unless required by applicable law or agreed to in writing, software  
  • # distributed under the License is distributed on an "AS IS" BASIS,  
  • # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or  
  • # implied.  
  • # See the License for the specific language governing permissions and  
  • # limitations under the License.  
  •   
  • import mimetypes  
  • import os  
  • import socket  
  • from swift import gettext_ as _  
  • from random import shuffle  
  • from time import time  
  • import itertools  
  •   
  • from eventlet import Timeout  
  •   
  • from swift import __canonical_version__ as swift_version  
  • from swift.common import constraints  
  • from swift.common.ring import Ring  
  • from swift.common.utils import cache_from_env, get_logger, \  
  •     get_remote_client, split_path, config_true_value, generate_trans_id, \  
  •     affinity_key_function, affinity_locality_predicate, list_from_csv, \  
  •     register_swift_info  
  • from swift.common.constraints import check_utf8  
  • from swift.proxy.controllers import AccountController, ObjectController, \  
  •     ContainerController, InfoController  
  • from swift.common.swob import HTTPBadRequest, HTTPForbidden, \  
  •     HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \  
  •     HTTPServerError, HTTPException, Request  
  •   
  •   
  • # List of entry points for mandatory middlewares.  
  • #  
  • # Fields:  
  • #  
  • # "name" (required) is the entry point name from setup.py.  
  • #  
  • # "after_fn" (optional) a function that takes a PipelineWrapper object as its  
  • # single argument and returns a list of middlewares that this middleware  
  • # should come after. Any middlewares in the returned list that are not present  
  • # in the pipeline will be ignored, so you can safely name optional middlewares  
  • # to come after. For example, ["catch_errors", "bulk"] would install this  
  • # middleware after catch_errors and bulk if both were present, but if bulk  
  • # were absent, would just install it after catch_errors.  
  •   
  • required_filters = [  
  •     {'name''catch_errors'},  
  •     {'name''gatekeeper',  
  •      'after_fn'lambda pipe: (['catch_errors']  
  •                                if pipe.startswith("catch_errors")  
  •                                else [])},  
  •     {'name''dlo''after_fn'lambda _junk: ['catch_errors''gatekeeper',  
  •                                                'proxy_logging']}]  
  •   
  •   
  • class Application(object):  
  •     """WSGI application for the proxy server."""  
  •   
  •     def __init__(self, conf, memcache=None, logger=None, account_ring=None,  
  •                  container_ring=None, object_ring=None):  
  •         if conf is None:  
  •             conf = {}  
  •         if logger is None:  
  •             self.logger = get_logger(conf, log_route='proxy-server')  
  •         else:  
  •             self.logger = logger  
  •   
  •         swift_dir = conf.get('swift_dir''/etc/swift')  
  •         self.node_timeout = int(conf.get('node_timeout'10))  
  •         self.recoverable_node_timeout = int(  
  •             conf.get('recoverable_node_timeout'self.node_timeout))  
  •         self.conn_timeout = float(conf.get('conn_timeout'0.5))  
  •         self.client_timeout = int(conf.get('client_timeout'60))  
  •         self.put_queue_depth = int(conf.get('put_queue_depth'10))  
  •         self.object_chunk_size = int(conf.get('object_chunk_size'65536))  
  •         self.client_chunk_size = int(conf.get('client_chunk_size'65536))  
  •         self.trans_id_suffix = conf.get('trans_id_suffix''')  
  •         self.post_quorum_timeout = float(conf.get('post_quorum_timeout'0.5))  
  •         self.error_suppression_interval = \  
  •             int(conf.get('error_suppression_interval'60))  
  •         self.error_suppression_limit = \  
  •             int(conf.get('error_suppression_limit'10))  
  •         self.recheck_container_existence = \  
  •             int(conf.get('recheck_container_existence'60))  
  •         self.recheck_account_existence = \  
  •             int(conf.get('recheck_account_existence'60))  
  •         self.allow_account_management = \  
  •             config_true_value(conf.get('allow_account_management''no'))  
  •         self.object_post_as_copy = \  
  •             config_true_value(conf.get('object_post_as_copy''true'))  
  •         self.object_ring = object_ring or Ring(swift_dir, ring_name='object')  
  •         self.container_ring = container_ring or Ring(swift_dir,  
  •                                                      ring_name='container')  
  •         self.account_ring = account_ring or Ring(swift_dir,  
  •                                                  ring_name='account')  
  •         self.memcache = memcache  
  •         mimetypes.init(mimetypes.knownfiles +  
  •                        [os.path.join(swift_dir, 'mime.types')])  
  •         self.account_autocreate = \  
  •             config_true_value(conf.get('account_autocreate''no'))  
  •         self.expiring_objects_account = \  
  •             (conf.get('auto_create_account_prefix'or '.') + \  
  •             (conf.get('expiring_objects_account_name'or 'expiring_objects')  
  •         self.expiring_objects_container_divisor = \  
  •             int(conf.get('expiring_objects_container_divisor'or 86400)  
  •         self.max_containers_per_account = \  
  •             int(conf.get('max_containers_per_account'or 0)  
  •         self.max_containers_whitelist = [  
  •             a.strip()  
  •             for a in conf.get('max_containers_whitelist''').split(',')  
  •             if a.strip()]  
  •         self.deny_host_headers = [  
  •             host.strip() for host in  
  •             conf.get('deny_host_headers''').split(','if host.strip()]  
  •         self.rate_limit_after_segment = \  
  •             int(conf.get('rate_limit_after_segment'10))  
  •         self.rate_limit_segments_per_sec = \  
  •             int(conf.get('rate_limit_segments_per_sec'1))  
  •         self.log_handoffs = config_true_value(conf.get('log_handoffs''true'))  
  •         self.cors_allow_origin = [  
  •             a.strip()  
  •             for a in conf.get('cors_allow_origin''').split(',')  
  •             if a.strip()]  
  •         self.strict_cors_mode = config_true_value(  
  •             conf.get('strict_cors_mode''t'))  
  •         self.node_timings = {}  
  •         self.timing_expiry = int(conf.get('timing_expiry'300))  
  •         self.sorting_method = conf.get('sorting_method''shuffle').lower()  
  •         self.max_large_object_get_time = float(  
  •             conf.get('max_large_object_get_time''86400'))  
  •         value = conf.get('request_node_count''2 * replicas').lower().split()  
  •         if len(value) == 1:  
  •             value = int(value[0])  
  •             self.request_node_count = lambda replicas: value  
  •         elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':  
  •             value = int(value[0])  
  •             self.request_node_count = lambda replicas: value * replicas  
  •         else:  
  •             raise ValueError(  
  •                 'Invalid request_node_count value: %r' % ''.join(value))  
  •         try:  
  •             self._read_affinity = read_affinity = conf.get('read_affinity''')  
  •             self.read_affinity_sort_key = affinity_key_function(read_affinity)  
  •         except ValueError as err:  
  •             # make the message a little more useful  
  •             raise ValueError("Invalid read_affinity value: %r (%s)" %  
  •                              (read_affinity, err.message))  
  •         try:  
  •             write_affinity = conf.get('write_affinity''')  
  •             self.write_affinity_is_local_fn \  
  •                 = affinity_locality_predicate(write_affinity)  
  •         except ValueError as err:  
  •             # make the message a little more useful  
  •             raise ValueError("Invalid write_affinity value: %r (%s)" %  
  •                              (write_affinity, err.message))  
  •         value = conf.get('write_affinity_node_count',  
  •                          '2 * replicas').lower().split()  
  •         if len(value) == 1:  
  •             value = int(value[0])  
  •             self.write_affinity_node_count = lambda replicas: value  
  •         elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':  
  •             value = int(value[0])  
  •             self.write_affinity_node_count = lambda replicas: value * replicas  
  •         else:  
  •             raise ValueError(  
  •                 'Invalid write_affinity_node_count value: %r' % ''.join(value))  
  •         # swift_owner_headers are stripped by the account and container  
  •         # controllers; we should extend header stripping to object controller  
  •         # when a privileged object header is implemented.  
  •         swift_owner_headers = conf.get(  
  •             'swift_owner_headers',  
  •             'x-container-read, x-container-write, '  
  •             'x-container-sync-key, x-container-sync-to, '  
  •             'x-account-meta-temp-url-key, x-account-meta-temp-url-key-2, '  
  •             'x-account-access-control')  
  •         self.swift_owner_headers = [  
  •             name.strip().title()  
  •             for name in swift_owner_headers.split(','if name.strip()]  
  •         # Initialization was successful, so now apply the client chunk size  
  •         # parameter as the default read / write buffer size for the network  
  •         # sockets.  
  •         #  
  •         # NOTE WELL: This is a class setting, so until we get set this on a  
  •         # per-connection basis, this affects reading and writing on ALL  
  •         # sockets, those between the proxy servers and external clients, and  
  •         # those between the proxy servers and the other internal servers.  
  •         #  
  •         # ** Because it affects the client as well, currently, we use the  
  •         # client chunk size as the govenor and not the object chunk size.  
  •         socket._fileobject.default_bufsize = self.client_chunk_size  
  •         self.expose_info = config_true_value(  
  •             conf.get('expose_info''yes'))  
  •         self.disallowed_sections = list_from_csv(  
  •             conf.get('disallowed_sections'))  
  •         self.admin_key = conf.get('admin_key'None)  
  •         register_swift_info(  
  •             version=swift_version,  
  •             strict_cors_mode=self.strict_cors_mode,  
  •             **constraints.EFFECTIVE_CONSTRAINTS)  
  •   
  •     def check_config(self):  
  •         """ 
  •         Check the configuration for possible errors 
  •         """  
  •         if self._read_affinity and self.sorting_method != 'affinity':  
  •             self.logger.warn("sorting_method is set to '%s', not 'affinity'; "  
  •                              "read_affinity setting will have no effect." %  
  •                              self.sorting_method)  
  •   
  •     def get_controller(self, path):  
  •         """ 
  •         Get the controller to handle a request. 
  •  
  •         :param path: path from request 
  •         :returns: tuple of (controller class, path dictionary) 
  •  
  •         :raises: ValueError (thrown by split_path) if given invalid path 
  •         """  
  •         if path == '/info':  #url是/info 则返回InfoController和包括version,expose_info,disallowed_sections,admin_key的字典  
  •             d = dict(version=None,  
  •                      expose_info=self.expose_info,  
  •                      disallowed_sections=self.disallowed_sections,  
  •                      admin_key=self.admin_key)  
  •             return InfoController, d  
  •   
  •         version, account, container, obj = split_path(path, 14True)  #以/拆分url为数列,并取对应的1到4位的数据返回给对应的变量  
  •         d = dict(version=version,  
  •                  account_name=account,  
  •                  container_name=container,  
  •                  object_name=obj)  
  •         if obj and container and account: #根据解析出的account值,congtainer和object值得有无,确定适用的Controller是那种  
  •             return ObjectController, d  
  •         elif container and account:  
  •             return ContainerController, d  
  •         elif account and not container and not obj:  
  •             return AccountController, d  
  •         return None, d  
  •   
  •     def __call__(self, env, start_response):  
  •         """ 
  •         WSGI entry point. 
  •         Wraps env in swob.Request object and passes it down. 
  •  
  •         :param env: WSGI environment dictionary 
  •         :param start_response: WSGI callable 
  •         """  
  •         try:  
  •             if self.memcache is None#首先判断是否memcache值存在,不存在再去获取一次  
  •                 self.memcache = cache_from_env(env)  
  •             req = self.update_request(Request(env)) #判断header中是否有x-storage-token和x-auth-token  
  •             return self.handle_request(req)(env, start_response) #调用handle_request方法并返回处理的结果resp对象  
  •         except UnicodeError:  
  •             err = HTTPPreconditionFailed(  
  •                 request=req, body='Invalid UTF8 or contains NULL')  
  •             return err(env, start_response)  
  •         except (Exception, Timeout):  
  •             start_response('500 Server Error',  
  •                            [('Content-Type''text/plain')])  
  •             return ['Internal server error.\n']  
  •   
  •     def update_request(self, req):  
  •         if 'x-storage-token' in req.headers and \  
  •                 'x-auth-token' not in req.headers:  
  •             req.headers['x-auth-token'] = req.headers['x-storage-token']  
  •         return req  
  •   
  •     def handle_request(self, req):  
  •         """ 
  •         Entry point for proxy server. 
  •         Should return a WSGI-style callable (such as swob.Response). 
  •  
  •         :param req: swob.Request object 
  •         """  
  •         try:  
  •             self.logger.set_statsd_prefix('proxy-server'#在日志的开头加上‘proxy-server’,方便跟踪分析  
  •             if req.content_length and req.content_length < 0#检查header里面中的Content-Length是否有值,无值返回错误请求,并日志记录  
  •                 self.logger.increment('errors')  
  •                 return HTTPBadRequest(request=req,  
  •                                       body='Invalid Content-Length')  
  •   
  •             try:  
  •                 if not check_utf8(req.path_info): #检查Pathde的编码是否不满足utf8,不满足返回错误请求,并日志记录  
  •                     self.logger.increment('errors')  
  •                     return HTTPPreconditionFailed(  
  •                         request=req, body='Invalid UTF8 or contains NULL')  
  •             except UnicodeError:  
  •                 self.logger.increment('errors')  
  •                 return HTTPPreconditionFailed(  
  •                     request=req, body='Invalid UTF8 or contains NULL')  
  •   
  •             try:  
  •                 controller, path_parts = self.get_controller(req.path) #调用get_controller(self,path)方法返回正确的controller类和字典对象  
  •                 p = req.path_info  
  •                 if isinstance(p, unicode):  
  •                     p = p.encode('utf-8'#path编码Unicode转换utf-8  
  •             except ValueError:           #发生值异常,返回错误请求,并日志记录  
  •                 self.logger.increment('errors')  
  •                 return HTTPNotFound(request=req)  
  •             if not controller:         #为找到对应处理的controller类时,返回错误请求,并日志记录  
  •                 self.logger.increment('errors')  
  •                 return HTTPPreconditionFailed(request=req, body='Bad URL')  
  •             if self.deny_host_headers and \    #当proxy-server.conf中deny_host_headers有值,且请求的header中的host在deny_host_headers中,则返回错误请求,并日志记录  
  •                     req.host.split(':')[0in self.deny_host_headers:  
  •                 return HTTPForbidden(request=req, body='Invalid host header')  
  •   
  •             self.logger.set_statsd_prefix('proxy-server.' +  
  •                                           controller.server_type.lower()) #在日志的开头加上‘proxy-server.controller类中的请求类型(eg:HEAD/GET/PUT)’,方便跟踪分析  
  •             controller = controller(self, **path_parts)  #初始化实际的controller对象(AccountController、ContainerController、ObjectController、InfoController其中之一)  
  •             if 'swift.trans_id' not in req.environ:    #如果没有trans_id在env中,则重新生成一个,有些类似于http请求中的seesionID的感觉,是一种UUID  
  •                 # if this wasn't set by an earlier middleware, set it now  
  •                 trans_id = generate_trans_id(self.trans_id_suffix)  
  •                 req.environ['swift.trans_id'] = trans_id  
  •                 self.logger.txn_id = trans_id  
  •             req.headers['x-trans-id'] = req.environ['swift.trans_id']  
  •             controller.trans_id = req.environ['swift.trans_id']  
  •             self.logger.client_ip = get_remote_client(req)  #把请求中获取出请求端的IP信息,加入logger对象,方便后续日志查看分析  
  •             try:  
  •                 handler = getattr(controller, req.method) #根据req.method方法获取对应controller对象中的方法(可能是多个,有的有public标签,有的没有)  
  •                 getattr(handler, 'publicly_accessible'#再根据public标签获取最终的处理方法。(在方法前面可以加 @public 和@delay_denial)  
  •             except AttributeError:  
  •                 allowed_methods = getattr(controller, 'allowed_methods', set())  
  •                 return HTTPMethodNotAllowed(  
  •                     request=req, headers={'Allow'', '.join(allowed_methods)})  
  •             if 'swift.authorize' in req.environ: #做鉴权操作  
  •                 # We call authorize before the handler, always. If authorized,  
  •                 # we remove the swift.authorize hook so isn't ever called  
  •                 # again. If not authorized, we return the denial unless the  
  •                 # controller's method indicates it'd like to gather more  
  •                 # information and try again later.  
  •                 resp = req.environ['swift.authorize'](req)  
  •                 if not resp:  
  •                     # No resp means authorized, no delayed recheck required.  
  •                     del req.environ['swift.authorize']  
  •                 else:  
  •                     # Response indicates denial, but we might delay the denial  
  •                     # and recheck later. If not delayed, return the error now.  
  •                     if not getattr(handler, 'delay_denial'None):  
  •                         return resp  
  •             # Save off original request method (GET, POST, etc.) in case it  
  •             # gets mutated during handling.  This way logging can display the  
  •             # method the client actually sent.  
  •             req.environ['swift.orig_req_method'] = req.method  
  •             return handler(req)    #调用最终的method方法,并返回resp结果  
  •         except HTTPException as error_response:  
  •             return error_response  
  •         except (Exception, Timeout):  
  •             self.logger.exception(_('ERROR Unhandled exception in request'))  
  •             return HTTPServerError(request=req)  
  •   
  •     def sort_nodes(self, nodes):  
  •         ''''' 
  •         Sorts nodes in-place (and returns the sorted list) according to 
  •         the configured strategy. The default "sorting" is to randomly 
  •         shuffle the nodes. If the "timing" strategy is chosen, the nodes 
  •         are sorted according to the stored timing data. 
  •         '''  
  •         # In the case of timing sorting, shuffling ensures that close timings  
  •         # (ie within the rounding resolution) won't prefer one over another.  
  •         # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)  
  •         shuffle(nodes)  
  •         if self.sorting_method == 'timing':  #配置文件中排序方法为timing时,以时间排序  
  •             now = time()  
  •   
  •             def key_func(node):  
  •                 timing, expires = self.node_timings.get(node['ip'], (-1.00))  
  •                 return timing if expires > now else -1.0  
  •             nodes.sort(key=key_func)  
  •         elif self.sorting_method == 'affinity'#配置文件中排序方法为affinity时,以自定义的亲和力规则排序  
  •             nodes.sort(key=self.read_affinity_sort_key)  
  •         return nodes  
  •   
  •     def set_node_timing(self, node, timing):  
  •         if self.sorting_method != 'timing':  
  •             return  
  •         now = time()  
  •         timing = round(timing, 3)  # sort timings to the millisecond  
  •         self.node_timings[node['ip']] = (timing, now + self.timing_expiry)  
  •   
  •     def error_limited(self, node):  
  •         """ 
  •         Check if the node is currently error limited. 
  •  
  •         :param node: dictionary of node to check 
  •         :returns: True if error limited, False otherwise 
  •         """  
  •         now = time()  
  •         if 'errors' not in node:  #errors没在node中时返回false  
  •             return False  
  •         if 'last_error' in node and node['last_error'] < \   #last_error在node中有并且 last_error小于现在时间减去系统允许的时间间隔  
  •                 now - self.error_suppression_interval:  
  •             del node['last_error'#node去掉last_error  
  •             if 'errors' in node: #errors在node中时返回 去掉errors,且返回false  
  •                 del node['errors']  
  •             return False  
  •         limited = node['errors'] > self.error_suppression_limit  #errors在node中中的个数多与系统允许的个数,是返回true,且做日志记录  
  •         if limited:  
  •             self.logger.debug(  
  •                 _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)  
  •         return limited  
  •   
  •     def error_limit(self, node, msg):  
  •         """ 
  •         Mark a node as error limited. This immediately pretends the 
  •         node received enough errors to trigger error suppression. Use 
  •         this for errors like Insufficient Storage. For other errors 
  •         use :func:`error_occurred`. 
  •  
  •         :param node: dictionary of node to error limit 
  •         :param msg: error message 
  •         """  
  •         node['errors'] = self.error_suppression_limit + 1  
  •         node['last_error'] = time()  
  •         self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),  
  •                           {'msg': msg, 'ip': node['ip'],  
  •                           'port': node['port'], 'device': node['device']})  
  •   
  •     def error_occurred(self, node, msg):  
  •         """ 
  •         Handle logging, and handling of errors. 
  •  
  •         :param node: dictionary of node to handle errors for 
  •         :param msg: error message 
  •         """  
  •         node['errors'] = node.get('errors'0) + 1  
  •         node['last_error'] = time()  
  •         self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),  
  •                           {'msg': msg, 'ip': node['ip'],  
  •                           'port': node['port'], 'device': node['device']})  
  •   
  •     def iter_nodes(self, ring, partition, node_iter=None):  
  •         """ 
  •         Yields nodes for a ring partition, skipping over error 
  •         limited nodes and stopping at the configurable number of 
  •         nodes. If a node yielded subsequently gets error limited, an 
  •         extra node will be yielded to take its place. 
  •  
  •         Note that if you're going to iterate over this concurrently from 
  •         multiple greenthreads, you'll want to use a 
  •         swift.common.utils.GreenthreadSafeIterator to serialize access. 
  •         Otherwise, you may get ValueErrors from concurrent access. (You also 
  •         may not, depending on how logging is configured, the vagaries of 
  •         socket IO and eventlet, and the phase of the moon.) 
  •  
  •         :param ring: ring to get yield nodes from 
  •         :param partition: ring partition to yield nodes for 
  •         :param node_iter: optional iterable of nodes to try. Useful if you 
  •             want to filter or reorder the nodes. 
  •         """  
  •         part_nodes = ring.get_part_nodes(partition)  
  •         if node_iter is None:  
  •             node_iter = itertools.chain(part_nodes,  
  •                                         ring.get_more_nodes(partition))  
  •         num_primary_nodes = len(part_nodes)  
  •   
  •         # Use of list() here forcibly yanks the first N nodes (the primary  
  •         # nodes) from node_iter, so the rest of its values are handoffs.  
  •         primary_nodes = self.sort_nodes(  
  •             list(itertools.islice(node_iter, num_primary_nodes)))  
  •         handoff_nodes = node_iter  
  •         nodes_left = self.request_node_count(len(primary_nodes))  
  •   
  •         for node in primary_nodes:  
  •             if not self.error_limited(node):  
  •                 yield node  
  •                 if not self.error_limited(node):  
  •                     nodes_left -= 1  
  •                     if nodes_left <= 0:  
  •                         return  
  •         handoffs = 0  
  •         for node in handoff_nodes:  
  •             if not self.error_limited(node):  
  •                 handoffs += 1  
  •                 if self.log_handoffs:  
  •                     self.logger.increment('handoff_count')  
  •                     self.logger.warning(  
  •                         'Handoff requested (%d)' % handoffs)  
  •                     if handoffs == len(primary_nodes):  
  •                         self.logger.increment('handoff_all_count')  
  •                 yield node  
  •                 if not self.error_limited(node):  
  •                     nodes_left -= 1  
  •                     if nodes_left <= 0:  
  •                         return  
  •   
  •     def exception_occurred(self, node, typ, additional_info):  
  •         """ 
  •         Handle logging of generic exceptions. 
  •  
  •         :param node: dictionary of node to log the error for 
  •         :param typ: server type 
  •         :param additional_info: additional information to log 
  •         """  
  •         self.logger.exception(  
  •             _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '  
  •               '%(info)s'),  
  •             {'type': typ, 'ip': node['ip'], 'port': node['port'],  
  •              'device': node['device'], 'info': additional_info})  
  •   
  •     def modify_wsgi_pipeline(self, pipe):  
  •         """ 
  •         Called during WSGI pipeline creation. Modifies the WSGI pipeline 
  •         context to ensure that mandatory middleware is present in the pipeline. 
  •  
  •         :param pipe: A PipelineWrapper object 
  •         """  
  •         pipeline_was_modified = False  
  •         for filter_spec in reversed(required_filters):  #当required_filters字典中定义了需要重新排序的app时,进行pipeline的重新排序处理  
  •             filter_name = filter_spec['name']  
  •             if filter_name not in pipe:  
  •                 afters = filter_spec.get('after_fn'lambda _junk: [])(pipe)  
  •                 insert_at = 0  
  •                 for after in afters:  
  •                     try:  
  •                         insert_at = max(insert_at, pipe.index(after) + 1)  
  •                     except ValueError:  # not in pipeline; ignore it  
  •                         pass  
  •                 self.logger.info(  
  •                     'Adding required filter %s to pipeline at position %d' %  
  •                     (filter_name, insert_at))  
  •                 ctx = pipe.create_filter(filter_name)  
  •                 pipe.insert_filter(ctx, index=insert_at)  
  •                 pipeline_was_modified = True  
  •   
  •         if pipeline_was_modified:    
  •             self.logger.info("Pipeline was modified. New pipeline is \"%s\".",  
  •                              pipe)  
  •         else:  
  •             self.logger.debug("Pipeline is \"%s\"", pipe)  
  •   
  •   
  • def app_factory(global_conf, **local_conf):  
  •     """paste.deploy app factory for creating WSGI proxy apps."""  
  •     conf = global_conf.copy()  
  •     conf.update(local_conf)  
  •     app = Application(conf)  
  •     app.check_config()  
  •     return app  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-162150-1-1.html 上篇帖子: CentOS 6.5 Openstack Havana 多节点安装(Flat网络模式) 下篇帖子: OneStack:Ubuntu 12.04 (或11.10) 一键部署安装OpenStack云计算平台
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表