OpenStack Swift源码初探--proxy下的server.py
OpenStack Swift作为开源的云存储工具,被越来越多的公司使用。为了记录和巩固对Swift源码的学习,这里整理了一系列源码学习笔记,供初学者快速学习和理解Swift的内部功能。proxy下面的server.py模块是Swift的proxy端对account、container、object等对象进行管理操作的总入口。Swift系统接收到url请求后,先经过middleware处理链逐个验证处理,再进入proxy下面的server模块;进入 __call__ 方法后,把请求分发给对应的controller处理,controller再调用各自的存储节点(storage node)服务程序进行处理,并把各自的resp结果返回到server模块,最后沿middleware处理链反方向返回最终的请求处理结果。
1.首先server.py的整个结构包括4部分:一堆import引用,一个名为required_filters的列表(每个元素是一个字典),一个名为“Application(object)”的class,一个app_factory(global_conf, **local_conf) 方法
2.主要介绍“Application(object)”的class,其中包含了所有主要的功能方法
2.1 __init__ 方法,Application类的初始化方法,主要就是初始化一些对象,包括:conf配置文件参数的初始化,log日志初始化,memcache对象初始化,account_ring、container_ring、object_ring对象初始化等
2.2 check_config(self) 方法,主要是检查配置文件proxy-server.conf中“read_affinity”和“sorting_method”两个属性是否搭配正确:如果配置了read_affinity但sorting_method不是“affinity”,则记录一条警告日志(read_affinity不会生效)。该方法在 app_factory(global_conf, **local_conf) 方法中调用
2.3 get_controller(self, path)方法,主要是解析传入的url path,并返回对应的controller类和一个字典对象,其中字典对象的内容根据传入url格式的不同而不同
view plaincopy
def get_controller(self, path):
    """
    Get the controller to handle a request.

    :param path: path from request
    :returns: tuple of (controller class, path dictionary)

    :raises: ValueError (thrown by split_path) if given invalid path
    """
    if path == '/info':
        # The /info URL is served by InfoController; its dictionary carries
        # version, expose_info, disallowed_sections and admin_key.
        d = dict(version=None,
                 expose_info=self.expose_info,
                 disallowed_sections=self.disallowed_sections,
                 admin_key=self.admin_key)
        return InfoController, d

    # Split the URL on '/' into between 1 and 4 segments.
    version, account, container, obj = split_path(path, 1, 4, True)
    d = dict(version=version,
             account_name=account,
             container_name=container,
             object_name=obj)
    # Which of account / container / object are present decides the
    # controller class.
    if obj and container and account:
        return ObjectController, d
    elif container and account:
        return ContainerController, d
    elif account and not container and not obj:
        return AccountController, d
    return None, d
2.4 __call__(self, env, start_response)方法,是server模块的实际对account、container、object等对象调用处理的功能入口。
view plaincopy
def __call__(self, env, start_response):
    """
    WSGI entry point.
    Wraps env in swob.Request object and passes it down.

    :param env: WSGI environment dictionary
    :param start_response: WSGI callable
    :returns: the iterable produced by calling the response object with
              (env, start_response), or a plain-text 500 body
    """
    # Bound up front so the UnicodeError handler below can always refer to
    # it, even when the error is raised before the request is built.
    req = None
    try:
        if self.memcache is None:
            # No memcache client was supplied at construction time; look
            # one up from the WSGI environment.
            self.memcache = cache_from_env(env)
        # update_request copies x-storage-token to x-auth-token if needed.
        req = self.update_request(Request(env))
        # handle_request returns a WSGI callable; invoke it right away.
        return self.handle_request(req)(env, start_response)
    except UnicodeError:
        err = HTTPPreconditionFailed(
            request=req, body='Invalid UTF8 or contains NULL')
        return err(env, start_response)
    except (Exception, Timeout):
        start_response('500 Server Error',
                       [('Content-Type', 'text/plain')])
        return ['Internal server error.\n']
2.5 update_request(self, req)方法,根据请求中header里面的x-storage-token有而x-auth-token没有的情况,把x-storage-token的值赋予x-auth-token
2.6 handle_request(self, req)方法,server模块实际处理request请求的方法,熟悉servlet的同学可以把它理解成servlet的作用
view plaincopy
def handle_request(self, req):
    """
    Entry point for proxy server.
    Should return a WSGI-style callable (such as swob.Response).

    :param req: swob.Request object
    :returns: the handler's response, or an HTTP error response when the
              request is rejected or an unexpected exception occurs
    """
    try:
        self.logger.set_statsd_prefix('proxy-server')
        # A Content-Length that is present but negative is a bad request.
        if req.content_length and req.content_length < 0:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req,
                                  body='Invalid Content-Length')

        # The path must pass check_utf8.
        try:
            if not check_utf8(req.path_info):
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')
        except UnicodeError:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')

        try:
            # Resolve the controller class and its constructor arguments.
            controller, path_parts = self.get_controller(req.path)
            # NOTE(review): p is not used afterwards; kept so an encoding
            # failure here still maps to the 404 below.
            p = req.path_info
            if isinstance(p, unicode):
                p = p.encode('utf-8')
        except ValueError:
            # Invalid path.
            self.logger.increment('errors')
            return HTTPNotFound(request=req)
        if not controller:
            # No controller matches this URL shape.
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Bad URL')
        # Refuse requests whose Host header (port stripped) is listed in
        # deny_host_headers.
        if (self.deny_host_headers and
                req.host.split(':')[0] in self.deny_host_headers):
            return HTTPForbidden(request=req, body='Invalid host header')

        # Extend the statsd prefix with the controller's server_type
        # (presumably account / container / object -- defined on the
        # controller classes, not shown here).
        self.logger.set_statsd_prefix('proxy-server.' +
                                      controller.server_type.lower())
        # Instantiate the controller class chosen above.
        controller = controller(self, **path_parts)
        if 'swift.trans_id' not in req.environ:
            # if this wasn't set by an earlier middleware, set it now
            trans_id = generate_trans_id(self.trans_id_suffix)
            req.environ['swift.trans_id'] = trans_id
            self.logger.txn_id = trans_id
        req.headers['x-trans-id'] = req.environ['swift.trans_id']
        controller.trans_id = req.environ['swift.trans_id']
        # Record the client address on the logger for later log lines.
        self.logger.client_ip = get_remote_client(req)
        try:
            # The handler is the controller method named after the HTTP
            # verb; it must also carry the publicly_accessible attribute,
            # otherwise the AttributeError below turns into a 405.
            handler = getattr(controller, req.method)
            getattr(handler, 'publicly_accessible')
        except AttributeError:
            allowed_methods = getattr(controller, 'allowed_methods', set())
            return HTTPMethodNotAllowed(
                request=req, headers={'Allow': ', '.join(allowed_methods)})
        if 'swift.authorize' in req.environ:
            # We call authorize before the handler, always. If authorized,
            # we remove the swift.authorize hook so isn't ever called
            # again. If not authorized, we return the denial unless the
            # controller's method indicates it'd like to gather more
            # information and try again later.
            resp = req.environ['swift.authorize'](req)
            if not resp:
                # No resp means authorized, no delayed recheck required.
                del req.environ['swift.authorize']
            else:
                # Response indicates denial, but we might delay the denial
                # and recheck later. If not delayed, return the error now.
                if not getattr(handler, 'delay_denial', None):
                    return resp
        # Save off original request method (GET, POST, etc.) in case it
        # gets mutated during handling. This way logging can display the
        # method the client actually sent.
        req.environ['swift.orig_req_method'] = req.method
        return handler(req)
    except HTTPException as error_response:
        return error_response
    except (Exception, Timeout):
        self.logger.exception(_('ERROR Unhandled exception in request'))
        return HTTPServerError(request=req)
2.7 sort_nodes(self, nodes)方法,对nodes对象进行排序处理,该方法在iter_nodes(self, ring, partition, node_iter=None)中调用
view plaincopy
def sort_nodes(self, nodes):
    """
    Sorts nodes in-place (and returns the sorted list) according to
    the configured strategy. The default "sorting" is to randomly
    shuffle the nodes. If the "timing" strategy is chosen, the nodes
    are sorted according to the stored timing data.

    :param nodes: list of node dictionaries (each with an 'ip' key)
    :returns: the same list object, reordered
    """
    # In the case of timing sorting, shuffling ensures that close timings
    # (ie within the rounding resolution) won't prefer one over another.
    # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
    shuffle(nodes)
    if self.sorting_method == 'timing':
        now = time()

        def key_func(node):
            # Unknown or expired timings sort first with a key of -1.0.
            timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
            return timing if expires > now else -1.0
        nodes.sort(key=key_func)
    elif self.sorting_method == 'affinity':
        # Order by the key function built from the read_affinity setting.
        nodes.sort(key=self.read_affinity_sort_key)
    return nodes
2.8 set_node_timing(self, node, timing)方法,提供给外部程序调用
2.9 error_limited(self, node)方法,该方法在iter_nodes(self, ring, partition, node_iter=None)中调用
view plaincopy
def error_limited(self, node):
    """
    Check if the node is currently error limited.

    :param node: dictionary of node to check
    :returns: True if error limited, False otherwise
    """
    now = time()
    if 'errors' not in node:
        # No errors recorded for this node.
        return False
    if ('last_error' in node and
            node['last_error'] < now - self.error_suppression_interval):
        # The last error is older than the suppression interval: forget
        # the node's error state.
        del node['last_error']
        if 'errors' in node:
            del node['errors']
        return False
    # Limited once the error count exceeds the configured limit.
    limited = node['errors'] > self.error_suppression_limit
    if limited:
        self.logger.debug(
            _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
    return limited
2.10 error_limit(self, node, msg)方法,提供给外部程序调用,用于给node直接增加errors到比系统允许的次数+1,并记录last_error时间,和做日志记录
2.11 error_occurred(self, node, msg)方法,提供给外部程序调用,用于给node的errors次数加1,并记录last_error时间,和做日志记录
2.12 iter_nodes(self, ring, partition, node_iter=None)方法,提供给外部程序调用,用于对nodes做排序后生成的nodes迭代器
2.13 exception_occurred(self, node, typ, additional_info)方法,提供给外部程序调用,用于当node发生异常了,进行日志记录
2.14 modify_wsgi_pipeline(self, pipe)方法,在创建WSGI pipeline时被调用,用于检查required_filters中列出的必需middleware是否都在pipeline中,缺少的会被自动插入到合适的位置,并做日志记录
view plaincopy
def modify_wsgi_pipeline(self, pipe):
    """
    Called during WSGI pipeline creation. Modifies the WSGI pipeline
    context to ensure that mandatory middleware is present in the pipeline.

    :param pipe: A PipelineWrapper object
    """
    pipeline_was_modified = False
    # Walk the required filters in reverse and insert each one that is
    # missing from the pipeline.
    for filter_spec in reversed(required_filters):
        filter_name = filter_spec['name']
        if filter_name not in pipe:
            # Names this filter should come after; absent 'after_fn'
            # means no constraint.
            afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)
            insert_at = 0
            for after in afters:
                try:
                    insert_at = max(insert_at, pipe.index(after) + 1)
                except ValueError:  # not in pipeline; ignore it
                    pass
            self.logger.info(
                'Adding required filter %s to pipeline at position %d' %
                (filter_name, insert_at))
            ctx = pipe.create_filter(filter_name)
            pipe.insert_filter(ctx, index=insert_at)
            pipeline_was_modified = True

    if pipeline_was_modified:
        self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                         pipe)
    else:
        self.logger.debug("Pipeline is \"%s\"", pipe)
以下源码为2014、3、12的最新的Proxy的server.py源码,只加了部分代码注释:
view plaincopy
[*]# Copyright (c) 2010-2012 OpenStack Foundation
[*]#
[*]# Licensed under the Apache License, Version 2.0 (the "License");
[*]# you may not use this file except in compliance with the License.
[*]# You may obtain a copy of the License at
[*]#
[*]# http://www.apache.org/licenses/LICENSE-2.0
[*]#
[*]# Unless required by applicable law or agreed to in writing, software
[*]# distributed under the License is distributed on an "AS IS" BASIS,
[*]# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
[*]# implied.
[*]# See the License for the specific language governing permissions and
[*]# limitations under the License.
[*]
[*]import mimetypes
[*]import os
[*]import socket
[*]from swift import gettext_ as _
[*]from random import shuffle
[*]from time import time
[*]import itertools
[*]
[*]from eventlet import Timeout
[*]
[*]from swift import __canonical_version__ as swift_version
[*]from swift.common import constraints
[*]from swift.common.ring import Ring
[*]from swift.common.utils import cache_from_env, get_logger, \
[*] get_remote_client, split_path, config_true_value, generate_trans_id, \
[*] affinity_key_function, affinity_locality_predicate, list_from_csv, \
[*] register_swift_info
[*]from swift.common.constraints import check_utf8
[*]from swift.proxy.controllers import AccountController, ObjectController, \
[*] ContainerController, InfoController
[*]from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
[*] HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
[*] HTTPServerError, HTTPException, Request
[*]
[*]
# List of entry points for mandatory middlewares.
#
# Fields:
#
# "name" (required) is the entry point name from setup.py.
#
# "after_fn" (optional) a function that takes a PipelineWrapper object as its
# single argument and returns a list of middlewares that this middleware
# should come after. Any middlewares in the returned list that are not present
# in the pipeline will be ignored, so you can safely name optional middlewares
# to come after. For example, ["catch_errors", "bulk"] would install this
# middleware after catch_errors and bulk if both were present, but if bulk
# were absent, would just install it after catch_errors.

required_filters = [
    {'name': 'catch_errors'},
    {'name': 'gatekeeper',
     'after_fn': lambda pipe: (['catch_errors']
                               if pipe.startswith("catch_errors")
                               else [])},
    {'name': 'dlo', 'after_fn': lambda _junk: ['catch_errors', 'gatekeeper',
                                               'proxy_logging']}]
[*]
[*]
class Application(object):
    """WSGI application for the proxy server."""

    def __init__(self, conf, memcache=None, logger=None, account_ring=None,
                 container_ring=None, object_ring=None):
        """
        Read the proxy configuration and set up rings, logger and tunables.

        :param conf: dict of configuration options; None means empty
        :param memcache: optional memcache client; when None it is looked
                         up from the WSGI environment on the first request
        :param logger: optional logger; when None one is created with
                       log_route 'proxy-server'
        :param account_ring: optional ring; loaded from swift_dir if falsy
        :param container_ring: optional ring; loaded from swift_dir if falsy
        :param object_ring: optional ring; loaded from swift_dir if falsy
        :raises: ValueError on an invalid request_node_count, read_affinity,
                 write_affinity or write_affinity_node_count value
        """
        if conf is None:
            conf = {}
        if logger is None:
            self.logger = get_logger(conf, log_route='proxy-server')
        else:
            self.logger = logger

        swift_dir = conf.get('swift_dir', '/etc/swift')
        self.node_timeout = int(conf.get('node_timeout', 10))
        self.recoverable_node_timeout = int(
            conf.get('recoverable_node_timeout', self.node_timeout))
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
        self.client_timeout = int(conf.get('client_timeout', 60))
        self.put_queue_depth = int(conf.get('put_queue_depth', 10))
        self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
        self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
        self.trans_id_suffix = conf.get('trans_id_suffix', '')
        self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
        self.error_suppression_interval = \
            int(conf.get('error_suppression_interval', 60))
        self.error_suppression_limit = \
            int(conf.get('error_suppression_limit', 10))
        self.recheck_container_existence = \
            int(conf.get('recheck_container_existence', 60))
        self.recheck_account_existence = \
            int(conf.get('recheck_account_existence', 60))
        self.allow_account_management = \
            config_true_value(conf.get('allow_account_management', 'no'))
        self.object_post_as_copy = \
            config_true_value(conf.get('object_post_as_copy', 'true'))
        self.object_ring = object_ring or Ring(swift_dir, ring_name='object')
        self.container_ring = container_ring or Ring(swift_dir,
                                                     ring_name='container')
        self.account_ring = account_ring or Ring(swift_dir,
                                                 ring_name='account')
        self.memcache = memcache
        # Also consult a mime.types file in the swift directory.
        mimetypes.init(mimetypes.knownfiles +
                       [os.path.join(swift_dir, 'mime.types')])
        self.account_autocreate = \
            config_true_value(conf.get('account_autocreate', 'no'))
        self.expiring_objects_account = \
            (conf.get('auto_create_account_prefix') or '.') + \
            (conf.get('expiring_objects_account_name') or 'expiring_objects')
        self.expiring_objects_container_divisor = \
            int(conf.get('expiring_objects_container_divisor') or 86400)
        self.max_containers_per_account = \
            int(conf.get('max_containers_per_account') or 0)
        self.max_containers_whitelist = [
            a.strip()
            for a in conf.get('max_containers_whitelist', '').split(',')
            if a.strip()]
        self.deny_host_headers = [
            host.strip() for host in
            conf.get('deny_host_headers', '').split(',') if host.strip()]
        self.rate_limit_after_segment = \
            int(conf.get('rate_limit_after_segment', 10))
        self.rate_limit_segments_per_sec = \
            int(conf.get('rate_limit_segments_per_sec', 1))
        self.log_handoffs = config_true_value(conf.get('log_handoffs', 'true'))
        self.cors_allow_origin = [
            a.strip()
            for a in conf.get('cors_allow_origin', '').split(',')
            if a.strip()]
        self.strict_cors_mode = config_true_value(
            conf.get('strict_cors_mode', 't'))
        self.node_timings = {}
        self.timing_expiry = int(conf.get('timing_expiry', 300))
        self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
        self.max_large_object_get_time = float(
            conf.get('max_large_object_get_time', '86400'))
        # Parsed by a helper so this closure does not share a local with
        # the write_affinity_node_count closure built further down.
        self.request_node_count = self._node_count_fn(
            conf, 'request_node_count')
        try:
            self._read_affinity = read_affinity = conf.get('read_affinity', '')
            self.read_affinity_sort_key = affinity_key_function(read_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid read_affinity value: %r (%s)" %
                             (read_affinity, err))
        try:
            write_affinity = conf.get('write_affinity', '')
            self.write_affinity_is_local_fn \
                = affinity_locality_predicate(write_affinity)
        except ValueError as err:
            # make the message a little more useful
            raise ValueError("Invalid write_affinity value: %r (%s)" %
                             (write_affinity, err))
        self.write_affinity_node_count = self._node_count_fn(
            conf, 'write_affinity_node_count')
        # swift_owner_headers are stripped by the account and container
        # controllers; we should extend header stripping to object controller
        # when a privileged object header is implemented.
        swift_owner_headers = conf.get(
            'swift_owner_headers',
            'x-container-read, x-container-write, '
            'x-container-sync-key, x-container-sync-to, '
            'x-account-meta-temp-url-key, x-account-meta-temp-url-key-2, '
            'x-account-access-control')
        self.swift_owner_headers = [
            name.strip().title()
            for name in swift_owner_headers.split(',') if name.strip()]
        # Initialization was successful, so now apply the client chunk size
        # parameter as the default read / write buffer size for the network
        # sockets.
        #
        # NOTE WELL: This is a class setting, so until we get set this on a
        # per-connection basis, this affects reading and writing on ALL
        # sockets, those between the proxy servers and external clients, and
        # those between the proxy servers and the other internal servers.
        #
        # ** Because it affects the client as well, currently, we use the
        # client chunk size as the govenor and not the object chunk size.
        socket._fileobject.default_bufsize = self.client_chunk_size
        self.expose_info = config_true_value(
            conf.get('expose_info', 'yes'))
        self.disallowed_sections = list_from_csv(
            conf.get('disallowed_sections'))
        self.admin_key = conf.get('admin_key', None)
        register_swift_info(
            version=swift_version,
            strict_cors_mode=self.strict_cors_mode,
            **constraints.EFFECTIVE_CONSTRAINTS)

    @staticmethod
    def _node_count_fn(conf, option):
        """
        Turn a node-count option into a function of the replica count.

        Accepted forms are a bare integer ("5") or "<int> * replicas";
        a missing option defaults to "2 * replicas".

        :param conf: configuration dict
        :param option: name of the option to read
        :returns: callable taking the replica count and returning an int
        :raises: ValueError if the option value has any other form
        """
        parts = conf.get(option, '2 * replicas').lower().split()
        if len(parts) == 1:
            count = int(parts[0])
            return lambda replicas: count
        if len(parts) == 3 and parts[1] == '*' and parts[2] == 'replicas':
            factor = int(parts[0])
            return lambda replicas: factor * replicas
        raise ValueError(
            'Invalid %s value: %r' % (option, ''.join(parts)))

    def check_config(self):
        """
        Check the configuration for possible errors
        """
        # read_affinity is only consulted when sorting_method is 'affinity'.
        if self._read_affinity and self.sorting_method != 'affinity':
            self.logger.warn("sorting_method is set to '%s', not 'affinity'; "
                             "read_affinity setting will have no effect." %
                             self.sorting_method)

    def get_controller(self, path):
        """
        Get the controller to handle a request.

        :param path: path from request
        :returns: tuple of (controller class, path dictionary)

        :raises: ValueError (thrown by split_path) if given invalid path
        """
        if path == '/info':
            # The /info URL is served by InfoController; its dictionary
            # carries version, expose_info, disallowed_sections, admin_key.
            d = dict(version=None,
                     expose_info=self.expose_info,
                     disallowed_sections=self.disallowed_sections,
                     admin_key=self.admin_key)
            return InfoController, d

        # Split the URL on '/' into between 1 and 4 segments.
        version, account, container, obj = split_path(path, 1, 4, True)
        d = dict(version=version,
                 account_name=account,
                 container_name=container,
                 object_name=obj)
        # Which of account / container / object are present decides the
        # controller class.
        if obj and container and account:
            return ObjectController, d
        elif container and account:
            return ContainerController, d
        elif account and not container and not obj:
            return AccountController, d
        return None, d

    def __call__(self, env, start_response):
        """
        WSGI entry point.
        Wraps env in swob.Request object and passes it down.

        :param env: WSGI environment dictionary
        :param start_response: WSGI callable
        :returns: the iterable produced by calling the response object with
                  (env, start_response), or a plain-text 500 body
        """
        # Bound up front so the UnicodeError handler below can always refer
        # to it, even when the error is raised before the request is built.
        req = None
        try:
            if self.memcache is None:
                # No memcache client was supplied at construction time;
                # look one up from the WSGI environment.
                self.memcache = cache_from_env(env)
            req = self.update_request(Request(env))
            # handle_request returns a WSGI callable; invoke it right away.
            return self.handle_request(req)(env, start_response)
        except UnicodeError:
            err = HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')
            return err(env, start_response)
        except (Exception, Timeout):
            start_response('500 Server Error',
                           [('Content-Type', 'text/plain')])
            return ['Internal server error.\n']

    def update_request(self, req):
        """
        Copy x-storage-token into x-auth-token when only the former is set.

        :param req: request object with a ``headers`` mapping
        :returns: the same request object
        """
        if 'x-storage-token' in req.headers and \
                'x-auth-token' not in req.headers:
            req.headers['x-auth-token'] = req.headers['x-storage-token']
        return req

    def handle_request(self, req):
        """
        Entry point for proxy server.
        Should return a WSGI-style callable (such as swob.Response).

        :param req: swob.Request object
        :returns: the handler's response, or an HTTP error response when
                  the request is rejected or an unexpected exception occurs
        """
        try:
            self.logger.set_statsd_prefix('proxy-server')
            # A Content-Length that is present but negative is a bad
            # request.
            if req.content_length and req.content_length < 0:
                self.logger.increment('errors')
                return HTTPBadRequest(request=req,
                                      body='Invalid Content-Length')

            # The path must pass check_utf8.
            try:
                if not check_utf8(req.path_info):
                    self.logger.increment('errors')
                    return HTTPPreconditionFailed(
                        request=req, body='Invalid UTF8 or contains NULL')
            except UnicodeError:
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')

            try:
                # Resolve the controller class and its constructor
                # arguments.
                controller, path_parts = self.get_controller(req.path)
                # NOTE(review): p is not used afterwards; kept so an
                # encoding failure here still maps to the 404 below.
                p = req.path_info
                if isinstance(p, unicode):
                    p = p.encode('utf-8')
            except ValueError:
                # Invalid path.
                self.logger.increment('errors')
                return HTTPNotFound(request=req)
            if not controller:
                # No controller matches this URL shape.
                self.logger.increment('errors')
                return HTTPPreconditionFailed(request=req, body='Bad URL')
            # Refuse requests whose Host header (port stripped) is listed
            # in deny_host_headers.
            if (self.deny_host_headers and
                    req.host.split(':')[0] in self.deny_host_headers):
                return HTTPForbidden(request=req, body='Invalid host header')

            # Extend the statsd prefix with the controller's server_type
            # (presumably account / container / object -- defined on the
            # controller classes, not shown here).
            self.logger.set_statsd_prefix('proxy-server.' +
                                          controller.server_type.lower())
            # Instantiate the controller class chosen above.
            controller = controller(self, **path_parts)
            if 'swift.trans_id' not in req.environ:
                # if this wasn't set by an earlier middleware, set it now
                trans_id = generate_trans_id(self.trans_id_suffix)
                req.environ['swift.trans_id'] = trans_id
                self.logger.txn_id = trans_id
            req.headers['x-trans-id'] = req.environ['swift.trans_id']
            controller.trans_id = req.environ['swift.trans_id']
            # Record the client address on the logger for later log lines.
            self.logger.client_ip = get_remote_client(req)
            try:
                # The handler is the controller method named after the HTTP
                # verb; it must also carry the publicly_accessible
                # attribute, otherwise the AttributeError below turns into
                # a 405.
                handler = getattr(controller, req.method)
                getattr(handler, 'publicly_accessible')
            except AttributeError:
                allowed_methods = getattr(controller, 'allowed_methods', set())
                return HTTPMethodNotAllowed(
                    request=req, headers={'Allow': ', '.join(allowed_methods)})
            if 'swift.authorize' in req.environ:
                # We call authorize before the handler, always. If authorized,
                # we remove the swift.authorize hook so isn't ever called
                # again. If not authorized, we return the denial unless the
                # controller's method indicates it'd like to gather more
                # information and try again later.
                resp = req.environ['swift.authorize'](req)
                if not resp:
                    # No resp means authorized, no delayed recheck required.
                    del req.environ['swift.authorize']
                else:
                    # Response indicates denial, but we might delay the denial
                    # and recheck later. If not delayed, return the error now.
                    if not getattr(handler, 'delay_denial', None):
                        return resp
            # Save off original request method (GET, POST, etc.) in case it
            # gets mutated during handling. This way logging can display the
            # method the client actually sent.
            req.environ['swift.orig_req_method'] = req.method
            return handler(req)
        except HTTPException as error_response:
            return error_response
        except (Exception, Timeout):
            self.logger.exception(_('ERROR Unhandled exception in request'))
            return HTTPServerError(request=req)

    def sort_nodes(self, nodes):
        """
        Sorts nodes in-place (and returns the sorted list) according to
        the configured strategy. The default "sorting" is to randomly
        shuffle the nodes. If the "timing" strategy is chosen, the nodes
        are sorted according to the stored timing data.

        :param nodes: list of node dictionaries (each with an 'ip' key)
        :returns: the same list object, reordered
        """
        # In the case of timing sorting, shuffling ensures that close timings
        # (ie within the rounding resolution) won't prefer one over another.
        # Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
        shuffle(nodes)
        if self.sorting_method == 'timing':
            now = time()

            def key_func(node):
                # Unknown or expired timings sort first with a key of -1.0.
                timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
                return timing if expires > now else -1.0
            nodes.sort(key=key_func)
        elif self.sorting_method == 'affinity':
            # Order by the key function built from read_affinity.
            nodes.sort(key=self.read_affinity_sort_key)
        return nodes

    def set_node_timing(self, node, timing):
        """
        Record a timing for a node; a no-op unless sorting_method is
        'timing'.

        :param node: node dictionary (its 'ip' is the key)
        :param timing: measured time, rounded here to three decimals
        """
        if self.sorting_method != 'timing':
            return
        now = time()
        timing = round(timing, 3)  # sort timings to the millisecond
        self.node_timings[node['ip']] = (timing, now + self.timing_expiry)

    def error_limited(self, node):
        """
        Check if the node is currently error limited.

        :param node: dictionary of node to check
        :returns: True if error limited, False otherwise
        """
        now = time()
        if 'errors' not in node:
            # No errors recorded for this node.
            return False
        if ('last_error' in node and
                node['last_error'] < now - self.error_suppression_interval):
            # The last error is older than the suppression interval: forget
            # the node's error state.
            del node['last_error']
            if 'errors' in node:
                del node['errors']
            return False
        # Limited once the error count exceeds the configured limit.
        limited = node['errors'] > self.error_suppression_limit
        if limited:
            self.logger.debug(
                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
        return limited

    def error_limit(self, node, msg):
        """
        Mark a node as error limited. This immediately pretends the
        node received enough errors to trigger error suppression. Use
        this for errors like Insufficient Storage. For other errors
        use :func:`error_occurred`.

        :param node: dictionary of node to error limit
        :param msg: error message
        """
        node['errors'] = self.error_suppression_limit + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def error_occurred(self, node, msg):
        """
        Handle logging, and handling of errors.

        :param node: dictionary of node to handle errors for
        :param msg: error message
        """
        node['errors'] = node.get('errors', 0) + 1
        node['last_error'] = time()
        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
                          {'msg': msg, 'ip': node['ip'],
                          'port': node['port'], 'device': node['device']})

    def iter_nodes(self, ring, partition, node_iter=None):
        """
        Yields nodes for a ring partition, skipping over error
        limited nodes and stopping at the configurable number of
        nodes. If a node yielded subsequently gets error limited, an
        extra node will be yielded to take its place.

        Note that if you're going to iterate over this concurrently from
        multiple greenthreads, you'll want to use a
        swift.common.utils.GreenthreadSafeIterator to serialize access.
        Otherwise, you may get ValueErrors from concurrent access. (You also
        may not, depending on how logging is configured, the vagaries of
        socket IO and eventlet, and the phase of the moon.)

        :param ring: ring to get yield nodes from
        :param partition: ring partition to yield nodes for
        :param node_iter: optional iterable of nodes to try. Useful if you
                          want to filter or reorder the nodes.
        """
        part_nodes = ring.get_part_nodes(partition)
        if node_iter is None:
            node_iter = itertools.chain(part_nodes,
                                        ring.get_more_nodes(partition))
        num_primary_nodes = len(part_nodes)

        # Use of list() here forcibly yanks the first N nodes (the primary
        # nodes) from node_iter, so the rest of its values are handoffs.
        primary_nodes = self.sort_nodes(
            list(itertools.islice(node_iter, num_primary_nodes)))
        handoff_nodes = node_iter
        nodes_left = self.request_node_count(len(primary_nodes))

        for node in primary_nodes:
            if not self.error_limited(node):
                yield node
                # Re-check after the yield: only a node that is still not
                # error limited counts against the budget.
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return
        handoffs = 0
        for node in handoff_nodes:
            if not self.error_limited(node):
                handoffs += 1
                if self.log_handoffs:
                    self.logger.increment('handoff_count')
                    self.logger.warning(
                        'Handoff requested (%d)' % handoffs)
                    if handoffs == len(primary_nodes):
                        self.logger.increment('handoff_all_count')
                yield node
                if not self.error_limited(node):
                    nodes_left -= 1
                    if nodes_left <= 0:
                        return

    def exception_occurred(self, node, typ, additional_info):
        """
        Handle logging of generic exceptions.

        :param node: dictionary of node to log the error for
        :param typ: server type
        :param additional_info: additional information to log
        """
        self.logger.exception(
            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
              '%(info)s'),
            {'type': typ, 'ip': node['ip'], 'port': node['port'],
             'device': node['device'], 'info': additional_info})

    def modify_wsgi_pipeline(self, pipe):
        """
        Called during WSGI pipeline creation. Modifies the WSGI pipeline
        context to ensure that mandatory middleware is present in the pipeline.

        :param pipe: A PipelineWrapper object
        """
        pipeline_was_modified = False
        # Walk the required filters in reverse and insert each one that is
        # missing from the pipeline.
        for filter_spec in reversed(required_filters):
            filter_name = filter_spec['name']
            if filter_name not in pipe:
                # Names this filter should come after; absent 'after_fn'
                # means no constraint.
                afters = filter_spec.get('after_fn', lambda _junk: [])(pipe)
                insert_at = 0
                for after in afters:
                    try:
                        insert_at = max(insert_at, pipe.index(after) + 1)
                    except ValueError:  # not in pipeline; ignore it
                        pass
                self.logger.info(
                    'Adding required filter %s to pipeline at position %d' %
                    (filter_name, insert_at))
                ctx = pipe.create_filter(filter_name)
                pipe.insert_filter(ctx, index=insert_at)
                pipeline_was_modified = True

        if pipeline_was_modified:
            self.logger.info("Pipeline was modified. New pipeline is \"%s\".",
                             pipe)
        else:
            self.logger.debug("Pipeline is \"%s\"", pipe)
[*]
[*]
def app_factory(global_conf, **local_conf):
    """
    paste.deploy app factory for creating WSGI proxy apps.

    :param global_conf: dict of global configuration options
    :param local_conf: section-specific options; these override
                       global_conf entries with the same key
    :returns: an Application instance whose check_config() has been run
    """
    # Work on a copy so the caller's global_conf is left untouched.
    conf = global_conf.copy()
    conf.update(local_conf)
    app = Application(conf)
    app.check_config()
    return app
页:
[1]