python 编程规范
程序模板Python代码
[*]@FileName:
[*]
[*]@Author:xx@ic.net.cn
[*]
[*]@Create date:
[*]
[*]@description:用一行文字概述模块或脚本,用句号结尾。
[*]
[*]@Update date:
[*]
[*]@Vindicator: xx@ic.net.cn
[*]
[*]@File URL: http://idea.icgoo.net/xxxxxxx
[*]
[*]@svn Path: svn://svn.icinfo.net/xxxxxx
[*]
[*]"""
[*]
[*]#add by XXX or modify by XXX
[*]def Function(parameter1 ,parameter2...):
[*]'''''
[*] @description:
[*] @parameter1:
[*] @parameter2:
[*] @...
[*] @return:
[*]'''
基本原则
[*]方便代码的交流和维护.
[*]不影响编码的效率,不与大众习惯冲突.
[*]使代码更美观,阅读更方便.
[*]使代码的逻辑更清晰,更易于理解.
编码
[*] *所有的 Python 脚本文件都应在文件头标上如下标识或其兼容格式的标识
[*] *设置编辑器,默认保存为 utf-8 格式
[*] *不论什么情况使用 UTF-8 吧!这是王道!
# -*- coding:utf-8 -*- 或 #coding=utf-8
命名
[*] 一致的命名可以给开发人员减少许多麻烦,而恰如其分的命名则可以大幅提高代码的可读性,降低维护成本.
[*] Python库的命名约定有点混乱,所以我们将永远不能使之变得完全一致--- 不过还是有公认的命名规范的.
[*] 新的模块和包(包括第三方的框架)必须符合这些标准,但对已有的库存在不同风格的, 保持内部的一致性是首选的
*一些特殊的字符要避免.如小写字母'l','o'
模块名
1. 模块应该是不含下划线的,简短的,小写的名字.
例:module.py
2. 对于包内使用的模块,可以加一个下划线前缀.
例:_internal_module.py
类名
[*] 几乎没有例外,类名总是使用首字母大写单词串(CapWords)的约定.
[*] 不使用下划线连接单词,也不加入 C、T 等前缀
例:
class ThisIsAClass(object):
pass
函数名
[*] 函数名全部小写,由下划线连接各个单词
[*] 类似mixedCase函数名仅被允许用于这种风格已经占优势的上下文(如: threading.py) 以便保持向后兼容.
例:
def this_is_a_func(self):
pass
变量名
[*] 变量名全部小写,由下划线连接各个单词
[*] 不论是类成员变量还是全局变量,均不使用 m 或 g 前缀
[*] 私有类成员使用单一下划线前缀标识,多定义公开成员,少定义私有成员。
[*] 变量名不应带有类型信息,因为 Python 是动态类型语言。如 iValue、names_list、dict_obj 等都是不好的命名。
例:
color = WHITE
this_is_a_variable = 1
常量名
[*] 常量名所有字母大写,由下划线连接各个单词
例:
WHITE = 0xffffffff
THIS_IS_A_CONSTANT = 1
异常名
[*] 如果模块对所有情况定义了单个异常,它通常被叫做"error"或"Error"
[*] 似乎内建(扩展)的模块使用"error"(例如:os.error), 而Python模块通常用"Error" (例如: xdrlib.Error).
[*] 趋势是使用(CapWords)
缩写
1. 命名应当尽量使用全拼写的单词
2. 常用的缩写,如 XML、ID等,在命名时也应只大写首字母
例:class XmlParser(object):pass
3. 命名中含有长单词,对某个单词进行缩写
例:function 缩写为 fn;text 缩写为 txt;object 缩写为 obj等
特殊命名
1. 用下划线作前导或结尾的特殊形式是被公认的
_single_leading_underscore(以一个下划线作前导): 弱的"内部使用(internal use)"标志
例: "from M import *"不会导入以下划线开头的对象
2. single_trailing_underscore_(以一个下划线结尾): 用于避免与Python关键词的冲突
例:"Tkinter.Toplevel(master, class_='ClassName')"
3. __double_leading_underscore(双下划线): 从Python 1.4起为类私有名
4. __double_leading_and_trailing_underscore__: 特殊的(magic) 对象或属性,存在于用户控制的(user-controlled)名字空间
例:__init__, __import__ 或 __file__
缩进
1. 使用制表符还是空格?
-- 永远不要混用制表符和空格.建议使用空格.
-- 我们内部应该都是使用的4个空格的tab.
空行
适当的空行有利于增加代码的可读性,加空行可以参考如下几个准则
1. 在类、函数的定义间加空行
-- 用两行空行分割顶层函数和类的定义,类内方法的定义用单个空行分割.
2. 额外的空行可被用于分割一组相关函数
3. 在 import 不同种类的模块间加空行
4. 在函数中的逻辑段落间加空行,即把相关的代码紧凑写在一起,作为一个逻辑段落,段落间以空行分隔
空格
[*] 空格在 Python 代码中是有意义的,因为 Python 的语法依赖于缩进,在行首的空格称为前导空格.这里不谈这个.
[*] 非前导空格在 Python 代码中没有意义,但适当地加入非前导空格可以增进代码的可读性。
3. 在二元算术、逻辑运算符前后加空格
例:a = b + c(好) a=b+c(不好)
4. “:”用在行尾时前后皆不加空格,如分枝、循环、函数和类定义语言;用在非行尾时两端加空格,如 dict 对象的定义
例:d = {'key' : 'value'}(好) d = {'key':'value'}(不好)
5. 括号(含圆括号、方括号和花括号)前后不加空格
例:do_something(arg1, arg2)(好) do_something( arg1, arg2 )(不好)
6. 逗号后面加一个空格,前面不加空格
例:print x, y(好) print x , y(不好)
7. 函数调用,索引切片,字典取值不要用空格
例:spam(x)(好) spam (x)(不好); list(好) list (不好); dict(好) dict (不好)
断行
*尽管现在的宽屏显示器已经可以单屏显示超过 256 列字符,但本规范仍然坚持行的最大长度不得超过 78 个字符的标准
1. 为长变量名换一个短名
例: this._is.a.very.long.variable_name = this._is.another.long.variable_name (不好)
variable_name = this._is.another.long.variable_name (好)
2. 在括号(包括圆括号、方括号和花括号)内换行
例:
class Edit(Widget):
def __init__(self, parent, width,
font = FONT, color = BLACK, pos = POS, style = 0): # 注意:多一层缩进
pass
如果行长到连第一个括号内的参数都放不下,则每个元素都单独占一行
例:
very_very_very_long_variable_name = ui.widgets.Edit(
panrent,
width,
font,
color,
pos) # 注意:多一层缩进
do_sth_with(very_very_very_long_variable_name)
3. 在长行加入续行符强行断行,断行的位置应在操作符前,且换行后多一个缩进,以使维护人员看代码的时候看到代码行首即可判定这里存在换行
例:
if color == WHITE or color == BLACK \
or color == BLUE: # 注意 or 操作符在新行的行首而不是旧行的行尾,上一行的续行符不可省略
do_something(color);
else:
do_something(DEFAULT_COLOR);
导入
[*] Imports 通常被放置在文件的顶部,仅在模块注释和文档字符串之后,在模块的全局变量和常量之前.Imports应该有顺序地成组安放
[*] 对于内部包的导入是不推荐使用相对导入的.对所有导入都要使用包的绝对路径
[*] import应该按照从最常用到最不常用的顺序分组放置,这几种模块中用空行分隔开来
import标准库
import第三方库
importGoogle App Engine 相关库
importDjango 框架相关库
importSoC framework 相关库
import基于 SoC 框架的模块
import应用程序特有的内容
例:
import a_standard
import b_standard
import a_third_party
import b_third_party
from a_soc import f
from a_soc import g
import a_soc
import b_soc
1.单行导入
例:
import x
import y
import x, y(不好)
*如果是from xx import xx可以一行
例: from types import StringType, ListType
2.从一个包含类的模块中导入类时
例:
from MyClass import MyClass
from foo.bar.YourClass import YourClass
如果发生地名字冲突,可以
import MyClass
import foo.bar.YourClass
然后再按下面使用即可
MyClass.MyClass
foo.bar.YourClass.YourClass
注释
[*] ** 业界普遍认同 Python 的注释分为两种的概念,一种是由 # 开头的“真正的”注释,另一种是 docstrings。
[*] ** 前者表明为何选择当前实现以及这种实现的原理和难点,后者表明如何使用这个包、模块、类、函数(方法),甚至包括使用示例和单元测试。
[*] ** 坚持适当注释原则。对不存在技术难点的代码坚持不注释,对存在技术难点的代码必须注释
[*] ** 推荐对每一个包、模块、类、函数(方法)写 docstrings,除非代码一目了然,非常简单
[*] ** 包、模块、类、函数的第一个语句如果是字符串那么就是一个 __doc__ String。
文件注释
** 每个文件开头都应该包含一个带有版权信息和许可声明的块注释。
Python代码
[*]"""
[*] 用一行文字概述模块或脚本,用句号结尾。
[*] 留一个空行。本 __doc__ string 的其他部分应该包括模块或脚本的全面描述。作为可选项,还可以包括导出的类和函数的简要描述。
[*]
[*] ClassFoo: 一行概述。
[*] functionBar(): 一行概述。
[*]"""
[*]
[*]__authors__ = [
[*] # 请按照姓氏字母顺序排列:
[*] '"John Smith" <johnsmith@example.com>',
[*] '"Joe Paranoid" <joeisgone@example.com>', # 应提供电子邮件地址
[*]]
类注释
** 类应该在描述它的类定义下面放 __doc__ string
Python代码
[*]class SampleClass(object):
[*]
[*] """这里是类的概述。
[*]
[*] 详细的描述信息……
[*] 详细的描述信息……
[*]
[*] Attributes:
[*] likes_spam: 布尔型,表示我们是否喜欢垃圾邮件。
[*] eggs: 整数,数我们下了多少蛋。
[*] """
[*]
[*] def __init__(self, likes_spam=False):
[*] """拿点什么来初始化 SampleClass 。
[*]
[*] Args:
[*] likes_spam: 初始化指标,表示 SampleClass 实例是否喜欢垃圾邮件(默认是 False)。
[*] """
[*] self.likes_spam = likes_spam
[*] self.eggs = 0
[*]
[*] def publicMethod(self):
[*] """执行一些操作。"""
[*] pass
函数注释
** 如果不是用途非常明显而且非常短的话,所有函数和方法都应该有 __doc__ string
** 所有外部能访问的函数和方法,无论有多短、有多简单,都应该有 __doc__ string
Python代码
[*]def fetchRows(table, keys):
[*] """取出表中的多行内容。
[*]
[*] Args:
[*] table: 打开的表。 Table 类的实例。
[*] keys: 字符串序列,表示要从表中取出的行的键值。
[*]
[*] Returns:
[*] 一个字典,映射指定键值与取出的表中对应行的数据:
[*]
[*] {'Serak': ('Rigel VII', 'Preparer'),
[*] 'Zim': ('Irk', 'Invader'),
[*] 'Lrrr': ('Omicron Persei 8', 'Emperor')}
[*]
[*] 如果 keys 参数中的键值没有出现在字典里,就表示对应行在表中没找到。
[*]
[*] Raises:
[*] IOError: 访问 table.Table 对象时发生的错误。
[*] """
[*] pass
单行注释
[*] ** 加注释的最后一个位置是在难以理解的代码里面
[*] ** 如果你打算在下一次代码复查(code review)的时候解释这是什么意思,那你应该现在就把它写成注释。
[*] ** 在开始进行操作之前,就应该给复杂的操作写几行注释
[*] ** 对不直观的代码则应该在行末写注释
# 我们用带权的字典检索来查找 i 在数组中的位置。我们根据数组中最大的数和
# 数组的长度来推断可能的位置,然后做二分法检索找到准确的值。
if i & (i-1) == 0: # 当且仅当 i 是 2 的幂时,值为 true
TODO style
[*] ** 在代码中使用 TODO 注释是临时的、短期的解决方案,或者说是足够好但不够完美的办法
[*] ** TODO 应该包括全部大写的字符串 TODO ,紧接用圆括号括起来的你的用户名
# TODO(someuser): 这里应该用 "*" 来做级联操作。
# TODO(anotheruser) 用 relations 来修改这儿。
赋值
** 对于赋值语句,主要是不要做无谓的对齐
** 原因有两点:一是这种对齐会打乱编程时的注意力,大脑要同时处理两件事(编程和对齐);
二是以后阅读和维护都很困难,因为人眼的横向视野很窄,把三个字段看成一行很困难,而且维护时要增加一个更长的变量名也会破坏对齐
例:
x = 1
y = 1
long_var = 1 (不好)
x = 1
y = 1
long_var = 1 (好)
分支与循环
Python代码
[*]** 不要写成一行
[*] 例:
[*] if not flg: pass (不好)
[*] if not flg:
[*] pass (好)
[*]
[*] for i in xrange(10): print i (不好)
[*] for i in xrange(10):
[*] print i (好)
[*]
[*]** 条件表达式的编写应该足够 pythonic,如以下形式的条件表达式是拙劣的
[*] 例:
[*] if len(alist) != 0: do_something()
[*] if alist != []: do_something()
[*] if s != "": do_something()
[*] if var != None: do_something()
[*] if var != False: do_something()
[*]
[*] 这样写就好了:
[*] if alist: do_something()
[*] if s: do_something()
[*] if var: do_something()
文档
这个东西可以另外FILE来讲了
调试
写道
所有程序,如果需要加入调试输出打印代码。
需要在源程序的 import 包之后,加入一个变量 用大写名称 DEBUG 变量,即 DEBUG=True
后面的打印语句加上 if DEBUG: print 'hello world'
在准备交付生产环境时,提交版本库的时候,必须关闭调试开关,也就是 DEBUG变量修改成 DEBUG=False
例子
Python代码
[*]"""
[*]BaseHTTPServer that implements the Python WSGI protocol (PEP 333, rev 1.21).
[*]
[*]Adapted from wsgiref.simple_server: http://svn.eby-sarna.com/wsgiref/
[*]
[*]This is a simple server for use in testing or debugging Django apps. It hasn't
[*]been reviewed for security issues. Don't use it for production use.
[*]"""
[*]
[*]from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
[*]import os
[*]import re
[*]import socket
[*]import sys
[*]import urllib
[*]import warnings
[*]
[*]from django.core.management.color import color_style
[*]from django.utils.http import http_date
[*]from django.utils._os import safe_join
[*]from django.views import static
[*]
[*]from django.contrib.staticfiles import handlers
[*]
[*]__version__ = "0.1"
[*]__all__ = ['WSGIServer','WSGIRequestHandler']
[*]
[*]server_version = "WSGIServer/" + __version__
[*]sys_version = "Python/" + sys.version.split()
[*]software_version = server_version + ' ' + sys_version
[*]
[*]class WSGIServerException(Exception):
[*] pass
[*]
[*]class FileWrapper(object):
[*] """Wrapper to convert file-like objects to iterables"""
[*]
[*] def __init__(self, filelike, blksize=8192):
[*] self.filelike = filelike
[*] self.blksize = blksize
[*] if hasattr(filelike,'close'):
[*] self.close = filelike.close
[*]
[*] def __getitem__(self,key):
[*] data = self.filelike.read(self.blksize)
[*] if data:
[*] return data
[*] raise IndexError
[*]
[*] def __iter__(self):
[*] return self
[*]
[*] def next(self):
[*] data = self.filelike.read(self.blksize)
[*] if data:
[*] return data
[*] raise StopIteration
[*]
[*]# Regular expression that matches `special' characters in parameters, the
[*]# existence of which force quoting of the parameter value.
[*]tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')
[*]
[*]def _formatparam(param, value=None, quote=1):
[*] """Convenience function to format and return a key=value pair.
[*]
[*] This will quote the value if needed or if quote is true.
[*] """
[*] if value is not None and len(value) > 0:
[*] if quote or tspecials.search(value):
[*] value = value.replace('\\', '\\\\').replace('"', r'\"')
[*] return '%s="%s"' % (param, value)
[*] else:
[*] return '%s=%s' % (param, value)
[*] else:
[*] return param
[*]
[*]class Headers(object):
[*] """Manage a collection of HTTP response headers"""
[*] def __init__(self,headers):
[*] if not isinstance(headers, list):
[*] raise TypeError("Headers must be a list of name/value tuples")
[*] self._headers = headers
[*]
[*] def __len__(self):
[*] """Return the total number of headers, including duplicates."""
[*] return len(self._headers)
[*]
[*] def __setitem__(self, name, val):
[*] """Set the value of a header."""
[*] del self
[*] self._headers.append((name, val))
[*]
[*] def __delitem__(self,name):
[*] """Delete all occurrences of a header, if present.
[*]
[*] Does *not* raise an exception if the header is missing.
[*] """
[*] name = name.lower()
[*] self._headers[:] = for kv in self._headers if kv.lower()<>name]
[*]
[*] def __getitem__(self,name):
[*] """Get the first header value for 'name'
[*]
[*] Return None if the header is missing instead of raising an exception.
[*]
[*] Note that if the header appeared multiple times, the first exactly which
[*] occurrance gets returned is undefined. Use getall() to get all
[*] the values matching a header field name.
[*] """
[*] return self.get(name)
[*]
[*] def has_key(self, name):
[*] """Return true if the message contains the header."""
[*] return self.get(name) is not None
[*]
[*] __contains__ = has_key
[*]
[*] def get_all(self, name):
[*] """Return a list of all the values for the named field.
[*]
[*] These will be sorted in the order they appeared in the original header
[*] list or were added to this instance, and may contain duplicates. Any
[*] fields deleted and re-inserted are always appended to the header list.
[*] If no fields exist with the given name, returns an empty list.
[*] """
[*] name = name.lower()
[*] return for kv in self._headers if kv.lower()==name]
[*]
[*]
[*] def get(self,name,default=None):
[*] """Get the first header value for 'name', or return 'default'"""
[*] name = name.lower()
[*] for k,v in self._headers:
[*] if k.lower()==name:
[*] return v
[*] return default
[*]
[*] def keys(self):
[*] """Return a list of all the header field names.
[*]
[*] These will be sorted in the order they appeared in the original header
[*] list, or were added to this instance, and may contain duplicates.
[*] Any fields deleted and re-inserted are always appended to the header
[*] list.
[*] """
[*] return for k, v in self._headers]
[*]
[*] def values(self):
[*] """Return a list of all header values.
[*]
[*] These will be sorted in the order they appeared in the original header
[*] list, or were added to this instance, and may contain duplicates.
[*] Any fields deleted and re-inserted are always appended to the header
[*] list.
[*] """
[*] return for k, v in self._headers]
[*]
[*] def items(self):
[*] """Get all the header fields and values.
[*]
[*] These will be sorted in the order they were in the original header
[*] list, or were added to this instance, and may contain duplicates.
[*] Any fields deleted and re-inserted are always appended to the header
[*] list.
[*] """
[*] return self._headers[:]
[*]
[*] def __repr__(self):
[*] return "Headers(%s)" % `self._headers`
[*]
[*] def __str__(self):
[*] """str() returns the formatted headers, complete with end line,
[*] suitable for direct HTTP transmission."""
[*] return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['',''])
[*]
[*] def setdefault(self,name,value):
[*] """Return first matching header value for 'name', or 'value'
[*]
[*] If there is no header named 'name', add a new header with name 'name'
[*] and value 'value'."""
[*] result = self.get(name)
[*] if result is None:
[*] self._headers.append((name,value))
[*] return value
[*] else:
[*] return result
[*]
[*] def add_header(self, _name, _value, **_params):
[*] """Extended header setting.
[*]
[*] _name is the header field to add. keyword arguments can be used to set
[*] additional parameters for the header field, with underscores converted
[*] to dashes. Normally the parameter will be added as key="value" unless
[*] value is None, in which case only the key will be added.
[*]
[*] Example:
[*]
[*] h.add_header('content-disposition', 'attachment', filename='bud.gif')
[*]
[*] Note that unlike the corresponding 'email.Message' method, this does
[*] *not* handle '(charset, language, value)' tuples: all values must be
[*] strings or None.
[*] """
[*] parts = []
[*] if _value is not None:
[*] parts.append(_value)
[*] for k, v in _params.items():
[*] if v is None:
[*] parts.append(k.replace('_', '-'))
[*] else:
[*] parts.append(_formatparam(k.replace('_', '-'), v))
[*] self._headers.append((_name, "; ".join(parts)))
[*]
[*]def guess_scheme(environ):
[*] """Return a guess for whether 'wsgi.url_scheme' should be 'http' or 'https'
[*] """
[*] if environ.get("HTTPS") in ('yes','on','1'):
[*] return 'https'
[*] else:
[*] return 'http'
[*]
[*]_hop_headers = {
[*] 'connection':1, 'keep-alive':1, 'proxy-authenticate':1,
[*] 'proxy-authorization':1, 'te':1, 'trailers':1, 'transfer-encoding':1,
[*] 'upgrade':1
[*]}
[*]
[*]def is_hop_by_hop(header_name):
[*] """Return true if 'header_name' is an HTTP/1.1 "Hop-by-Hop" header"""
[*] return header_name.lower() in _hop_headers
[*]
[*]class ServerHandler(object):
[*] """Manage the invocation of a WSGI application"""
[*]
[*] # Configuration parameters; can override per-subclass or per-instance
[*] wsgi_version = (1,0)
[*] wsgi_multithread = True
[*] wsgi_multiprocess = True
[*] wsgi_run_once = False
[*]
[*] origin_server = True # We are transmitting direct to client
[*] http_version = "1.0" # Version that should be used for response
[*] server_software = software_version
[*]
[*] # os_environ is used to supply configuration from the OS environment:
[*] # by default it's a copy of 'os.environ' as of import time, but you can
[*] # override this in e.g. your __init__ method.
[*] os_environ = dict(os.environ.items())
[*]
[*] # Collaborator classes
[*] wsgi_file_wrapper = FileWrapper # set to None to disable
[*] headers_class = Headers # must be a Headers-like class
[*]
[*] # Error handling (also per-subclass or per-instance)
[*] traceback_limit = None # Print entire traceback to self.get_stderr()
[*] error_status = "500 INTERNAL SERVER ERROR"
[*] error_headers = [('Content-Type','text/plain')]
[*]
[*] # State variables (don't mess with these)
[*] status = result = None
[*] headers_sent = False
[*] headers = None
[*] bytes_sent = 0
[*]
[*] def __init__(self, stdin, stdout, stderr, environ, multithread=True,
[*] multiprocess=False):
[*] self.stdin = stdin
[*] self.stdout = stdout
[*] self.stderr = stderr
[*] self.base_env = environ
[*] self.wsgi_multithread = multithread
[*] self.wsgi_multiprocess = multiprocess
[*]
[*] def run(self, application):
[*] """Invoke the application"""
[*] # Note to self: don't move the close()! Asynchronous servers shouldn't
[*] # call close() from finish_response(), so if you close() anywhere but
[*] # the double-error branch here, you'll break asynchronous servers by
[*] # prematurely closing. Async servers must return from 'run()' without
[*] # closing if there might still be output to iterate over.
[*] try:
[*] self.setup_environ()
[*] self.result = application(self.environ, self.start_response)
[*] self.finish_response()
[*] except:
[*] try:
[*] self.handle_error()
[*] except:
[*] # If we get an error handling an error, just give up already!
[*] self.close()
[*] raise # ...and let the actual server figure it out.
[*]
[*] def setup_environ(self):
[*] """Set up the environment for one request"""
[*]
[*] env = self.environ = self.os_environ.copy()
[*] self.add_cgi_vars()
[*]
[*] env['wsgi.input'] = self.get_stdin()
[*] env['wsgi.errors'] = self.get_stderr()
[*] env['wsgi.version'] = self.wsgi_version
[*] env['wsgi.run_once'] = self.wsgi_run_once
[*] env['wsgi.url_scheme'] = self.get_scheme()
[*] env['wsgi.multithread'] = self.wsgi_multithread
[*] env['wsgi.multiprocess'] = self.wsgi_multiprocess
[*]
[*] if self.wsgi_file_wrapper is not None:
[*] env['wsgi.file_wrapper'] = self.wsgi_file_wrapper
[*]
[*] if self.origin_server and self.server_software:
[*] env.setdefault('SERVER_SOFTWARE',self.server_software)
[*]
[*] def finish_response(self):
[*] """
[*] Send any iterable data, then close self and the iterable
[*]
[*] Subclasses intended for use in asynchronous servers will want to
[*] redefine this method, such that it sets up callbacks in the event loop
[*] to iterate over the data, and to call 'self.close()' once the response
[*] is finished.
[*] """
[*] if not self.result_is_file() or not self.sendfile():
[*] for data in self.result:
[*] self.write(data)
[*] self.finish_content()
[*] self.close()
[*]
[*] def get_scheme(self):
[*] """Return the URL scheme being used"""
[*] return guess_scheme(self.environ)
[*]
[*] def set_content_length(self):
[*] """Compute Content-Length or switch to chunked encoding if possible"""
[*] try:
[*] blocks = len(self.result)
[*] except (TypeError, AttributeError, NotImplementedError):
[*] pass
[*] else:
[*] if blocks==1:
[*] self.headers['Content-Length'] = str(self.bytes_sent)
[*] return
[*] # XXX Try for chunked encoding if origin server and client is 1.1
[*]
[*] def cleanup_headers(self):
[*] """Make any necessary header changes or defaults
[*]
[*] Subclasses can extend this to add other defaults.
[*] """
[*] if 'Content-Length' not in self.headers:
[*] self.set_content_length()
[*]
[*] def start_response(self, status, headers,exc_info=None):
[*] """'start_response()' callable as specified by PEP 333"""
[*]
[*] if exc_info:
[*] try:
[*] if self.headers_sent:
[*] # Re-raise original exception if headers sent
[*] raise exc_info, exc_info, exc_info
[*] finally:
[*] exc_info = None # avoid dangling circular ref
[*] elif self.headers is not None:
[*] raise AssertionError("Headers already set!")
[*]
[*] assert isinstance(status, str),"Status must be a string"
[*] assert len(status)>=4,"Status must be at least 4 characters"
[*] assert int(status[:3]),"Status message must begin w/3-digit code"
[*] assert status==" ", "Status message must have a space after code"
[*] if __debug__:
[*] for name,val in headers:
[*] assert isinstance(name, str),"Header names must be strings"
[*] assert isinstance(val, str),"Header values must be strings"
[*] assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
[*] self.status = status
[*] self.headers = self.headers_class(headers)
[*] return self.write
[*]
[*] def send_preamble(self):
[*] """Transmit version/status/date/server, via self._write()"""
[*] if self.origin_server:
[*] if self.client_is_modern():
[*] self._write('HTTP/%s %s\r\n' % (self.http_version,self.status))
[*] if 'Date' not in self.headers:
[*] self._write(
[*] 'Date: %s\r\n' % http_date()
[*] )
[*] if self.server_software and 'Server' not in self.headers:
[*] self._write('Server: %s\r\n' % self.server_software)
[*] else:
[*] self._write('Status: %s\r\n' % self.status)
[*]
[*] def write(self, data):
[*] """'write()' callable as specified by PEP 333"""
[*]
[*] assert isinstance(data, str), "write() argument must be string"
[*]
[*] if not self.status:
[*] raise AssertionError("write() before start_response()")
[*]
[*] elif not self.headers_sent:
[*] # Before the first output, send the stored headers
[*] self.bytes_sent = len(data) # make sure we know content-length
[*] self.send_headers()
[*] else:
[*] self.bytes_sent += len(data)
[*]
[*] # XXX check Content-Length and truncate if too many bytes written?
[*]
[*] # If data is too large, socket will choke, so write chunks no larger
[*] # than 32MB at a time.
[*] length = len(data)
[*] if length > 33554432:
[*] offset = 0
[*] while offset < length:
[*] chunk_size = min(33554432, length)
[*] self._write(data)
[*] self._flush()
[*] offset += chunk_size
[*] else:
[*] self._write(data)
[*] self._flush()
[*]
[*] def sendfile(self):
[*] """Platform-specific file transmission
[*]
[*] Override this method in subclasses to support platform-specific
[*] file transmission. It is only called if the application's
[*] return iterable ('self.result') is an instance of
[*] 'self.wsgi_file_wrapper'.
[*]
[*] This method should return a true value if it was able to actually
[*] transmit the wrapped file-like object using a platform-specific
[*] approach. It should return a false value if normal iteration
[*] should be used instead. An exception can be raised to indicate
[*] that transmission was attempted, but failed.
[*]
[*] NOTE: this method should call 'self.send_headers()' if
[*] 'self.headers_sent' is false and it is going to attempt direct
[*] transmission of the file1.
[*] """
[*] return False # No platform-specific transmission by default
[*]
[*] def finish_content(self):
[*] """Ensure headers and content have both been sent"""
[*] if not self.headers_sent:
[*] self.headers['Content-Length'] = "0"
[*] self.send_headers()
[*] else:
[*] pass # XXX check if content-length was too short?
[*]
[*] def close(self):
[*] try:
[*] self.request_handler.log_request(self.status.split(' ',1), self.bytes_sent)
[*] finally:
[*] try:
[*] if hasattr(self.result,'close'):
[*] self.result.close()
[*] finally:
[*] self.result = self.headers = self.status = self.environ = None
[*] self.bytes_sent = 0; self.headers_sent = False
[*]
[*] def send_headers(self):
[*] """Transmit headers to the client, via self._write()"""
[*] self.cleanup_headers()
[*] self.headers_sent = True
[*] if not self.origin_server or self.client_is_modern():
[*] self.send_preamble()
[*] self._write(str(self.headers))
[*]
[*] def result_is_file(self):
[*] """True if 'self.result' is an instance of 'self.wsgi_file_wrapper'"""
[*] wrapper = self.wsgi_file_wrapper
[*] return wrapper is not None and isinstance(self.result,wrapper)
[*]
[*] def client_is_modern(self):
[*] """True if client can accept status and headers"""
[*] return self.environ['SERVER_PROTOCOL'].upper() != 'HTTP/0.9'
[*]
[*] def log_exception(self,exc_info):
[*] """Log the 'exc_info' tuple in the server log
[*]
[*] Subclasses may override to retarget the output or change its format.
[*] """
[*] try:
[*] from traceback import print_exception
[*] stderr = self.get_stderr()
[*] print_exception(
[*] exc_info, exc_info, exc_info,
[*] self.traceback_limit, stderr
[*] )
[*] stderr.flush()
[*] finally:
[*] exc_info = None
[*]
[*] def handle_error(self):
[*] """Log current error, and send error output to client if possible"""
[*] self.log_exception(sys.exc_info())
[*] if not self.headers_sent:
[*] self.result = self.error_output(self.environ, self.start_response)
[*] self.finish_response()
[*] # XXX else: attempt advanced recovery techniques for HTML or text?
[*]
[*] def error_output(self, environ, start_response):
[*] import traceback
[*] start_response(self.error_status, self.error_headers[:], sys.exc_info())
[*] return ['\n'.join(traceback.format_exception(*sys.exc_info()))]
[*]
[*] # Pure abstract methods; *must* be overridden in subclasses
[*]
[*] def _write(self,data):
[*] self.stdout.write(data)
[*] self._write = self.stdout.write
[*]
[*] def _flush(self):
[*] self.stdout.flush()
[*] self._flush = self.stdout.flush
[*]
[*] def get_stdin(self):
[*] return self.stdin
[*]
[*] def get_stderr(self):
[*] return self.stderr
[*]
[*] def add_cgi_vars(self):
[*] self.environ.update(self.base_env)
[*]
[*]class WSGIServer(HTTPServer):
[*] """BaseHTTPServer that implements the Python WSGI protocol"""
[*] application = None
[*]
[*] def __init__(self, *args, **kwargs):
[*] if kwargs.pop('ipv6', False):
[*] self.address_family = socket.AF_INET6
[*] HTTPServer.__init__(self, *args, **kwargs)
[*]
[*] def server_bind(self):
[*] """Override server_bind to store the server name."""
[*] try:
[*] HTTPServer.server_bind(self)
[*] except Exception, e:
[*] raise WSGIServerException(e)
[*] self.setup_environ()
[*]
[*] def setup_environ(self):
[*] # Set up base environment
[*] env = self.base_environ = {}
[*] env['SERVER_NAME'] = self.server_name
[*] env['GATEWAY_INTERFACE'] = 'CGI/1.1'
[*] env['SERVER_PORT'] = str(self.server_port)
[*] env['REMOTE_HOST']=''
[*] env['CONTENT_LENGTH']=''
[*] env['SCRIPT_NAME'] = ''
[*]
[*] def get_app(self):
[*] return self.application
[*]
[*] def set_app(self,application):
[*] self.application = application
[*]
[*]class WSGIRequestHandler(BaseHTTPRequestHandler):
[*] server_version = "WSGIServer/" + __version__
[*]
[*] def __init__(self, *args, **kwargs):
[*] from django.conf import settings
[*] self.admin_media_prefix = settings.ADMIN_MEDIA_PREFIX
[*] # We set self.path to avoid crashes in log_message() on unsupported
[*] # requests (like "OPTIONS").
[*] self.path = ''
[*] self.style = color_style()
[*] BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
[*]
[*] def get_environ(self):
[*] env = self.server.base_environ.copy()
[*] env['SERVER_PROTOCOL'] = self.request_version
[*] env['REQUEST_METHOD'] = self.command
[*] if '?' in self.path:
[*] path,query = self.path.split('?',1)
[*] else:
[*] path,query = self.path,''
[*]
[*] env['PATH_INFO'] = urllib.unquote(path)
[*] env['QUERY_STRING'] = query
[*] env['REMOTE_ADDR'] = self.client_address
[*]
[*] if self.headers.typeheader is None:
[*] env['CONTENT_TYPE'] = self.headers.type
[*] else:
[*] env['CONTENT_TYPE'] = self.headers.typeheader
[*]
[*] length = self.headers.getheader('content-length')
[*] if length:
[*] env['CONTENT_LENGTH'] = length
[*]
[*] for h in self.headers.headers:
[*] k,v = h.split(':',1)
[*] k=k.replace('-','_').upper(); v=v.strip()
[*] if k in env:
[*] continue # skip content length, type,etc.
[*] if 'HTTP_'+k in env:
[*] env['HTTP_'+k] += ','+v # comma-separate multiple headers
[*] else:
[*] env['HTTP_'+k] = v
[*] return env
[*]
[*] def get_stderr(self):
[*] return sys.stderr
[*]
[*] def handle(self):
[*] """Handle a single HTTP request"""
[*] self.raw_requestline = self.rfile.readline()
[*] if not self.parse_request(): # An error code has been sent, just exit
[*] return
[*] handler = ServerHandler(self.rfile, self.wfile, self.get_stderr(), self.get_environ())
[*] handler.request_handler = self # backpointer for logging
[*] handler.run(self.server.get_app())
[*]
[*] def log_message(self, format, *args):
[*] # Don't bother logging requests for admin images or the favicon.
[*] if self.path.startswith(self.admin_media_prefix) or self.path == '/favicon.ico':
[*] return
[*]
[*] msg = "[%s] %s\n" % (self.log_date_time_string(), format % args)
[*]
[*] # Utilize terminal colors, if available
[*] if args == '2':
[*] # Put 2XX first, since it should be the common case
[*] msg = self.style.HTTP_SUCCESS(msg)
[*] elif args == '1':
[*] msg = self.style.HTTP_INFO(msg)
[*] elif args == '304':
[*] msg = self.style.HTTP_NOT_MODIFIED(msg)
[*] elif args == '3':
[*] msg = self.style.HTTP_REDIRECT(msg)
[*] elif args == '404':
[*] msg = self.style.HTTP_NOT_FOUND(msg)
[*] elif args == '4':
[*] msg = self.style.HTTP_BAD_REQUEST(msg)
[*] else:
[*] # Any 5XX, or any other response
[*] msg = self.style.HTTP_SERVER_ERROR(msg)
[*]
[*] sys.stderr.write(msg)
[*]
[*]
[*]class AdminMediaHandler(handlers.StaticFilesHandler):
[*] """
[*] WSGI middleware that intercepts calls to the admin media directory, as
[*] defined by the ADMIN_MEDIA_PREFIX setting, and serves those images.
[*] Use this ONLY LOCALLY, for development! This hasn't been tested for
[*] security and is not super efficient.
[*]
[*] This is pending for deprecation since 1.3.
[*] """
[*] def get_base_dir(self):
[*] import django
[*] return os.path.join(django.__path__, 'contrib', 'admin', 'media')
[*]
[*] def get_base_url(self):
[*] from django.conf import settings
[*] from django.core.exceptions import ImproperlyConfigured
[*] if not settings.ADMIN_MEDIA_PREFIX:
[*] raise ImproperlyConfigured(
[*] "The ADMIN_MEDIA_PREFIX setting can't be empty "
[*] "when using the AdminMediaHandler, e.g. with runserver.")
[*] return settings.ADMIN_MEDIA_PREFIX
[*]
[*] def file_path(self, url):
[*] """
[*] Returns the path to the media file on disk for the given URL.
[*]
[*] The passed URL is assumed to begin with ``self.base_url``. If the
[*] resulting file path is outside the media directory, then a ValueError
[*] is raised.
[*] """
[*] relative_url = url):]
[*] relative_path = urllib.url2pathname(relative_url)
[*] return safe_join(self.base_dir, relative_path)
[*]
[*] def serve(self, request):
[*] document_root, path = os.path.split(self.file_path(request.path))
[*] return static.serve(request, path, document_root=document_root)
[*]
[*] def _should_handle(self, path):
[*] """
[*] Checks if the path should be handled. Ignores the path if:
[*]
[*] * the host is provided as part of the base_url
[*] * the request's path isn't under the base path
[*] """
[*] return path.startswith(self.base_url) and not self.base_url
[*]
[*]def run(addr, port, wsgi_handler, ipv6=False):
[*] server_address = (addr, port)
[*] httpd = WSGIServer(server_address, WSGIRequestHandler, ipv6=ipv6)
[*] httpd.set_app(wsgi_handler)
[*] httpd.serve_forever()
参考资料
http://blog.csdn.net/lanphaday/article/details/6601123/
http://wiki.woodpecker.org.cn/moin/PythonCodingRule
http://www.elias.cn/Python/PythonStyleGuide
页:
[1]