设为首页 收藏本站
查看: 814|回复: 0

[经验分享] python做一个http接口测试框架

[复制链接]

尚未签到

发表于 2015-11-30 07:39:38 | 显示全部楼层 |阅读模式
  目录结构
  project
  |
  case#测试用例
  |
  suite#测试目录
  |
  logs#测试日志
  |
  papi#测试类
  |
  result#测试结果
  |
  setting.py#配置文件
  1、日志类,用于测试时日志记录



pyapilog.py


1 # -*-coding:utf-8 -*-
2 # !/usr/bin/python
3 __author__ = 'dongjie'
4 __data__ = '2015-05-20'
5
6 import logging
7 import datetime
8 import os
9 import setting
10 logLevel = {
11    1 : logging.NOTSET,
12    2 : logging.DEBUG,
13    3 : logging.INFO,
14    4 : logging.WARNING,
15    5 : logging.ERROR,
16    6 : logging.CRITICAL
17 }
18 setFile = os.path.join(setting.root_dir, 'setting.ini')
19 loggers = {}
20
21
22 #  定义日志方法,从配置文件读取日志等级,且定义日志输出路径
23 def pyapilog(**kwargs):
24     global loggers
25     log_level = setting.logLevel
26     log_path = setting.logFile
27     if os.path.exists(log_path):
28         log_file = os.path.join(log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')
29     else:
30         os.mkdir(r'%s' % log_path)
31         log_file = os.path.join(log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')
32     logger = logging.getLogger()
33     logger.setLevel(logLevel[log_level])
34     if not logger.handlers:
35         # 创建一个handler,用于写入日志文件
36         fh = logging.FileHandler(log_file)
37         fh.setLevel(logLevel[log_level])
38         # 再创建一个handler,用于输出到控制台
39         ch = logging.StreamHandler()
40         ch.setLevel(logging.ERROR)
41         # 定义handler的输出格式
42         formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
43         fh.setFormatter(formatter)
44         ch.setFormatter(formatter)
45         # 给logger添加handler
46         logger.addHandler(fh)
47         logger.addHandler(ch)
48         loggers.update(dict(name=logger))
49     return  logger
  2、http测试类



httprequest.py


# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'
from pyapilog import pyapilog
import requests
import json
import urllib
class SendHttpRequest(object):
def __init__(self, url):
self.url = url
# post request
def post(self, value=None):
params = urllib.urlencode(value)
try:
req = requests.post(self.url + "?%s" % params)
except Exception, err:
print err
if req.status_code == 200:
pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
else:
pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
return req.text
def post_json(self, value):
head = {'content-type': 'application/json'}
try:
req = requests.post(self.url, data=json.dumps(value), headers=head)
print req.url
except Exception, err:
print err
if req.status_code == 200:
pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
return req.text
else:
pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
def get(self, value=None):
try:
req = requests.get(self.url, params=value)
except Exception, err:
print err
if req.status_code == 200:
pyapilog().info(u"发送get请求: %s   服务器返回:  %s" % (req.url, req.status_code))
else:
pyapilog().error(u"发送get请求: %s   服务器返回:  %s\n error info: %s " % (req.url, req.status_code, req.text))
return req.text
  3、数据库操作类



databasedriver.py# -*-coding:utf-8 -*# !/usr/bin/python


__author__ = 'dongjie'
__data__ = '2015-05-21'
import pymssql
import MySQLdb
import setting
from pyapilog import pyapilog
class sqldriver(object):
def __init__(self, host, port, user, password, database):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
# 执行SQLserver查询
def exec_mssql(self, sql):
try:
conn = pymssql.connect(host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset="utf8")
cur = conn.cursor()
if cur:
pyapilog().info(u"执行SQL语句|%s|" % sql)
cur.execute(sql)
rows = cur.fetchall()
if len(rows) == 0:
pyapilog().warning(u"没有查询到数据")
return rows
else:
pyapilog().error(u"数据库连接不成功")
conn.close()
except Exception, e:
pyapilog().error(e)
# 执行Mysql查询
def exec_mysql(self, sql):
try:
conn = MySQLdb.connect(host=self.host,
port=self.port,
user=self.user,
passwd=self.password,
db=self.database,
)
cur = conn.cursor()
if cur:
pyapilog().info(u"执行SQL语句|%s|" % sql)
resList = cur.execute(sql)
return resList
except Exception, e:
pyapilog().error(e)
# 执行sql语句返回结果
def execsql(sql):
config = setting.DATABASE
driver = config.get("ENGINE")
host = config.get("HOST")
port = config.get("PORT")
user = config.get("USER")
password = config.get("PWD")
database = config.get("DATABASE")
if driver == "MYSQL":
try:
sql_result = sqldriver(
host=host,
port=port,
user=user,
password=password,
database=database
).exec_mysql(sql)
return sql_result
except Exception, e:
pyapilog().error(e)
elif driver == "MSSQL":
try:
sql_result = sqldriver(
host=host,
port=port,
user=user,
password=password,
database=database
).exec_mssql(sql)
return sql_result
except Exception, e:
pyapilog().error(e)
else:
pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)

  4、解析json字符串



dataprase.py



# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-21'
import json
import xmltodict
from pyapilog import pyapilog
# 解析json字符串
class jsonprase(object):
def __init__(self, json_value):
try:
self.json_value = json.loads(json_value)
except ValueError, e:
pyapilog().error(e)
def find_json_node_by_xpath(self, xpath):
elem = self.json_value
nodes = xpath.strip("/").split("/")
for x in range(len(nodes)):
try:
elem = elem.get(nodes[x])
except AttributeError:
elem = [y.get(nodes[x]) for y in elem]
return elem
def datalength(self, xpath="/"):
return len(self.find_json_node_by_xpath(xpath))
@property
def json_to_xml(self):
try:
root = {"root": self.json_value}
xml = xmltodict.unparse(root, pretty=True)
except ArithmeticError, e:
pyapilog().error(e)
return xml
# 解析xml字符串
class xmlprase(object):
def __init__(self, xml_value):
self.xml_str = xml_value
@property
def xml_to_json(self):
try:
xml_dic = xmltodict.parse(self.xml_str,
encoding="utf-8",
process_namespaces=True,
)
json_str = json.dumps(xml_dic)
except Exception, e:
print e
return json_str
  5、还有配置文件差点忘记说了



# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'
'''
配置系统相关的参数,提供全局的相关配置
'''
import os
import sys
root_dir = '/'.join(os.path.realpath(__file__).split('/')[:-1])
sys.path.append(root_dir)
# log等级,1:notset 2:debug  3:info 4:warning 5:error 6:critical
logLevel = 2
# 日志文件路径
logFile = os.path.join(root_dir, 'logs')
# 数据库配置,支持MYSQL、MSSQL、ORACLE
DATABASE = {
"ENGINE": "MSSQL",
"HOST": "",
"PORT": 3433,
"USER": "",
"PWD": "",
"DATABASE": ""
}
  6、最后看看我们的测试用例吧,当然是数据驱动了



# -*-coding:utf-8 -*-
from ddt import ddt, data, unpack
import unittest
from papi.httprequest import SendHttpRequest
from papi.dataparse import jsonprase, xmlprase

@ddt
class TestSingleRequest(unittest.TestCase):
def setUp(self):
self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"
    @data(
(32351, 6),
(9555, 4)
)
@unpack
def test_Single_right(self, sid, count):
value = {"sid": sid, "count": count}
data = SendHttpRequest(self.url).get(value)
json_data = jsonprase(data)
point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
# 断言
assert float(point_lat) != 0 and float(point_lng) != 0
# 断言
assert json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl") is not None
if is_exists_map == True:
assert size != ""
# 导常请求SingleRequest接口
    @data(
("abceeffffg", 6),
(9555, "")
)
@unpack
def test_Single_error(self, sid, count):
value = {"sid": sid, "count": count}
data = SendHttpRequest(self.url).get(value)
self.assertEqual(data, u'{"Message":"请求无效。"}')
@ddt
class TourMaps(unittest.TestCase):
def setUp(self):
self.url = "http://xxxxxx/api/TourMap"
@data(32351, 9555)
def test_requests_online_xml(self, tourId):
xml_url = self.url + "/%s" % tourId
data = SendHttpRequest(xml_url).get()
json_st = xmlprase(data).xml_to_json
json_data = jsonprase(json_st)
lng = json_data.find_json_node_by_xpath("/root/data/@lng")
lat = json_data.find_json_node_by_xpath("/root/data/@lat")
assert lng != "" and lat != ""
son_tour = json_data.find_json_node_by_xpath("/root/data/data")
assert len(son_tour) > 0
class TourData(unittest.TestCase):
def setUp(self):
self.url = "http://xxxxxx/api/xxx"
@data(
(),
(),
(),
)
@unpack
def test_tourList_Location_in_open(self):
pass
def test_tourList_Location_not_open(self):
pass
def test_tour_open_city(self):
pass

if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TourMaps, TestSingleRequest)
unittest.TextTestRunner(verbosity=2).run(suite)
  关于python ddt查以参考https://ddt.readthedocs.org/en/latest/example.html
  测试结果生成,可以查看python nose相关文档,生成hmtl
  本文主要提供了代码,如果相关疑问可联系本人,邮箱d1988505j@163.com

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-145083-1-1.html 上篇帖子: 读Flask源代码学习Python 下篇帖子: 2015/9/1 Python基础(6):列表
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表