TT#81700 bin/cdr_check.py: add info of what failed as comment

Previously there was no indication of what match was falling

* migrate to argparse
* format with black
* resolve some lint issues
* remove unused code

Change-Id: Iac06c29ef832250e322d64473837181d3d4effcb
mr9.5.1
Victor Seva 5 years ago committed by Víctor Seva
parent b6c960a146
commit 90a6e1cda2

@ -1,6 +1,6 @@
#!/usr/bin/python3
#
# Copyright: 2013-2015 Sipwise Development Team <support@sipwise.com>
# Copyright: 2013-2021 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -21,15 +21,15 @@
import io
import sys
import re
import getopt
import argparse
import json
import logging
from yaml import load
from pprint import pprint
try:
from yaml import CLoader as Loader
except:
except ImportError:
from yaml import Loader
@ -43,22 +43,24 @@ class Test:
def comment(self, msg):
""" Add a comment """
self._step.append({'result': None, 'msg_ok': msg})
self._step.append({"result": None, "msg_ok": msg})
def ok(self, msg=None):
""" Add a ok result """
self._step.append({'result': True, 'msg_ok': msg})
self._step.append({"result": True, "msg_ok": msg})
def error(self, msg_err):
""" Add an error result"""
self._step.append({'result': False, 'msg_err': msg_err})
self._step.append({"result": False, "msg_err": msg_err})
self._errflag = True
@classmethod
def compare(cls, val0, val1):
logging.debug("val0:[%s]:'%s' val1:[%s]:'%s'" %
(type(val0), str(val0), type(val1),
str(val1)))
logging.debug(
"val0:[{}]:'{}' val1:[{}]:'{}'".format(
type(val0), str(val0), type(val1), str(val1)
)
)
if isinstance(val0, str):
if re.search(val0, str(val1)) is not None:
return True
@ -66,8 +68,8 @@ class Test:
return False
elif isinstance(val0, int):
try:
result = (val0 == int(val1))
except:
result = val0 == int(val1)
except ValueError:
result = False
elif isinstance(val0, list) and isinstance(val1, list):
size = len(val0)
@ -81,14 +83,15 @@ class Test:
logging.debug(e)
return False
else:
result = (val0 == val1)
result = val0 == val1
return result
def test(self, value_expected, value, msg_err, msg_ok=None):
""" Test two values and add the result"""
result = Test.compare(value_expected, value)
self._step.append(
{'result': result, 'msg_err': msg_err, 'msg_ok': msg_ok})
{"result": result, "msg_err": msg_err, "msg_ok": msg_ok}
)
if not result:
self._errflag = True
@ -99,94 +102,56 @@ class Test:
"""get the num of tests"""
test = 0
for s in self._step:
if (s['result'] is not None):
if s["result"] is not None:
test = test + 1
return test
def __str__(self):
"""get the TAP output"""
output = "1..%s\n" % self._num_tests()
output = "1..{}\n".format(self._num_tests())
test = 1
for s in self._step:
if (s['result'] is None):
output += '# %s\n' % s['msg_ok']
if s["result"] is None:
output += "# {}\n".format(s["msg_ok"])
continue
elif (s['result']):
if (s['msg_ok'] is not None):
output += "ok %d - %s\n" % (test, s['msg_ok'])
elif s["result"]:
if s["msg_ok"] is not None:
output += "ok {} - {}\n".format(test, s["msg_ok"])
else:
output += "ok %d\n" % test
output += "ok {}\n".format(test)
else:
output += "not ok %d - ERROR: %s\n" % (test, s['msg_err'])
output += "not ok {} - ERROR: {}\n".format(test, s["msg_err"])
test = test + 1
return output
def check_single_cdr(scen, msg, test):
if isinstance(msg, list):
if len(msg) != 1:
test.error('cdr len != 1')
return
else:
msg = msg[0]
for rule, value in scen.items():
value = str(value)
if rule not in msg:
test.error('%s not in cdr' % rule)
continue
if value.startswith('_:NOT:_'):
flag = False
value = value[7:]
msg_ok = '%s not match'
msg_ko = '%s match'
else:
flag = True
msg_ok = '%s match'
msg_ko = '%s not match'
result = re.search(value, msg[rule])
if (result is not None) == flag:
test.ok(msg_ok % rule)
continue
test.comment('result: %s' % result)
test.error(msg_ko % rule)
def check_cdr(scen, msgs, test):
num_scen = len(scen) # Expected CDRs
num_msgs = len(msgs) # Resulted CDRs
for i in (range(num_scen)):
test.comment("cdr %d" % i)
if(i < num_msgs):
check_single_cdr(scen[i], msgs[i], test)
else:
test.error("cdr[%d] does not exist" % i)
if (num_scen != num_msgs):
test.error("we expected %d cdr but we have %d" %
(num_scen, num_msgs))
def check_single_cdr_recursive(scen, msg):
validated = True
comments = []
oks = []
match_msg = ["{}:{} not match {}", "{}:{} match {}"]
for rule, value in scen.items():
value = str(value)
if rule not in msg:
validated = False
break
if value.startswith('_:NOT:_'):
if value.startswith("_:NOT:_"):
flag = False
value = value[7:]
msg_ok = '%s not match'
msg_ko = '%s match'
ok_idx = 0
ko_idx = 1
else:
flag = True
msg_ok = '%s match'
msg_ko = '%s not match'
ok_idx = 1
ko_idx = 0
result = re.search(value, msg[rule])
if (result is not None) == flag:
oks.append(msg_ok % rule)
ok_str = match_msg[ok_idx].format(rule, msg[rule], value)
oks.append(ok_str)
comments.append(ok_str)
continue
else:
comments.append(match_msg[ko_idx].format(rule, msg[rule], value))
validated = False
break
return validated, comments, oks
@ -195,16 +160,18 @@ def check_single_cdr_recursive(scen, msg):
def check_cdr_recursive(scen, msgs, test):
num_scen = len(scen) # Expected CDRs
num_msgs = len(msgs) # Result CDRs
for i in (range(num_scen)):
test.comment("cdr %d" % i)
for i in range(num_scen):
test.comment("cdr {}".format(i))
found = False
for message in msgs:
test.comment("comparing with cdr id %s" % message['id'])
valid, comments, oks = check_single_cdr_recursive(scen[i], message)
test.comment("comparing with cdr id {}".format(message["id"]))
valid, comments, oks = check_single_cdr_recursive(
scen[i], message
)
for msg in comments:
test.comment(msg)
# if expected and result CDRs fully matches -> exit loop
if valid:
for comment in comments:
test.comment(comment)
for ok in oks:
test.ok(ok)
msgs.remove(message)
@ -212,24 +179,16 @@ def check_cdr_recursive(scen, msgs, test):
break
# Otherwise, continue with the following one
if not found:
test.error("cdr[%d] does not find a correct match" % i)
if (num_scen != num_msgs):
test.error("we expected %d cdr but we have %d" %
(num_scen, num_msgs))
def usage():
print('Usage: mysql_check.py [OPTIONS] cdr_file cdr_test.yml')
print('-h: this help')
print('-d: debug')
print('-y: cdr_file in .yaml format')
print('-j: cdr_file in .json format')
print('-t: cdr_file in .text format')
test.error("cdr[{}] does not find a correct match".format(i))
if num_scen != num_msgs:
test.error(
"we expected {} cdr but we have {}".format(num_scen, num_msgs)
)
def load_yaml(filepath):
output = None
with io.open(filepath, 'r') as file:
with io.open(filepath, "r") as file:
output = load(file, Loader=Loader)
file.close()
return output
@ -237,81 +196,81 @@ def load_yaml(filepath):
def load_json(filepath):
output = None
with io.open(filepath, 'r') as file:
with io.open(filepath, "r") as file:
output = json.load(file)
file.close()
return output
def load_text(filepath):
output = {'cdr': []}
output = {"cdr": []}
row = {}
with open(filepath, 'r') as file:
with open(filepath, "r") as file:
line = file.readline()
while line:
if line.startswith('**********'):
if line.startswith("**********"):
if len(row) > 0:
output['cdr'].append(row)
output["cdr"].append(row)
row = {}
else:
elements = line.split(': ')
elements = line.split(": ")
if len(elements) == 2:
row[elements[0].strip()] = elements[1].strip()
elif len(elements) == 1:
row[elements[0].strip()] = ""
line = file.readline()
if len(row) > 0:
output['cdr'].append(row)
output["cdr"].append(row)
file.close()
return output
def main():
def main(args):
# default -y
load_check = load_yaml
try:
opts, args = getopt.getopt(
sys.argv[1:], "hyjtd", ["help", "yaml", "json", "text", "debug"])
except getopt.GetoptError as err:
# print help information and exit:
print(str(err)) # will print something like "option -a not recognized"
usage()
sys.exit(2)
for o, a in opts:
if o in ("-h", "--help"):
usage()
sys.exit()
elif o in ("-y", "--yaml"):
load_check = load_yaml
elif o in ("-j", "--json"):
load_check = load_json
elif o in ("-t", "--text"):
load_check = load_text
elif o in ("-d", "--debug"):
logging.basicConfig(level=logging.DEBUG)
else:
assert False, "unhandled option"
if(len(args) != 2):
usage()
sys.exit(1)
if args.yaml:
load_check = load_yaml
elif args.json:
load_check = load_json
elif args.text:
load_check = load_text
if args.debug:
logging.basicConfig(level=logging.DEBUG)
scen = load_yaml(args[0])
scen = load_yaml(args.yml_file)
test = Test()
try:
check = load_check(args[1])
except:
check = {'cdr': []}
test.error("Error loading file:%s" % args[1])
check = load_check(args.cdr_file)
except Exception:
check = {"cdr": []}
test.error("Error loading file:%s" % args.cdr_file)
test.comment('check cdr record')
check_cdr_recursive(scen['cdr'], check['cdr'], test)
test.comment("check cdr record")
check_cdr_recursive(scen["cdr"], check["cdr"], test)
print(test)
if test.isError():
sys.exit(1)
if __name__ == "__main__":
main()
parser = argparse.ArgumentParser(
description="generate TAP result for CDR"
)
grp = parser.add_mutually_exclusive_group()
grp.add_argument(
"-y", "--yaml", action="store_true", help="YAML cdr_file"
)
grp.add_argument(
"-j", "--json", action="store_true", help="JSON cdr_file"
)
grp.add_argument(
"-t", "--text", action="store_true", help="TEXT cdr_file"
)
parser.add_argument("yml_file", help="YAML file with checks")
parser.add_argument("cdr_file", help="CDR file")
parser.add_argument("-d", "--debug", action="store_true")
args = parser.parse_args()
main(args)

Loading…
Cancel
Save