TT#81700 bin/check.sh: add info of what sections failed

* bin/check.py: return different values depending on what section
  check fails
  reformat using black tool

Change-Id: Ie393766ba5df0951fb9f51c1c780cc5be8673fc7
mr9.1.1
Victor Seva 6 years ago
parent 4099f1a4d7
commit 4c6d227370

@ -24,8 +24,10 @@ import re
import argparse
import json
import logging
from enum import IntFlag
from yaml import load
try:
from yaml import CLoader as Loader
except ImportError:
@ -37,66 +39,75 @@ class XAvp:
""" Class to simulate the xavp """
def __init__(self, name, data):
result = re.match(r'\$xavp\((\w+)\)', name)
result = re.match(r"\$xavp\((\w+)\)", name)
try:
self._name = result.group(1)
except Exception:
raise Exception('not a xavp')
raise Exception("not a xavp")
self._data = data
def get(self, str):
info = XAvp.parse(str)
if self._name != info['name']:
raise KeyError(
'diferent name. name:%s != %s' % (self._name, info['name'])
)
if self._name != info["name"]:
msg = "diferent name. name:{} != {}"
raise KeyError(msg.format(self._name, info["name"]))
nsize = len(self._data)
if nsize <= info['nindx']:
raise IndexError('%s has %d elements' % (self._name, nsize))
if nsize <= info["nindx"]:
raise IndexError("%s has %d elements" % (self._name, nsize))
if info['key'] in self._data[info['nindx']]:
values = self._data[info['nindx']][info['key']]
if info["key"] in self._data[info["nindx"]]:
values = self._data[info["nindx"]][info["key"]]
else:
raise KeyError('no %s key found' % info['key'])
raise KeyError("no %s key found" % info["key"])
if info['kindx'] == '*':
if info["kindx"] == "*":
return values
ksize = len(values)
if ksize <= info['kindx']:
raise IndexError('%s has %d elements not %s' %
(info['key'], ksize, info['kindx']))
if ksize <= info["kindx"]:
msg = "{} has {} elements not {}"
raise IndexError(msg.format(info["key"], ksize, info["kindx"]))
return values[info['kindx']]
return values[info["kindx"]]
@classmethod
def parse(cls, str):
pattern_nindx = r'(\[(?P<%s>\d+)\])?' % 'nindx'
pattern_kindx = r'(\[(?P<%s>\d+|\*+)\])?' % 'kindx'
pattern = r'\$xavp\((?P<name>\w+)%s(=>(?P<key>\w+)%s)?\)' % (
pattern_nindx, pattern_kindx)
pattern_nindx = r"(\[(?P<%s>\d+)\])?" % "nindx"
pattern_kindx = r"(\[(?P<%s>\d+|\*+)\])?" % "kindx"
pattern = r"\$xavp\((?P<name>\w+)%s(=>(?P<key>\w+)%s)?\)" % (
pattern_nindx,
pattern_kindx,
)
result = re.match(pattern, str)
if result is not None:
try:
nindx = int(result.group('nindx'))
except:
nindx = int(result.group("nindx"))
except Exception:
nindx = 0
try:
kindx = int(result.group('kindx'))
kindx = int(result.group("kindx"))
except Exception:
if result.group('kindx') == '*':
kindx = '*'
if result.group("kindx") == "*":
kindx = "*"
else:
kindx = 0
return {
'name': result.group('name'),
'nindx': nindx,
'key': result.group('key'),
'kindx': kindx}
"name": result.group("name"),
"nindx": nindx,
"key": result.group("key"),
"kindx": kindx,
}
else:
raise Exception('no xavp')
raise Exception("no xavp")
class Section(IntFlag):
FLOW = 2
FLOW_VARS = 4
SIP_IN = 8
SIP_OUT = 16
class Test:
@ -105,26 +116,27 @@ class Test:
def __init__(self):
self._step = []
self._errflag = False
self._errflag = 0
def comment(self, msg):
""" Add a comment """
self._step.append({'result': None, 'msg_ok': msg})
self._step.append({"result": None, "msg_ok": msg})
def ok(self, msg=None):
""" Add a ok result """
self._step.append({'result': True, 'msg_ok': msg})
self._step.append({"result": True, "msg_ok": msg})
def error(self, msg_err):
def error(self, section, msg_err):
""" Add an error result"""
self._step.append({'result': False, 'msg_err': msg_err})
self._errflag = True
self._step.append({"result": False, "msg_err": msg_err})
self._errflag += section
@classmethod
def compare(cls, val0, val1):
logging.debug("val0:[%s]:'%s' val1:[%s]:'%s'" %
(type(val0), str(val0), type(val1),
str(val1)))
logging.debug(
"val0:[%s]:'%s' val1:[%s]:'%s'"
% (type(val0), str(val0), type(val1), str(val1))
)
if isinstance(val0, str):
if re.search(val0, str(val1)) is not None:
return True
@ -132,7 +144,7 @@ class Test:
return False
elif isinstance(val0, int):
try:
result = (val0 == int(val1))
result = val0 == int(val1)
except Exception:
result = False
elif isinstance(val0, list) and isinstance(val1, list):
@ -147,25 +159,25 @@ class Test:
logging.debug(e)
return False
else:
result = (val0 == val1)
result = val0 == val1
return result
def test(self, value_expected, value, msg_err, msg_ok=None):
def test(self, section, value_expected, value, msg_err, msg_ok=None):
""" Test two values and add the result"""
result = Test.compare(value_expected, value)
self._step.append(
{'result': result, 'msg_err': msg_err, 'msg_ok': msg_ok})
val = {"result": result, "msg_err": msg_err, "msg_ok": msg_ok}
self._step.append(val)
if not result:
self._errflag = True
self._errflag += section
def isError(self):
return self._errflag
return self._errflag != 0
def _num_tests(self):
"""get the num of tests"""
test = 0
for s in self._step:
if (s['result'] is not None):
if s["result"] is not None:
test = test + 1
return test
@ -174,123 +186,136 @@ class Test:
output = "1..%s\n" % self._num_tests()
test = 1
for s in self._step:
if (s['result'] is None):
output += '# %s\n' % s['msg_ok']
if s["result"] is None:
output += "# %s\n" % s["msg_ok"]
continue
elif (s['result']):
if (s['msg_ok'] is not None):
output += "ok %d - %s\n" % (test, s['msg_ok'])
elif s["result"]:
if s["msg_ok"] is not None:
output += "ok %d - %s\n" % (test, s["msg_ok"])
else:
output += "ok %d\n" % test
else:
output += "not ok %d - ERROR: %s\n" % (test, s['msg_err'])
output += "not ok %d - ERROR: %s\n" % (test, s["msg_err"])
test = test + 1
return output
def check_flow_vars(sk, sv, cv, test):
""" check the vars on a flow level"""
sec = Section.FLOW_VARS
for k in sv.keys():
logging.debug("check k:'%s'" % k)
if(k not in cv):
if k not in cv:
try:
info = XAvp.parse(k)
search_key = '$xavp(%s)' % info['name']
if(search_key not in cv):
raise Exception("search_key: %s info:%s" %
(search_key, info))
search_key = "$xavp(%s)" % info["name"]
if search_key not in cv:
msg = "search_key: {} info:{}"
raise Exception(msg.format(search_key, info))
xavp = XAvp(search_key, cv[search_key])
val = xavp.get(k)
logging.debug("testing %s == %s" % (sv[k], val))
test.test(sv[k], val,
'flow[%s] expected %s == %s but is %s' %
(sk, k, sv[k], val),
'flow[%s] %s' % (sk, k))
msg_err = "flow[{}] expected {} == {} but is {}"
test.test(
sec,
sv[k],
val,
msg_err.format(sk, k, sv[k], val),
"flow[{}] {}".format(sk, k),
)
except LookupError as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
if sv[k] == "None":
test.ok("flow[%s] %s is not there" % (sk, k))
else:
test.error('LookupError with %s. Error:%s' % (k, err))
test.error(sec, "LookupError with %s. Error:%s" % (k, err))
except Exception as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
if sv[k] == "None":
test.ok("flow[%s] %s is not there" % (sk, k))
else:
test.error(
'Expected var %s on flow[%s]. %s' % (k, sk, err))
msg = "Expected var {} on flow[{}]. {}"
test.error(sec, msg.format(k, sk, err))
else:
logging.debug("sv[k]:'%s' cv[k]:'%s'" % (sv[k], cv[k]))
test.test(sv[k], cv[k], 'flow[%s] expected %s == %s but is %s' % (
sk, k, sv[k], cv[k]), 'flow[%s] %s' % (sk, k))
msg_err = "flow[{}] expected {} == {} but is {}"
test.test(
sec,
sv[k],
cv[k],
msg_err.format(sk, k, sv[k], cv[k]),
"flow[%s] %s" % (sk, k),
)
def check_flow(scen, check, test):
""" checks the flow and the vars inside"""
sec = Section.FLOW
for i in range(len(scen)):
(sk, sv) = scen[i].popitem()
try:
(ck, cv) = check[i].popitem()
except Exception:
test.error('wrong flow. Expected: %s but is nothing there' % sk)
msg = "wrong flow. Expected: {} but is nothing there"
test.error(sec, msg.format(sk))
continue
if(sk != ck):
test.error('wrong flow. Expected: %s but is %s' % (sk, ck))
if sk != ck:
test.error(sec, "wrong flow. Expected: %s but is %s" % (sk, ck))
continue
if sv is None:
test.ok('flow[%s] no var to check' % sk)
test.ok("flow[%s] no var to check" % sk)
continue
else:
test.ok('flow[%s]' % sk)
test.ok("flow[%s]" % sk)
check_flow_vars(sk, sv, cv, test)
if(len(check) > len(scen)):
if len(check) > len(scen):
line = []
for i in check:
for k in i.keys():
line.append(k)
test.error('Expected to end but there are more flows %s' % line)
test.error(sec, "Expected to end but there are more flows %s" % line)
def check_sip(scen, msg, test):
def check_sip(sec, scen, msg, test):
if isinstance(msg, list):
if len(msg) != 1:
test.error('sip_in len != 1')
test.error(sec, "sip_in len != 1")
return
else:
msg = msg[0]
for rule in scen:
if rule.startswith('_:NOT:_'):
if rule.startswith("_:NOT:_"):
flag = False
rule = rule[7:]
msg_ok = '%s not match'
msg_ko = '%s match'
msg_ok = "%s not match"
msg_ko = "%s match"
else:
flag = True
msg_ok = '%s match'
msg_ko = '%s not match'
msg_ok = "%s match"
msg_ko = "%s not match"
result = re.search(rule, msg)
if (result is not None) == flag:
test.ok(msg_ok % rule)
continue
test.comment('result:%s' % result)
test.error(msg_ko % rule)
test.comment("result:%s" % result)
test.error(sec, msg_ko % rule)
def check_sip_out(scen, msgs, test):
def check_sip_out(sec, scen, msgs, test):
num_msgs = len(msgs)
num_scen = len(scen)
for i in (range(num_scen)):
for i in range(num_scen):
test.comment("sip_out %d" % i)
if(i < num_msgs):
check_sip(scen[i], msgs[i], test)
if i < num_msgs:
check_sip(sec, scen[i], msgs[i], test)
else:
test.error("sip_out[%d] does not exist" % i)
if (num_scen != num_msgs):
test.error("we expected %d out messages but we have %d" %
(num_scen, num_msgs))
test.error(sec, "sip_out[%d] does not exist" % i)
if num_scen != num_msgs:
msg = "we expected {} out messages but we have {}"
test.error(sec, msg.format(num_scen, num_msgs))
def load_yaml(filepath):
output = None
with io.open(filepath, 'r') as file:
with io.open(filepath, "r") as file:
output = load(file, Loader=Loader)
file.close()
return output
@ -298,7 +323,7 @@ def load_yaml(filepath):
def load_json(filepath):
output = None
with io.open(filepath, 'r') as file:
with io.open(filepath, "r") as file:
output = json.load(file)
file.close()
return output
@ -322,18 +347,18 @@ def main(args):
try:
check = load_check(args.kam_file)
except Exception:
check = {'flow': [], 'sip_in': '', 'sip_out': []}
check = {"flow": [], "sip_in": "", "sip_out": []}
test.error("Error loading file:%s" % args[1])
test.comment('check flow')
check_flow(scen['flow'], check['flow'], test)
test.comment('check sip_in')
check_sip(scen['sip_in'], check['sip_in'], test)
test.comment('check sip_out')
check_sip_out(scen['sip_out'], check['sip_out'], test)
test.comment("check flow")
check_flow(scen["flow"], check["flow"], test)
test.comment("check sip_in")
check_sip(Section.SIP_IN, scen["sip_in"], check["sip_in"], test)
test.comment("check sip_out")
check_sip_out(Section.SIP_OUT, scen["sip_out"], check["sip_out"], test)
print(test)
if test.isError():
sys.exit(1)
sys.exit(test._errflag)
if __name__ == "__main__":

@ -61,12 +61,24 @@ EOF
echo "$(date) - $(basename "$2") NOT ok"
}
function str_check_error() {
local err_type=()
[[ $(($1 & 2)) -eq 0 ]] && err_type+=("FLOW")
[[ $(($1 & 4)) -eq 0 ]] && err_type+=("FLOW_VARS")
[[ $(($1 & 8)) -eq 0 ]] && err_type+=("SIP_IN")
[[ $(($1 & 16)) -eq 0 ]] && err_type+=("SIP_OUT")
echo "${err_type[*]}"
}
check_retrans_next() {
# testing next json file, if exist. Necessary in case of retransmissions or wrong timing/order.
local next_msg
local next_tap
local kam_type
local step
local err_type
step=${4:-1}
next_test_filepath "$1" "${step}"
@ -82,6 +94,8 @@ check_retrans_next() {
mv "$3" "${3}_retrans"
mv "${next_tap}" "$3"
return 0
else
err_type=$(str_check_error $?)
fi
if [ -a "${next_tap}" ] ; then
@ -93,7 +107,7 @@ check_retrans_next() {
echo -n "$(date) - File $(basename "${next_msg}") doesn't exists. Result"
fi
echo " NOT ok"
echo " NOT ok[${err_type}]"
return 1
}
@ -103,6 +117,7 @@ check_retrans_prev() {
local prev_tap
local kam_type
local step
local err_type
step=${4:-1}
prev_test_filepath "$1" "${step}"
@ -118,6 +133,8 @@ check_retrans_prev() {
mv "$3" "${3}_retrans"
mv "${prev_tap}" "$3"
return 0
else
err_type=$(str_check_error $?)
fi
if [ -a "${prev_tap}" ] ; then
@ -129,7 +146,7 @@ check_retrans_prev() {
echo -n "$(date) - File $(basename "${prev_msg}") doesn't exists. Result"
fi
echo " NOT ok"
echo " NOT ok[${err_type}]"
return 1
}
@ -156,6 +173,7 @@ check_retrans_block() {
check_test() {
local dest
local kam_type="--yaml"
local err_type
dest=${RESULT_DIR}/$(basename "$3" .tap)
@ -178,9 +196,11 @@ check_test() {
if "${BIN_DIR}/check.py" ${kam_type} "$1" "$2" > "$3" ; then
echo " ok"
return 0
else
err_type=$(str_check_error $?)
fi
echo " NOT ok"
echo " NOT ok[${err_type}]"
if "${FIX_RETRANS}" ; then
check_retrans_block "$1" "${kam_type}" "$3" "${RETRANS_SIZE}" && return 0

@ -1,6 +1,6 @@
#!/usr/bin/python3
#
# Copyright: 2013-2015 Sipwise Development Team <support@sipwise.com>
# Copyright: 2013-2020 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -23,109 +23,109 @@ import sys
import junitxml
import unittest
import re
lib_path = os.path.abspath('bin')
lib_path = os.path.abspath("bin")
sys.path.append(lib_path)
from check import Section # noqa
from check import check_sip, check_sip_out # noqa
from check import XAvp, Test, check_flow, check_flow_vars # noqa
from check import load_json, load_yaml # noqa
not_ok = re.compile('^not ok.*', re.MULTILINE)
not_ok = re.compile("^not ok.*", re.MULTILINE)
class TestXAvp(unittest.TestCase):
def setUp(self):
self.name = 'test'
self.name = "test"
self.data = [
{'koko': [1]},
{'koko': [1, 2]},
{'lolo': [3], 'lola': [7]},
{"koko": [1]}, {"koko": [1, 2]}, {"lolo": [3], "lola": [7]}
]
self.xavp = XAvp('$xavp(%s)' % self.name, self.data)
self.xavp = XAvp("$xavp(%s)" % self.name, self.data)
def test_init(self):
self.assertEqual(self.name, self.xavp._name)
self.assertCountEqual(self.data, self.xavp._data)
def test_init_wrong_type(self):
self.assertRaises(Exception, self.xavp, '$var(whatever)', None)
self.assertRaises(Exception, self.xavp, "$var(whatever)", None)
def test_parse_type(self):
self.assertRaisesRegex(
Exception, 'no xavp', XAvp.parse, '$var(whatever)')
self.assertRaisesRegex(Exception, 'no xavp', XAvp.parse, '$fU')
Exception, "no xavp", XAvp.parse, "$var(whatever)"
)
self.assertRaisesRegex(Exception, "no xavp", XAvp.parse, "$fU")
def test_get_wrong_name(self):
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0])')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro=>whatever)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0]=>whatever)')
self.assertRaises(KeyError, self.xavp.get, "$xavp(otro)")
self.assertRaises(KeyError, self.xavp.get, "$xavp(otro[0])")
self.assertRaises(KeyError, self.xavp.get, "$xavp(otro=>whatever)")
self.assertRaises(KeyError, self.xavp.get, "$xavp(otro[0]=>whatever)")
def test_get_wrong_nindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko)')
self.assertRaises(IndexError, self.xavp.get, "$xavp(test[10]=>koko)")
self.assertRaises(
IndexError, self.xavp.get, '$xavp(test[10]=>koko[1])')
IndexError, self.xavp.get, "$xavp(test[10]=>koko[1])"
)
def test_get_wrong_kindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[0]=>koko[1])')
self.assertRaises(IndexError, self.xavp.get, "$xavp(test[0]=>koko[1])")
self.assertRaises(
IndexError, self.xavp.get, '$xavp(test[1]=>koko[10])')
IndexError, self.xavp.get, "$xavp(test[1]=>koko[10])"
)
def test_get_value(self):
self.assertEqual(self.xavp.get('$xavp(test=>koko)'), 1)
self.assertEqual(self.xavp.get('$xavp(test[1]=>koko[1])'), 2)
self.assertEqual(self.xavp.get('$xavp(test[2]=>lola[0])'), 7)
self.assertEqual(self.xavp.get("$xavp(test=>koko)"), 1)
self.assertEqual(self.xavp.get("$xavp(test[1]=>koko[1])"), 2)
self.assertEqual(self.xavp.get("$xavp(test[2]=>lola[0])"), 7)
def test_get_value_all(self):
self.assertCountEqual(self.xavp.get('$xavp(test[1]=>koko[*])'), [1, 2])
self.assertCountEqual(self.xavp.get("$xavp(test[1]=>koko[*])"), [1, 2])
class TestCheckFlowVars(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.check_ok = [
{'R0': {'$xavp(v0)': [{'k0': [1],
'k1': ['a', 'b', 'fuckthisshit']},
{'k0': [1, 2], 'k1': ['a', ]},
],
'fU': 'testpep',
}},
{'R1': {'$xavp(v0)': [{'k0': [1, 2]}]}},
]
self.check_ko = [
{'R0': {'$xavp(v0)': [{'k0': ['a', 'b']}]}},
]
self.scen_ko = [
{'R0': {'$xavp(v0[0]=>k0[*])': ['a']}},
]
self.scen_noxavp = [
{'R0': {'fU': 'testpep'}},
{'R1': {}},
{
"R0": {
"$xavp(v0)": [
{"k0": [1], "k1": ["a", "b", "fuckthisshit"]},
{"k0": [1, 2], "k1": ["a"]},
],
"fU": "testpep",
}
},
{"R1": {"$xavp(v0)": [{"k0": [1, 2]}]}},
]
self.check_ko = [{"R0": {"$xavp(v0)": [{"k0": ["a", "b"]}]}}]
self.scen_ko = [{"R0": {"$xavp(v0[0]=>k0[*])": ["a"]}}]
self.scen_noxavp = [{"R0": {"fU": "testpep"}}, {"R1": {}}]
self.scen = [
{'R0': {'$xavp(v0[0]=>k0[0])': 1,
'$xavp(v0[0]=>k1[0])': 'a',
'$xavp(v0[0]=>k1[2])': '^f',
'$var(no)': 'None',
'$xavp(nono=>koko)': 'None',
'$xavp(v0=>k10)': 'None',
'$xavp(v0[1]=>k0[1])': '\d+',
'$xavp(v0[0]=>k1[*])': [1],
'$xavp(v0[0]=>k1[*])': ['a', 'b', 'fuckthisshit']}
},
{'R1': {'$xavp(v0[1]=>k0[0])': 1}},
{
"R0": {
"$xavp(v0[0]=>k0[0])": 1,
"$xavp(v0[0]=>k1[0])": "a",
"$xavp(v0[0]=>k1[2])": "^f",
"$var(no)": "None",
"$xavp(nono=>koko)": "None",
"$xavp(v0=>k10)": "None",
"$xavp(v0[1]=>k0[1])": r"\d+",
"$xavp(v0[0]=>k1[*])": ["a", "b", "fuckthisshit"],
}
},
{"R1": {"$xavp(v0[1]=>k0[0])": 1}},
]
def testXAvp(self):
data = self.check_ok[0]['R0']['$xavp(v0)']
xavp = XAvp('$xavp(v0)', data)
data = self.check_ok[0]["R0"]["$xavp(v0)"]
xavp = XAvp("$xavp(v0)", data)
self.assertEqual(xavp.get('$xavp(v0=>k0)'), 1)
self.assertEqual(xavp.get("$xavp(v0=>k0)"), 1)
self.assertEqual(
xavp.get('$xavp(v0[0]=>k1[*])'), ['a', 'b', 'fuckthisshit'])
self.assertEqual(xavp.get('$xavp(v0[1]=>k0[1])'), 2)
self.assertEqual(xavp.get('$xavp(v0[1]=>k1[*])'), ['a'])
xavp.get("$xavp(v0[0]=>k1[*])"), ["a", "b", "fuckthisshit"]
)
self.assertEqual(xavp.get("$xavp(v0[1]=>k0[1])"), 2)
self.assertEqual(xavp.get("$xavp(v0[1]=>k1[*])"), ["a"])
def testFlow_noxavp(self):
check_flow(self.scen_noxavp, self.check_ok, self.ctest)
@ -133,14 +133,14 @@ class TestCheckFlowVars(unittest.TestCase):
def testFlowVars_noxavp(self):
check_flow_vars(
'RO', self.scen_noxavp[0]['R0'],
self.check_ok[0]['R0'], self.ctest)
"RO", self.scen_noxavp[0]["R0"], self.check_ok[0]["R0"], self.ctest
)
self.assertFalse(self.ctest.isError())
def testFlowVars_xavp(self):
check_flow_vars(
'RO', self.scen[0]['R0'],
self.check_ok[0]['R0'], self.ctest)
"RO", self.scen[0]["R0"], self.check_ok[0]["R0"], self.ctest
)
self.assertFalse(self.ctest.isError())
def testFlow_fail(self):
@ -151,62 +151,57 @@ class TestCheckFlowVars(unittest.TestCase):
def testFlowVars_fail(self):
check_flow_vars(
'RO', self.scen_ko[0]['R0'],
self.check_ko[0]['R0'], self.ctest)
"RO", self.scen_ko[0]["R0"], self.check_ko[0]["R0"], self.ctest
)
tap = str(self.ctest)
self.assertTrue(self.ctest.isError(), tap)
self.assertIsNotNone(not_ok.search(tap), tap)
class TestCheckSipIn(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = open('./tests/fixtures/sip_in.txt', 'r').read()
self.msg = open("./tests/fixtures/sip_in.txt", "r").read()
def testSipIn(self):
sip_in = load_yaml('./tests/fixtures/test_sip_in.yml')
check_sip(sip_in, self.msg, self.ctest)
sip_in = load_yaml("./tests/fixtures/test_sip_in.yml")
check_sip(Section.SIP_IN, sip_in, self.msg, self.ctest)
self.assertFalse(self.ctest.isError())
class TestCheckSipOut(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = load_yaml('./tests/fixtures/sip_out.yml')
self.msg = load_yaml("./tests/fixtures/sip_out.yml")
def testSipOut(self):
sip_out = load_yaml('./tests/fixtures/test_sip_out.yml')
check_sip_out(sip_out, self.msg, self.ctest)
sip_out = load_yaml("./tests/fixtures/test_sip_out.yml")
check_sip_out(Section.SIP_OUT, sip_out, self.msg, self.ctest)
self.assertFalse(self.ctest.isError())
class TestJson(unittest.TestCase):
def setUp(self):
self.ctest = Test()
def testFail(self):
check = load_json('./tests/fixtures/fail.json')
scen = load_yaml('./tests/fixtures/scen_fail.yml')
check_flow(scen['flow'], check['flow'], self.ctest)
check = load_json("./tests/fixtures/fail.json")
scen = load_yaml("./tests/fixtures/scen_fail.yml")
check_flow(scen["flow"], check["flow"], self.ctest)
tap = str(self.ctest)
self.assertTrue(self.ctest.isError(), tap)
self.assertIsNotNone(not_ok.search(tap), tap)
if __name__ == '__main__':
if __name__ == "__main__":
suite = unittest.TestSuite()
suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestXAvp))
suite.addTest(
unittest.defaultTestLoader.loadTestsFromTestCase(TestCheckFlowVars))
suite.addTest(
unittest.defaultTestLoader.loadTestsFromTestCase(TestCheckSipIn))
suite.addTest(
unittest.defaultTestLoader.loadTestsFromTestCase(TestCheckSipOut))
suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestJson))
load = unittest.defaultTestLoader.loadTestsFromTestCase
suite.addTest(load(TestXAvp))
suite.addTest(load(TestCheckFlowVars))
suite.addTest(load(TestCheckSipIn))
suite.addTest(load(TestCheckSipOut))
suite.addTest(load(TestJson))
result = junitxml.JUnitXmlResult(sys.stdout)
result.startTestRun()
suite.run(result)

Loading…
Cancel
Save