From 4c6d22737079ca31929264c23e6c6566ecdd69f3 Mon Sep 17 00:00:00 2001 From: Victor Seva Date: Fri, 11 Sep 2020 12:06:16 +0200 Subject: [PATCH] TT#81700 bin/check.sh: add info of what sections failed * bin/check.py: return different values depending on what section check fails reformat using black tool Change-Id: Ie393766ba5df0951fb9f51c1c780cc5be8673fc7 --- bin/check.py | 235 ++++++++++++++++++++++++-------------------- bin/check.sh | 26 ++++- tests/test_check.py | 167 +++++++++++++++---------------- 3 files changed, 234 insertions(+), 194 deletions(-) diff --git a/bin/check.py b/bin/check.py index ff912600..6810adfe 100755 --- a/bin/check.py +++ b/bin/check.py @@ -24,8 +24,10 @@ import re import argparse import json import logging +from enum import IntFlag from yaml import load + try: from yaml import CLoader as Loader except ImportError: @@ -37,66 +39,75 @@ class XAvp: """ Class to simulate the xavp """ def __init__(self, name, data): - result = re.match(r'\$xavp\((\w+)\)', name) + result = re.match(r"\$xavp\((\w+)\)", name) try: self._name = result.group(1) except Exception: - raise Exception('not a xavp') + raise Exception("not a xavp") self._data = data def get(self, str): info = XAvp.parse(str) - if self._name != info['name']: - raise KeyError( - 'diferent name. name:%s != %s' % (self._name, info['name']) - ) + if self._name != info["name"]: + msg = "diferent name. name:{} != {}" + raise KeyError(msg.format(self._name, info["name"])) nsize = len(self._data) - if nsize <= info['nindx']: - raise IndexError('%s has %d elements' % (self._name, nsize)) + if nsize <= info["nindx"]: + raise IndexError("%s has %d elements" % (self._name, nsize)) - if info['key'] in self._data[info['nindx']]: - values = self._data[info['nindx']][info['key']] + if info["key"] in self._data[info["nindx"]]: + values = self._data[info["nindx"]][info["key"]] else: - raise KeyError('no %s key found' % info['key']) + raise KeyError("no %s key found" % info["key"]) - if info['kindx'] == '*': + if info["kindx"] == "*": return values ksize = len(values) - if ksize <= info['kindx']: - raise IndexError('%s has %d elements not %s' % - (info['key'], ksize, info['kindx'])) + if ksize <= info["kindx"]: + msg = "{} has {} elements not {}" + raise IndexError(msg.format(info["key"], ksize, info["kindx"])) - return values[info['kindx']] + return values[info["kindx"]] @classmethod def parse(cls, str): - pattern_nindx = r'(\[(?P<%s>\d+)\])?' % 'nindx' - pattern_kindx = r'(\[(?P<%s>\d+|\*+)\])?' % 'kindx' - pattern = r'\$xavp\((?P\w+)%s(=>(?P\w+)%s)?\)' % ( - pattern_nindx, pattern_kindx) + pattern_nindx = r"(\[(?P<%s>\d+)\])?" % "nindx" + pattern_kindx = r"(\[(?P<%s>\d+|\*+)\])?" % "kindx" + pattern = r"\$xavp\((?P\w+)%s(=>(?P\w+)%s)?\)" % ( + pattern_nindx, + pattern_kindx, + ) result = re.match(pattern, str) if result is not None: try: - nindx = int(result.group('nindx')) - except: + nindx = int(result.group("nindx")) + except Exception: nindx = 0 try: - kindx = int(result.group('kindx')) + kindx = int(result.group("kindx")) except Exception: - if result.group('kindx') == '*': - kindx = '*' + if result.group("kindx") == "*": + kindx = "*" else: kindx = 0 return { - 'name': result.group('name'), - 'nindx': nindx, - 'key': result.group('key'), - 'kindx': kindx} + "name": result.group("name"), + "nindx": nindx, + "key": result.group("key"), + "kindx": kindx, + } else: - raise Exception('no xavp') + raise Exception("no xavp") + + +class Section(IntFlag): + FLOW = 2 + FLOW_VARS = 4 + SIP_IN = 8 + SIP_OUT = 16 class Test: @@ -105,26 +116,27 @@ class Test: def __init__(self): self._step = [] - self._errflag = False + self._errflag = 0 def comment(self, msg): """ Add a comment """ - self._step.append({'result': None, 'msg_ok': msg}) + self._step.append({"result": None, "msg_ok": msg}) def ok(self, msg=None): """ Add a ok result """ - self._step.append({'result': True, 'msg_ok': msg}) + self._step.append({"result": True, "msg_ok": msg}) - def error(self, msg_err): + def error(self, section, msg_err): """ Add an error result""" - self._step.append({'result': False, 'msg_err': msg_err}) - self._errflag = True + self._step.append({"result": False, "msg_err": msg_err}) + self._errflag += section @classmethod def compare(cls, val0, val1): - logging.debug("val0:[%s]:'%s' val1:[%s]:'%s'" % - (type(val0), str(val0), type(val1), - str(val1))) + logging.debug( + "val0:[%s]:'%s' val1:[%s]:'%s'" + % (type(val0), str(val0), type(val1), str(val1)) + ) if isinstance(val0, str): if re.search(val0, str(val1)) is not None: return True @@ -132,7 +144,7 @@ class Test: return False elif isinstance(val0, int): try: - result = (val0 == int(val1)) + result = val0 == int(val1) except Exception: result = False elif isinstance(val0, list) and isinstance(val1, list): @@ -147,25 +159,25 @@ class Test: logging.debug(e) return False else: - result = (val0 == val1) + result = val0 == val1 return result - def test(self, value_expected, value, msg_err, msg_ok=None): + def test(self, section, value_expected, value, msg_err, msg_ok=None): """ Test two values and add the result""" result = Test.compare(value_expected, value) - self._step.append( - {'result': result, 'msg_err': msg_err, 'msg_ok': msg_ok}) + val = {"result": result, "msg_err": msg_err, "msg_ok": msg_ok} + self._step.append(val) if not result: - self._errflag = True + self._errflag += section def isError(self): - return self._errflag + return self._errflag != 0 def _num_tests(self): """get the num of tests""" test = 0 for s in self._step: - if (s['result'] is not None): + if s["result"] is not None: test = test + 1 return test @@ -174,123 +186,136 @@ class Test: output = "1..%s\n" % self._num_tests() test = 1 for s in self._step: - if (s['result'] is None): - output += '# %s\n' % s['msg_ok'] + if s["result"] is None: + output += "# %s\n" % s["msg_ok"] continue - elif (s['result']): - if (s['msg_ok'] is not None): - output += "ok %d - %s\n" % (test, s['msg_ok']) + elif s["result"]: + if s["msg_ok"] is not None: + output += "ok %d - %s\n" % (test, s["msg_ok"]) else: output += "ok %d\n" % test else: - output += "not ok %d - ERROR: %s\n" % (test, s['msg_err']) + output += "not ok %d - ERROR: %s\n" % (test, s["msg_err"]) test = test + 1 return output def check_flow_vars(sk, sv, cv, test): """ check the vars on a flow level""" + sec = Section.FLOW_VARS for k in sv.keys(): logging.debug("check k:'%s'" % k) - if(k not in cv): + if k not in cv: try: info = XAvp.parse(k) - search_key = '$xavp(%s)' % info['name'] - if(search_key not in cv): - raise Exception("search_key: %s info:%s" % - (search_key, info)) + search_key = "$xavp(%s)" % info["name"] + if search_key not in cv: + msg = "search_key: {} info:{}" + raise Exception(msg.format(search_key, info)) xavp = XAvp(search_key, cv[search_key]) val = xavp.get(k) logging.debug("testing %s == %s" % (sv[k], val)) - test.test(sv[k], val, - 'flow[%s] expected %s == %s but is %s' % - (sk, k, sv[k], val), - 'flow[%s] %s' % (sk, k)) + msg_err = "flow[{}] expected {} == {} but is {}" + test.test( + sec, + sv[k], + val, + msg_err.format(sk, k, sv[k], val), + "flow[{}] {}".format(sk, k), + ) except LookupError as err: - if(sv[k] == 'None'): - test.ok('flow[%s] %s is not there' % (sk, k)) + if sv[k] == "None": + test.ok("flow[%s] %s is not there" % (sk, k)) else: - test.error('LookupError with %s. Error:%s' % (k, err)) + test.error(sec, "LookupError with %s. Error:%s" % (k, err)) except Exception as err: - if(sv[k] == 'None'): - test.ok('flow[%s] %s is not there' % (sk, k)) + if sv[k] == "None": + test.ok("flow[%s] %s is not there" % (sk, k)) else: - test.error( - 'Expected var %s on flow[%s]. %s' % (k, sk, err)) + msg = "Expected var {} on flow[{}]. {}" + test.error(sec, msg.format(k, sk, err)) else: logging.debug("sv[k]:'%s' cv[k]:'%s'" % (sv[k], cv[k])) - test.test(sv[k], cv[k], 'flow[%s] expected %s == %s but is %s' % ( - sk, k, sv[k], cv[k]), 'flow[%s] %s' % (sk, k)) + msg_err = "flow[{}] expected {} == {} but is {}" + test.test( + sec, + sv[k], + cv[k], + msg_err.format(sk, k, sv[k], cv[k]), + "flow[%s] %s" % (sk, k), + ) def check_flow(scen, check, test): """ checks the flow and the vars inside""" + sec = Section.FLOW for i in range(len(scen)): (sk, sv) = scen[i].popitem() try: (ck, cv) = check[i].popitem() except Exception: - test.error('wrong flow. Expected: %s but is nothing there' % sk) + msg = "wrong flow. Expected: {} but is nothing there" + test.error(sec, msg.format(sk)) continue - if(sk != ck): - test.error('wrong flow. Expected: %s but is %s' % (sk, ck)) + if sk != ck: + test.error(sec, "wrong flow. Expected: %s but is %s" % (sk, ck)) continue if sv is None: - test.ok('flow[%s] no var to check' % sk) + test.ok("flow[%s] no var to check" % sk) continue else: - test.ok('flow[%s]' % sk) + test.ok("flow[%s]" % sk) check_flow_vars(sk, sv, cv, test) - if(len(check) > len(scen)): + if len(check) > len(scen): line = [] for i in check: for k in i.keys(): line.append(k) - test.error('Expected to end but there are more flows %s' % line) + test.error(sec, "Expected to end but there are more flows %s" % line) -def check_sip(scen, msg, test): +def check_sip(sec, scen, msg, test): if isinstance(msg, list): if len(msg) != 1: - test.error('sip_in len != 1') + test.error(sec, "sip_in len != 1") return else: msg = msg[0] for rule in scen: - if rule.startswith('_:NOT:_'): + if rule.startswith("_:NOT:_"): flag = False rule = rule[7:] - msg_ok = '%s not match' - msg_ko = '%s match' + msg_ok = "%s not match" + msg_ko = "%s match" else: flag = True - msg_ok = '%s match' - msg_ko = '%s not match' + msg_ok = "%s match" + msg_ko = "%s not match" result = re.search(rule, msg) if (result is not None) == flag: test.ok(msg_ok % rule) continue - test.comment('result:%s' % result) - test.error(msg_ko % rule) + test.comment("result:%s" % result) + test.error(sec, msg_ko % rule) -def check_sip_out(scen, msgs, test): +def check_sip_out(sec, scen, msgs, test): num_msgs = len(msgs) num_scen = len(scen) - for i in (range(num_scen)): + for i in range(num_scen): test.comment("sip_out %d" % i) - if(i < num_msgs): - check_sip(scen[i], msgs[i], test) + if i < num_msgs: + check_sip(sec, scen[i], msgs[i], test) else: - test.error("sip_out[%d] does not exist" % i) - if (num_scen != num_msgs): - test.error("we expected %d out messages but we have %d" % - (num_scen, num_msgs)) + test.error(sec, "sip_out[%d] does not exist" % i) + if num_scen != num_msgs: + msg = "we expected {} out messages but we have {}" + test.error(sec, msg.format(num_scen, num_msgs)) def load_yaml(filepath): output = None - with io.open(filepath, 'r') as file: + with io.open(filepath, "r") as file: output = load(file, Loader=Loader) file.close() return output @@ -298,7 +323,7 @@ def load_yaml(filepath): def load_json(filepath): output = None - with io.open(filepath, 'r') as file: + with io.open(filepath, "r") as file: output = json.load(file) file.close() return output @@ -322,18 +347,18 @@ def main(args): try: check = load_check(args.kam_file) except Exception: - check = {'flow': [], 'sip_in': '', 'sip_out': []} + check = {"flow": [], "sip_in": "", "sip_out": []} test.error("Error loading file:%s" % args[1]) - test.comment('check flow') - check_flow(scen['flow'], check['flow'], test) - test.comment('check sip_in') - check_sip(scen['sip_in'], check['sip_in'], test) - test.comment('check sip_out') - check_sip_out(scen['sip_out'], check['sip_out'], test) + test.comment("check flow") + check_flow(scen["flow"], check["flow"], test) + test.comment("check sip_in") + check_sip(Section.SIP_IN, scen["sip_in"], check["sip_in"], test) + test.comment("check sip_out") + check_sip_out(Section.SIP_OUT, scen["sip_out"], check["sip_out"], test) print(test) if test.isError(): - sys.exit(1) + sys.exit(test._errflag) if __name__ == "__main__": diff --git a/bin/check.sh b/bin/check.sh index 50edadce..dd94873f 100755 --- a/bin/check.sh +++ b/bin/check.sh @@ -61,12 +61,24 @@ EOF echo "$(date) - $(basename "$2") NOT ok" } +function str_check_error() { + local err_type=() + + [[ $(($1 & 2)) -eq 0 ]] && err_type+=("FLOW") + [[ $(($1 & 4)) -eq 0 ]] && err_type+=("FLOW_VARS") + [[ $(($1 & 8)) -eq 0 ]] && err_type+=("SIP_IN") + [[ $(($1 & 16)) -eq 0 ]] && err_type+=("SIP_OUT") + + echo "${err_type[*]}" +} + check_retrans_next() { # testing next json file, if exist. Necessary in case of retransmissions or wrong timing/order. local next_msg local next_tap local kam_type local step + local err_type step=${4:-1} next_test_filepath "$1" "${step}" @@ -82,6 +94,8 @@ check_retrans_next() { mv "$3" "${3}_retrans" mv "${next_tap}" "$3" return 0 + else + err_type=$(str_check_error $?) fi if [ -a "${next_tap}" ] ; then @@ -93,7 +107,7 @@ check_retrans_next() { echo -n "$(date) - File $(basename "${next_msg}") doesn't exists. Result" fi - echo " NOT ok" + echo " NOT ok[${err_type}]" return 1 } @@ -103,6 +117,7 @@ check_retrans_prev() { local prev_tap local kam_type local step + local err_type step=${4:-1} prev_test_filepath "$1" "${step}" @@ -118,6 +133,8 @@ check_retrans_prev() { mv "$3" "${3}_retrans" mv "${prev_tap}" "$3" return 0 + else + err_type=$(str_check_error $?) fi if [ -a "${prev_tap}" ] ; then @@ -129,7 +146,7 @@ check_retrans_prev() { echo -n "$(date) - File $(basename "${prev_msg}") doesn't exists. Result" fi - echo " NOT ok" + echo " NOT ok[${err_type}]" return 1 } @@ -156,6 +173,7 @@ check_retrans_block() { check_test() { local dest local kam_type="--yaml" + local err_type dest=${RESULT_DIR}/$(basename "$3" .tap) @@ -178,9 +196,11 @@ check_test() { if "${BIN_DIR}/check.py" ${kam_type} "$1" "$2" > "$3" ; then echo " ok" return 0 + else + err_type=$(str_check_error $?) fi - echo " NOT ok" + echo " NOT ok[${err_type}]" if "${FIX_RETRANS}" ; then check_retrans_block "$1" "${kam_type}" "$3" "${RETRANS_SIZE}" && return 0 diff --git a/tests/test_check.py b/tests/test_check.py index 69af09d4..fe814967 100755 --- a/tests/test_check.py +++ b/tests/test_check.py @@ -1,6 +1,6 @@ #!/usr/bin/python3 # -# Copyright: 2013-2015 Sipwise Development Team +# Copyright: 2013-2020 Sipwise Development Team # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -23,109 +23,109 @@ import sys import junitxml import unittest import re -lib_path = os.path.abspath('bin') + +lib_path = os.path.abspath("bin") sys.path.append(lib_path) +from check import Section # noqa from check import check_sip, check_sip_out # noqa from check import XAvp, Test, check_flow, check_flow_vars # noqa from check import load_json, load_yaml # noqa -not_ok = re.compile('^not ok.*', re.MULTILINE) +not_ok = re.compile("^not ok.*", re.MULTILINE) class TestXAvp(unittest.TestCase): - def setUp(self): - self.name = 'test' + self.name = "test" self.data = [ - {'koko': [1]}, - {'koko': [1, 2]}, - {'lolo': [3], 'lola': [7]}, + {"koko": [1]}, {"koko": [1, 2]}, {"lolo": [3], "lola": [7]} ] - self.xavp = XAvp('$xavp(%s)' % self.name, self.data) + self.xavp = XAvp("$xavp(%s)" % self.name, self.data) def test_init(self): self.assertEqual(self.name, self.xavp._name) self.assertCountEqual(self.data, self.xavp._data) def test_init_wrong_type(self): - self.assertRaises(Exception, self.xavp, '$var(whatever)', None) + self.assertRaises(Exception, self.xavp, "$var(whatever)", None) def test_parse_type(self): self.assertRaisesRegex( - Exception, 'no xavp', XAvp.parse, '$var(whatever)') - self.assertRaisesRegex(Exception, 'no xavp', XAvp.parse, '$fU') + Exception, "no xavp", XAvp.parse, "$var(whatever)" + ) + self.assertRaisesRegex(Exception, "no xavp", XAvp.parse, "$fU") def test_get_wrong_name(self): - self.assertRaises(KeyError, self.xavp.get, '$xavp(otro)') - self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0])') - self.assertRaises(KeyError, self.xavp.get, '$xavp(otro=>whatever)') - self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0]=>whatever)') + self.assertRaises(KeyError, self.xavp.get, "$xavp(otro)") + self.assertRaises(KeyError, self.xavp.get, "$xavp(otro[0])") + self.assertRaises(KeyError, self.xavp.get, "$xavp(otro=>whatever)") + self.assertRaises(KeyError, self.xavp.get, "$xavp(otro[0]=>whatever)") def test_get_wrong_nindx(self): - self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko)') + self.assertRaises(IndexError, self.xavp.get, "$xavp(test[10]=>koko)") self.assertRaises( - IndexError, self.xavp.get, '$xavp(test[10]=>koko[1])') + IndexError, self.xavp.get, "$xavp(test[10]=>koko[1])" + ) def test_get_wrong_kindx(self): - self.assertRaises(IndexError, self.xavp.get, '$xavp(test[0]=>koko[1])') + self.assertRaises(IndexError, self.xavp.get, "$xavp(test[0]=>koko[1])") self.assertRaises( - IndexError, self.xavp.get, '$xavp(test[1]=>koko[10])') + IndexError, self.xavp.get, "$xavp(test[1]=>koko[10])" + ) def test_get_value(self): - self.assertEqual(self.xavp.get('$xavp(test=>koko)'), 1) - self.assertEqual(self.xavp.get('$xavp(test[1]=>koko[1])'), 2) - self.assertEqual(self.xavp.get('$xavp(test[2]=>lola[0])'), 7) + self.assertEqual(self.xavp.get("$xavp(test=>koko)"), 1) + self.assertEqual(self.xavp.get("$xavp(test[1]=>koko[1])"), 2) + self.assertEqual(self.xavp.get("$xavp(test[2]=>lola[0])"), 7) def test_get_value_all(self): - self.assertCountEqual(self.xavp.get('$xavp(test[1]=>koko[*])'), [1, 2]) + self.assertCountEqual(self.xavp.get("$xavp(test[1]=>koko[*])"), [1, 2]) class TestCheckFlowVars(unittest.TestCase): - def setUp(self): self.ctest = Test() self.check_ok = [ - {'R0': {'$xavp(v0)': [{'k0': [1], - 'k1': ['a', 'b', 'fuckthisshit']}, - {'k0': [1, 2], 'k1': ['a', ]}, - ], - 'fU': 'testpep', - }}, - {'R1': {'$xavp(v0)': [{'k0': [1, 2]}]}}, - ] - self.check_ko = [ - {'R0': {'$xavp(v0)': [{'k0': ['a', 'b']}]}}, - ] - self.scen_ko = [ - {'R0': {'$xavp(v0[0]=>k0[*])': ['a']}}, - ] - self.scen_noxavp = [ - {'R0': {'fU': 'testpep'}}, - {'R1': {}}, + { + "R0": { + "$xavp(v0)": [ + {"k0": [1], "k1": ["a", "b", "fuckthisshit"]}, + {"k0": [1, 2], "k1": ["a"]}, + ], + "fU": "testpep", + } + }, + {"R1": {"$xavp(v0)": [{"k0": [1, 2]}]}}, ] + self.check_ko = [{"R0": {"$xavp(v0)": [{"k0": ["a", "b"]}]}}] + self.scen_ko = [{"R0": {"$xavp(v0[0]=>k0[*])": ["a"]}}] + self.scen_noxavp = [{"R0": {"fU": "testpep"}}, {"R1": {}}] self.scen = [ - {'R0': {'$xavp(v0[0]=>k0[0])': 1, - '$xavp(v0[0]=>k1[0])': 'a', - '$xavp(v0[0]=>k1[2])': '^f', - '$var(no)': 'None', - '$xavp(nono=>koko)': 'None', - '$xavp(v0=>k10)': 'None', - '$xavp(v0[1]=>k0[1])': '\d+', - '$xavp(v0[0]=>k1[*])': [1], - '$xavp(v0[0]=>k1[*])': ['a', 'b', 'fuckthisshit']} - }, - {'R1': {'$xavp(v0[1]=>k0[0])': 1}}, + { + "R0": { + "$xavp(v0[0]=>k0[0])": 1, + "$xavp(v0[0]=>k1[0])": "a", + "$xavp(v0[0]=>k1[2])": "^f", + "$var(no)": "None", + "$xavp(nono=>koko)": "None", + "$xavp(v0=>k10)": "None", + "$xavp(v0[1]=>k0[1])": r"\d+", + "$xavp(v0[0]=>k1[*])": ["a", "b", "fuckthisshit"], + } + }, + {"R1": {"$xavp(v0[1]=>k0[0])": 1}}, ] def testXAvp(self): - data = self.check_ok[0]['R0']['$xavp(v0)'] - xavp = XAvp('$xavp(v0)', data) + data = self.check_ok[0]["R0"]["$xavp(v0)"] + xavp = XAvp("$xavp(v0)", data) - self.assertEqual(xavp.get('$xavp(v0=>k0)'), 1) + self.assertEqual(xavp.get("$xavp(v0=>k0)"), 1) self.assertEqual( - xavp.get('$xavp(v0[0]=>k1[*])'), ['a', 'b', 'fuckthisshit']) - self.assertEqual(xavp.get('$xavp(v0[1]=>k0[1])'), 2) - self.assertEqual(xavp.get('$xavp(v0[1]=>k1[*])'), ['a']) + xavp.get("$xavp(v0[0]=>k1[*])"), ["a", "b", "fuckthisshit"] + ) + self.assertEqual(xavp.get("$xavp(v0[1]=>k0[1])"), 2) + self.assertEqual(xavp.get("$xavp(v0[1]=>k1[*])"), ["a"]) def testFlow_noxavp(self): check_flow(self.scen_noxavp, self.check_ok, self.ctest) @@ -133,14 +133,14 @@ class TestCheckFlowVars(unittest.TestCase): def testFlowVars_noxavp(self): check_flow_vars( - 'RO', self.scen_noxavp[0]['R0'], - self.check_ok[0]['R0'], self.ctest) + "RO", self.scen_noxavp[0]["R0"], self.check_ok[0]["R0"], self.ctest + ) self.assertFalse(self.ctest.isError()) def testFlowVars_xavp(self): check_flow_vars( - 'RO', self.scen[0]['R0'], - self.check_ok[0]['R0'], self.ctest) + "RO", self.scen[0]["R0"], self.check_ok[0]["R0"], self.ctest + ) self.assertFalse(self.ctest.isError()) def testFlow_fail(self): @@ -151,62 +151,57 @@ class TestCheckFlowVars(unittest.TestCase): def testFlowVars_fail(self): check_flow_vars( - 'RO', self.scen_ko[0]['R0'], - self.check_ko[0]['R0'], self.ctest) + "RO", self.scen_ko[0]["R0"], self.check_ko[0]["R0"], self.ctest + ) tap = str(self.ctest) self.assertTrue(self.ctest.isError(), tap) self.assertIsNotNone(not_ok.search(tap), tap) class TestCheckSipIn(unittest.TestCase): - def setUp(self): self.ctest = Test() - self.msg = open('./tests/fixtures/sip_in.txt', 'r').read() + self.msg = open("./tests/fixtures/sip_in.txt", "r").read() def testSipIn(self): - sip_in = load_yaml('./tests/fixtures/test_sip_in.yml') - check_sip(sip_in, self.msg, self.ctest) + sip_in = load_yaml("./tests/fixtures/test_sip_in.yml") + check_sip(Section.SIP_IN, sip_in, self.msg, self.ctest) self.assertFalse(self.ctest.isError()) class TestCheckSipOut(unittest.TestCase): - def setUp(self): self.ctest = Test() - self.msg = load_yaml('./tests/fixtures/sip_out.yml') + self.msg = load_yaml("./tests/fixtures/sip_out.yml") def testSipOut(self): - sip_out = load_yaml('./tests/fixtures/test_sip_out.yml') - check_sip_out(sip_out, self.msg, self.ctest) + sip_out = load_yaml("./tests/fixtures/test_sip_out.yml") + check_sip_out(Section.SIP_OUT, sip_out, self.msg, self.ctest) self.assertFalse(self.ctest.isError()) class TestJson(unittest.TestCase): - def setUp(self): self.ctest = Test() def testFail(self): - check = load_json('./tests/fixtures/fail.json') - scen = load_yaml('./tests/fixtures/scen_fail.yml') - check_flow(scen['flow'], check['flow'], self.ctest) + check = load_json("./tests/fixtures/fail.json") + scen = load_yaml("./tests/fixtures/scen_fail.yml") + check_flow(scen["flow"], check["flow"], self.ctest) tap = str(self.ctest) self.assertTrue(self.ctest.isError(), tap) self.assertIsNotNone(not_ok.search(tap), tap) -if __name__ == '__main__': +if __name__ == "__main__": suite = unittest.TestSuite() - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestXAvp)) - suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestCheckFlowVars)) - suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestCheckSipIn)) - suite.addTest( - unittest.defaultTestLoader.loadTestsFromTestCase(TestCheckSipOut)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestJson)) + load = unittest.defaultTestLoader.loadTestsFromTestCase + suite.addTest(load(TestXAvp)) + suite.addTest(load(TestCheckFlowVars)) + suite.addTest(load(TestCheckSipIn)) + suite.addTest(load(TestCheckSipOut)) + suite.addTest(load(TestJson)) result = junitxml.JUnitXmlResult(sys.stdout) result.startTestRun() suite.run(result)