目前的思路如下:
- 把 Postman 的请求集合文件、环境变量文件、全局变量文件全部下载到本地,分别存为 xxx.collection.json、xxx.env.json 和 xxx.globals.json
- 写脚本解析 xxx.collection.json 文件中的请求,并使用 requests 发送请求;解析 xxx.env.json 文件的环境变量,读写并解析 xxx.globals.json 文件的全局变量
- 脚本放到 Linux 服务器上去执行,我的是 Ubuntu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import sys
import requests
import json
import time
import logging
from functools import wraps
from colorama import Fore, Style
# Logging setup: a dedicated 'postman-test' logger that emits every record
# (DEBUG and above) to standard output.
_logger = logging.getLogger('postman-test')
_handler = logging.StreamHandler(sys.stdout)  # console handler
_logger.addHandler(_handler)
_logger.setLevel(logging.DEBUG)
def info(msg):
    """Log *msg* at INFO level, prefixed with a timestamp and coloured green."""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    line = "".join((Fore.GREEN, timestamp, " [INFO] ", str(msg), Style.RESET_ALL))
    _logger.info(line)
def error(msg):
    """Log *msg* at ERROR level, prefixed with a timestamp and coloured red."""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    line = "".join((Fore.RED, timestamp, " [ERROR] ", str(msg), Style.RESET_ALL))
    _logger.error(line)
def _print(msg):
    """Log *msg* at DEBUG level, prefixed with a timestamp and coloured blue."""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    line = "".join((Fore.BLUE, timestamp, " [PRINT] ", str(msg), Style.RESET_ALL))
    _logger.debug(line)
def traceback_error(func):
    """Decorator that converts an exception raised by *func* into a return value.

    The wrapped callable is invoked with whatever positional and keyword
    arguments the caller supplies, so the decorator works on methods and on
    plain functions alike (including functions that take no arguments).

    Returns:
        A wrapper that returns ``func``'s result on success. If ``func``
        raises any ``Exception``, the formatted traceback is printed to
        standard output and returned as a string instead of propagating.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Deliberate best-effort: report the failure but never let it
            # escape to the caller.
            import traceback
            ex_msg = traceback.format_exc()
            print(ex_msg)
            return ex_msg
    return wrapper
class DingDingNotice:
    """Send plain-text messages to a DingTalk group through a robot webhook.

    Args:
        ding_token: Access token of the group robot. When omitted, the
            placeholder ``DEFAULT_TOKEN`` is used.
        atMobiles: List of phone numbers of the members to @-mention. When
            omitted, a single built-in number is used.
        isAtAll: Whether to @-mention everyone in the group. When omitted,
            defaults to True.
    """

    # Placeholder robot token; replace it or pass ``ding_token`` explicitly.
    DEFAULT_TOKEN = 'xxxxxxxxxxx'
    # Seconds to wait for the webhook before giving up, so that a stalled
    # connection cannot hang the whole script.
    TIMEOUT = 10

    def __init__(self, ding_token=None, atMobiles=None, isAtAll=None):
        # Members to @-mention, identified by phone number.
        self.atMobiles = ['13046367204'] if atMobiles is None else atMobiles
        # Whether to @-mention everyone.
        self.isAtAll = True if isAtAll is None else isAtAll
        # Honour the caller-supplied token instead of silently ignoring it.
        self.token = self.DEFAULT_TOKEN if ding_token is None else ding_token
        self.api = 'https://oapi.dingtalk.com/robot/send?access_token={}'.format(self.token)
        self.headers = {'Content-Type': 'application/json;charset=utf-8'}

    @traceback_error
    def send_msg(self, content):
        """POST *content* as a text message to the robot webhook.

        Returns:
            The webhook's JSON reply re-serialised as a string. Because the
            method is wrapped by ``traceback_error``, any exception (network
            failure, timeout, non-JSON reply) results in the traceback text
            being returned instead.
        """
        msg = {
            'msgtype': 'text',
            'text': {'content': content},
            'at': {'atMobiles': self.atMobiles, 'isAtAll': self.isAtAll},
        }
        response = requests.post(
            self.api,
            data=json.dumps(msg),
            headers=self.headers,
            timeout=self.TIMEOUT,
        )
        return json.dumps(response.json())
# One HTTP session reused for every request issued by the runner.
user = requests.Session()

# The collection file (xxx.collection.json) is taken from the first
# command-line argument; scheduled jobs must pass an absolute path.
# The encoding is pinned to UTF-8 so that non-ASCII request names or bodies
# load correctly even when the process locale is not UTF-8 (e.g. under cron).
with open(sys.argv[1], encoding='utf-8') as f:
    test_data = json.load(f)
class JsonRunner:
    """Replay the requests of the loaded Postman collection.

    After each request is sent, its details are exposed on the instance
    (``name``, ``url``, ``method``, ``headers``, ``data``, ``res``) so the
    consumer of :meth:`run` can inspect them alongside the yielded status code.
    """

    def __init__(self):
        self.name = None      # request name from the collection
        self.url = None       # raw request URL
        self.method = None    # HTTP method as written in the collection
        self.headers = None   # dict of request headers
        self.data = None      # request body (str, dict or None)
        self.res = None       # response object of the most recent request
        self.success = None   # not set by this class

    @staticmethod
    def _iter_requests(items):
        """Yield every entry that has a 'request', descending into folders."""
        for entry in items or []:
            if 'request' in entry:
                yield entry
            elif 'item' in entry:
                # A folder: its children live under a nested 'item' list.
                yield from JsonRunner._iter_requests(entry['item'])

    @staticmethod
    def _raw_url(url):
        """Return the raw URL whether 'url' is an object with 'raw' or a string."""
        if isinstance(url, dict):
            return url['raw']
        return url

    @staticmethod
    def _build_headers(header_items):
        """Turn the collection's list of key/value header entries into a dict."""
        return {h['key']: h['value'] for h in header_items or []}

    @staticmethod
    def _build_body(body):
        """Return the payload for a 'raw' or 'urlencoded' body, else None."""
        if not body:
            return None
        mode = body.get('mode')
        if mode == 'raw':
            return body.get('raw')
        if mode == 'urlencoded':
            return {p['key']: p['value'] for p in body.get('urlencoded') or []}
        # Other body modes are not supported; the request is sent without a body.
        return None

    def run(self):
        """Send each request in the collection and yield its HTTP status code.

        Every request is actually sent, whatever its method, so ``self.res``
        always belongs to the request described by ``self.name``/``self.url``.
        GET requests are sent without a body. Requests whose URL contains
        '.html' are sent but their status code is not yielded.

        Yields:
            int: status code of the response to the current request.
        """
        for req in self._iter_requests(test_data['item']):
            request = req['request']
            self.name = req['name']
            self.url = self._raw_url(request['url'])
            self.method = request['method']
            self.headers = self._build_headers(request.get('header'))
            # Reset per-request state so nothing leaks from the previous one.
            self.res = None
            if self.method == 'GET':
                self.data = None
            else:
                self.data = self._build_body(request.get('body'))

            # A str body is encoded explicitly: left as str, non-Latin-1
            # characters would fail when the HTTP layer encodes the body.
            payload = self.data
            if isinstance(payload, str):
                payload = payload.encode('utf-8')

            # NOTE: network errors propagate to the caller, as before.
            self.res = user.request(
                self.method, self.url, headers=self.headers, data=payload
            )

            if '.html' not in self.url:
                yield self.res.status_code
def run():
    """Run every request of the collection and report failures to DingTalk.

    A request passes when its status code is 200 and the response text
    contains ``"success": true`` (any whitespace around the colon is
    accepted). Each failure is logged and collected; if there is at least
    one, a single summary message is sent through ``DingDingNotice`` and the
    outcome of that send is logged.
    """
    import re  # only needed here; kept local to avoid touching file imports

    # Superset of the literal '"success":true' check: also matches bodies
    # serialised with spaces or newlines around the colon.
    success_flag = re.compile(r'"success"\s*:\s*true')

    runner = JsonRunner()
    dingding_msg = []
    for status_code in runner.run():
        runner.status_code = status_code
        if status_code == 200 and success_flag.search(runner.res.text):
            info("测试通过 >>>>>> {} {} {}".format(runner.name, runner.url, runner.method))
        else:
            error("测试失败 >>>>>> {} {} {}".format(runner.name, runner.url, runner.method))
            fail_msg = '异常接口: {}\n响应状态: {}\n响应数据: {}'.format(
                runner.url, status_code, runner.res.text
            )
            dingding_msg.append(fail_msg)
    _print("测试结束")

    if not dingding_msg:
        return

    dingding_msg = '跨境说服务监控温馨提醒您:\n\n' + '\n\n'.join(dingding_msg)
    service = DingDingNotice()
    result = service.send_msg(content=dingding_msg)

    # send_msg returns the webhook's JSON reply as a string, or a traceback
    # string if the call raised. Only report success when the reply parses
    # as a JSON object and does not carry a non-zero 'errcode'.
    try:
        errcode = json.loads(result).get('errcode', 0)
    except (TypeError, ValueError, AttributeError):
        errcode = None
    if errcode == 0:
        _print("钉钉消息发送成功")
    else:
        error("钉钉消息发送失败: {}".format(result))
if __name__ == '__main__':
    # Script entry point: execute the whole collection, then log completion.
    run()
    _print("脚本执行完成")