TT#116101 bin/generate_test_tt2.py

sipp test generator

Change-Id: I793a4d48b62166100905402bc7bdbee3c0e110c1
mr10.0
Victor Seva 5 years ago
parent 493423a911
commit e7b9a38eff

@ -74,6 +74,10 @@ test_detect_network: tests/test_detect_network.py
mkdir -p $(RESULTS)
pytest-3 --junitxml=${RESULTS}/$(@).xml $(<)
test_generate_test_tt2: tests/test_generate_test_tt2.py
mkdir -p $(RESULTS)
pytest-3 --junitxml=${RESULTS}/$(@).xml $(<)
# run this in parallel!! -j is your friend
test: $(TESTS) test_check test_detect_network

@ -0,0 +1,210 @@
#!/usr/bin/python3
#
# Copyright: 2021 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This package is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
from pathlib import Path
import os
import logging
import re
import argparse
import subprocess
from yaml import load, dump
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
def load_yaml(filepath):
output = None
with open(filepath, "r") as file:
output = load(file, Loader=Loader)
return output
def load_msg(filepath):
mark = re.compile(r"^-+ .+")
recv = re.compile(r"^(TCP|UDP) message received .+")
res = []
msg = []
flag = False
with open(filepath, "r") as file:
line = file.readline()
while line:
if mark.match(line):
if flag and len(msg) > 0:
res.append(msg)
msg = []
flag = False
elif recv.match(line):
line = file.readline() # eat next empty line
flag = True
elif flag:
msg.append(line)
line = file.readline()
if flag and len(msg) > 0:
res.append(msg)
msg = []
return res
def is_zero(matchobj):
if matchobj.group(1) == "0":
return ": 0"
return r": \\d+"
class Generator:
common_hdrs = set(["Via", "Route", "Call-ID", "Expires", "Max-Forwards"])
hdr_re = re.compile(r"^([\w-]+): .+")
@classmethod
def get_filters(cls, args):
filters = set()
if args.filter_common:
cls.common_hdrs = set(args.filter_common)
if args.filter:
filters = set(args.filter)
return cls.common_hdrs | filters
@classmethod
def generate_rules(self, ids):
rules = []
id_dom = ids["domains"][0]
def add_sip_username(val, tt):
rules.append(
(r"<sip:{}@".format(val), r"<sip:[% {} %]@".format(tt))
)
for subs in ids[id_dom]:
add_sip_username(
ids[id_dom][subs]["phone_number"],
r"{}.{}.phone_number".format(id_dom, subs),
)
for idx, scen in enumerate(ids["scenarios"]):
add_sip_username(
scen["username"], r"scenarios.{}.username".format(idx)
)
if scen["devid"] != scen["username"]:
add_sip_username(
scen["devid"], r"scenarios.{}.devid".format(idx)
)
return rules
def __init__(self, _hdrs, _msg, _ids):
# ignore case for headers
self.hdrs = [hdr.lower() for hdr in _hdrs]
self.msgs = _msg
self.ids = _ids
self.ids_rules = self.generate_rules(_ids)
@classmethod
def get_header(self, line):
m = self.hdr_re.match(line)
if m:
# ignore case for header
return m.group(1).lower()
def filter_hdr(self, line):
hdr = self.get_header(line)
if hdr and hdr in self.hdrs:
return True, hdr
return False, hdr
def subst_common(self, line, hdr):
rules = []
if hdr is None:
pass
elif hdr in ["from", "to"]:
rules.append((r";tag=[^;]+", r";tag=[\\w-]+"))
elif hdr == "www-authenticate":
rules.append((r"nonce=\"[^\"]+", 'nonce="[^"]+'))
elif hdr in ["server", "user-agent"]:
rules.append(
(
r": Sipwise NGCP (Proxy|Application|PBX|LB).+",
r": Sipwise NGCP \1",
)
)
elif hdr == "content-length":
rules.append((r":\s+(\d+)", is_zero))
for rule in rules:
line = re.sub(rule[0], rule[1], line, flags=re.IGNORECASE)
return line
def subst_ids(self, line, hdr):
for rule in self.ids_rules:
line = re.sub(rule[0], rule[1], line)
return line
def process_msg(self, msg):
res = []
for line in msg:
ok, hdr = self.filter_hdr(line)
if ok:
continue
l_common = self.subst_common(line, hdr)
l_ids = self.subst_ids(l_common, hdr)
if line != l_ids:
logging.info("line:{} => {}".format(line, l_ids))
l = l_ids.replace("\n", "")
if len(l) > 0:
res.append(l)
return res
def __str__(self):
res = {"messages": []}
for msg in self.msgs:
res["messages"].append(self.process_msg(msg))
return dump(res, Dumper=Dumper)
def main(args):
msgs = load_msg(args.sipp_msg)
ids = load_yaml(args.scen_ids)
g = Generator(Generator.get_filters(args), msgs, ids)
print(g)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="generate test_yml.tt2 files from sipp msg file"
)
parser.add_argument(
"scen_ids", help="path to the scenario_ids.yml file", type=Path
)
parser.add_argument(
"sipp_msg", help="path to the sipp msg file", type=Path
)
parser.add_argument("--filter", nargs="?", help="remove this header")
parser.add_argument("--filter-common", help="filter common headers")
parser.add_argument(
"--verbose",
"-v",
help="verbose mode",
action="store_const",
dest="loglevel",
const=logging.INFO,
)
args = parser.parse_args()
logging.basicConfig(level=args.loglevel)
main(args)

@ -0,0 +1,29 @@
---
auth_fail_scenarios_test:
testuser1003:
ac: "1"
cc: "43"
phone_number: "4311000"
sn: 1000
uuid: 21b2b667-7e75-459a-9161-1eb93c57f90a
customer_test:
id: "7"
domains:
- auth_fail_scenarios_test
extra_info: {}
scenarios:
- devid: testuser1003
domain: auth-fail.scenarios.test
ip: 10.20.29.2
mport: 45003
port: 51602
responders:
- domain: auth-fail.scenarios.test
ip: 10.20.29.2
mport: 45006
port: 51603
proto: udp
register: no
username: testuser1003
username: testuser1003
server_ip: 192.168.2.121

@ -0,0 +1,148 @@
----------------------------------------------- 2021-07-12 11:47:35.854097
UDP message sent (404 bytes):
REGISTER sip:testuser1003@auth-fail.scenarios.test SIP/2.0
Via: SIP/2.0/UDP 10.20.29.2:51602;branch=z9hG4bK-29092-1-0
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 1 REGISTER
Contact: <sip:testuser1003@10.20.29.2:51602>
Expires: 600
Max-Forwards: 70
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.862022
UDP message received [483] bytes :
SIP/2.0 401 Unauthorized
Via: SIP/2.0/UDP 10.20.29.2:51602;rport=51602;branch=z9hG4bK-29092-1-0
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>;tag=95c37a12bff1a2c36d72bf8333176544.78550000
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 1 REGISTER
WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf"
Server: Sipwise NGCP Proxy 9.X
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.868422
UDP message sent (612 bytes):
REGISTER sip:testuser1003@auth-fail.scenarios.test SIP/2.0
Via: SIP/2.0/UDP 10.20.29.2:51602;branch=z9hG4bK-29092-1-3
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 2 REGISTER
Authorization: Digest username="testuser1003",realm="auth-fail.scenarios.test",uri="sip:192.168.2.121:5060",nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf",response="5a694615df518a21232659b1ae37b73d",algorithm=MD5
Contact: <sip:testuser1003@10.20.29.2:51602>
Expires: 600
Max-Forwards: 70
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.874829
UDP message received [483] bytes :
SIP/2.0 401 Unauthorized
Via: SIP/2.0/UDP 10.20.29.2:51602;rport=51602;branch=z9hG4bK-29092-1-3
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>;tag=95c37a12bff1a2c36d72bf8333176544.78550000
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 2 REGISTER
WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf"
Server: Sipwise NGCP Proxy 9.X
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.882794
UDP message sent (612 bytes):
REGISTER sip:testuser1003@auth-fail.scenarios.test SIP/2.0
Via: SIP/2.0/UDP 10.20.29.2:51602;branch=z9hG4bK-29092-1-6
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 3 REGISTER
Authorization: Digest username="testuser1003",realm="auth-fail.scenarios.test",uri="sip:192.168.2.121:5060",nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf",response="5a694615df518a21232659b1ae37b73d",algorithm=MD5
Contact: <sip:testuser1003@10.20.29.2:51602>
Expires: 600
Max-Forwards: 70
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.890672
UDP message received [483] bytes :
SIP/2.0 401 Unauthorized
Via: SIP/2.0/UDP 10.20.29.2:51602;rport=51602;branch=z9hG4bK-29092-1-6
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>;tag=95c37a12bff1a2c36d72bf8333176544.78550000
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 3 REGISTER
WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf"
Server: Sipwise NGCP Proxy 9.X
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.890805
UDP message sent (612 bytes):
REGISTER sip:testuser1003@auth-fail.scenarios.test SIP/2.0
Via: SIP/2.0/UDP 10.20.29.2:51602;branch=z9hG4bK-29092-1-9
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 4 REGISTER
Authorization: Digest username="testuser1003",realm="auth-fail.scenarios.test",uri="sip:192.168.2.121:5060",nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf",response="5a694615df518a21232659b1ae37b73d",algorithm=MD5
Contact: <sip:testuser1003@10.20.29.2:51602>
Expires: 600
Max-Forwards: 70
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.902452
UDP message received [483] bytes :
SIP/2.0 401 Unauthorized
Via: SIP/2.0/UDP 10.20.29.2:51602;rport=51602;branch=z9hG4bK-29092-1-9
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>;tag=95c37a12bff1a2c36d72bf8333176544.78550000
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 4 REGISTER
WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf"
Server: Sipwise NGCP Proxy 9.X
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.902720
UDP message sent (613 bytes):
REGISTER sip:testuser1003@auth-fail.scenarios.test SIP/2.0
Via: SIP/2.0/UDP 10.20.29.2:51602;branch=z9hG4bK-29092-1-12
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 5 REGISTER
Authorization: Digest username="testuser1003",realm="auth-fail.scenarios.test",uri="sip:192.168.2.121:5060",nonce="YOwQ42DsD7evuVhxpdWTJe37BDP8EKRf",response="5a694615df518a21232659b1ae37b73d",algorithm=MD5
Contact: <sip:testuser1003@10.20.29.2:51602>
Expires: 600
Max-Forwards: 70
Content-Length: 0
----------------------------------------------- 2021-07-12 11:47:35.902845
UDP message received [383] bytes :
SIP/2.0 403 Try again later
Via: SIP/2.0/UDP 10.20.29.2:51602;branch=z9hG4bK-29092-1-12;rport=51602
From: <sip:testuser1003@auth-fail.scenarios.test>;tag=29092SIPpTag001
To: <sip:testuser1003@auth-fail.scenarios.test>;tag=0bfe5d278ccafe7e169db3d564176051.e6880ed6
Call-ID: NGCP%auth_fail%///1-29092@10.20.29.2
CSeq: 5 REGISTER
Server: Sipwise NGCP LB 9.X
Content-Length: 0

@ -0,0 +1,36 @@
messages:
- - SIP/2.0 401 Unauthorized
- 'From: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'To: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'CSeq: 1 REGISTER'
- 'WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="[^"]+"'
- 'Server: Sipwise NGCP Proxy'
- 'Content-Length: 0'
- - SIP/2.0 401 Unauthorized
- 'From: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'To: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'CSeq: 2 REGISTER'
- 'WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="[^"]+"'
- 'Server: Sipwise NGCP Proxy'
- 'Content-Length: 0'
- - SIP/2.0 401 Unauthorized
- 'From: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'To: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'CSeq: 3 REGISTER'
- 'WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="[^"]+"'
- 'Server: Sipwise NGCP Proxy'
- 'Content-Length: 0'
- - SIP/2.0 401 Unauthorized
- 'From: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'To: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'CSeq: 4 REGISTER'
- 'WWW-Authenticate: Digest realm="auth-fail.scenarios.test", nonce="[^"]+"'
- 'Server: Sipwise NGCP Proxy'
- 'Content-Length: 0'
- - SIP/2.0 403 Try again later
- 'From: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'To: <sip:[% scenarios.0.username %]@auth-fail.scenarios.test>;tag=[\w-]+'
- 'CSeq: 5 REGISTER'
- 'Server: Sipwise NGCP LB'
- 'Content-Length: 0'

@ -9,29 +9,27 @@ import shutil
@pytest.fixture()
def detect_network(tmpdir, *args):
testbin = tmpdir.mkdir('bin')
testbin = tmpdir.mkdir("bin")
def run(*args, env={}, nodename=None):
testenv = {
'PATH': '{}:/usr/bin:/bin:/usr/sbin:/sbin'.format(testbin),
}
testenv = {"PATH": "{}:/usr/bin:/bin:/usr/sbin:/sbin".format(testbin)}
testenv.update(env)
if nodename:
nodename_path = os.path.join(testbin, 'ngcp-nodename')
with open(nodename_path, 'w') as fd:
nodename_path = os.path.join(testbin, "ngcp-nodename")
with open(nodename_path, "w") as fd:
fd.write('#!/bin/sh\necho "{}"\n'.format(nodename))
os.chmod(nodename_path, 755)
cmd = ["./bin/detect_network.py", "--verbose"] + list(args)
p = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=testenv)
p = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=testenv
)
stdout, stderr = p.communicate(timeout=30)
stdout, stderr = str(stdout), str(stderr)
result = namedtuple('ProcessResult',
['returncode', 'stdout', 'stderr'])(
p.returncode, stdout, stderr)
result = namedtuple(
"ProcessResult", ["returncode", "stdout", "stderr"]
)(p.returncode, stdout, stderr)
return result
return run
@ -39,7 +37,6 @@ def detect_network(tmpdir, *args):
@pytest.fixture()
def detect_network_pro(detect_network, tmpdir, *args):
def run(
*args,
config="tests/fixtures/kct_config.yml",
@ -50,8 +47,8 @@ def detect_network_pro(detect_network, tmpdir, *args):
shutil.copy(orig_config, _config)
la = [str(_config), str(network)] + list(args)
res = detect_network(*la, nodename="sp1")
flds = ['config', 'network', 'returncode', 'stdout', 'stderr']
result = namedtuple('Result', flds)(
flds = ["config", "network", "returncode", "stdout", "stderr"]
result = namedtuple("Result", flds)(
_config, network, res.returncode, res.stdout, res.stderr
)
return result
@ -61,7 +58,6 @@ def detect_network_pro(detect_network, tmpdir, *args):
@pytest.fixture()
def detect_network_ce(detect_network, tmpdir, *args):
def run(
*args,
config="tests/fixtures/kct_config.yml",
@ -72,10 +68,59 @@ def detect_network_ce(detect_network, tmpdir, *args):
shutil.copy(orig_config, _config)
la = [str(_config), str(network)] + list(args)
res = detect_network(*la, nodename="spce")
flds = ['config', 'network', 'returncode', 'stdout', 'stderr']
result = namedtuple('Result', flds)(
flds = ["config", "network", "returncode", "stdout", "stderr"]
result = namedtuple("Result", flds)(
_config, network, res.returncode, res.stdout, res.stderr
)
return result
return run
@pytest.fixture()
def generate_test_tt2(*args):
def run(*args):
cmd = ["./bin/generate_test_tt2.py"] + list(args)
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)
stdout, stderr = p.communicate(timeout=30)
stdout, stderr = str(stdout), str(stderr)
result = namedtuple(
"ProcessResult", ["returncode", "stdout", "stderr"]
)(p.returncode, stdout, stderr)
return result
return run
@pytest.fixture()
def generate_test_tt2_file(tmpdir, *args):
fout = tmpdir.join("sip_messages00_test.yml.tt2")
ferr = tmpdir.join("stderr.txt")
def run(*args):
cmd = ["./bin/generate_test_tt2.py"] + list(args)
with open(fout, "wb") as fo, open(ferr, "wb") as fe:
with subprocess.Popen(
cmd, stdout=fo, stderr=fe, encoding="utf-8"
) as p:
p.wait(timeout=30)
print("done")
result = namedtuple(
"ProcessResult", ["returncode", "out_file", "err_file"]
)(p.returncode, fout, ferr)
# debug, only printed in logs in case of error
# print("stdout[{}]:".format(fout))
# with open(fout, "r") as file:
# sys.stdout.writelines(file.readlines())
sys.stderr.write("stderr[{}]:\n".format(ferr))
with open(ferr, "r") as file:
sys.stderr.writelines(file.readlines())
return result
return run

@ -0,0 +1,140 @@
#!/usr/bin/python3
#
# Copyright: 2021 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This package is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
import os
import sys
import logging
from collections import namedtuple
import filecmp
import difflib
lib_path = os.path.abspath("bin")
sys.path.append(lib_path)
from generate_test_tt2 import load_msg, load_yaml # noqa
from generate_test_tt2 import Generator # noqa
Args = namedtuple(
"GenratorArgs", ["scen_ids", "sipp_msg", "filter", "filter_common"]
)
IDS_FILE = "tests/fixtures/auth_fail/scenario_ids.yml"
MSG_FILE = "tests/fixtures/auth_fail/sipp_scenario00.msg"
TT2_FILE = "tests/fixtures/auth_fail/sipp_scenario00_test.yml.tt2"
def check_filecontent(a, b):
with open(a, "r") as fa, open(b, "r") as fb:
lines_a = fa.readlines()
lines_b = fb.readlines()
diff = difflib.unified_diff(lines_a, lines_b, a, str(b))
sys.stderr.write("diff:\n")
sys.stderr.writelines(diff)
return filecmp.cmp(a, b)
def test_load_msg():
res = load_msg(MSG_FILE)
for i in range(len(res)):
print("res[{}][0]:{}".format(i, res[i][0]))
assert len(res) == 5
for i in range(4):
assert res[i][0] == "SIP/2.0 401 Unauthorized\n"
assert len(res[i]) == 11
assert res[4][0] == "SIP/2.0 403 Try again later\n"
assert len(res[4]) == 10
for i in range(5):
assert (
res[i][2] == "From: <sip:testuser1003@"
"auth-fail.scenarios.test>;tag=29092SIPpTag001\n"
)
def test_get_header():
hdr = Generator.get_header(
"ViA: SIP/2.0/UDP 10.20.29.2:51602"
";rport=51602;branch=z9hG4bK-29092-1-6"
)
assert hdr == "via"
assert Generator.get_header("not a header...no no via:") == None
def test_filter_hdr():
msgs = load_msg(MSG_FILE)
ids = load_yaml(IDS_FILE)
g = Generator(Generator.common_hdrs, msgs, ids)
assert g.filter_hdr(
"ViA: SIP/2.0/UDP 10.20.29.2:51602;"
"rport=51602;branch=z9hG4bK-29092-1-6"
)
assert g.filter_hdr("SIP/2.0 401 Unauthorized") == (False, None)
assert g.filter_hdr("CSeq: 1 REGISTER") == (False, "cseq")
def test_process_msg():
msgs = load_msg(MSG_FILE)
ids = load_yaml(IDS_FILE)
g = Generator(Generator.common_hdrs, msgs, ids)
res = g.process_msg(msgs[0])
assert len(res) == 7
def test_empty_args(generate_test_tt2):
res = generate_test_tt2("--filter", "CSeq")
assert res.returncode != 0
def test_hdrs_common():
assert len(Generator.common_hdrs) == 5
def test_get_filters(generate_test_tt2):
args = Args(None, None, ["CSeq", "WWW-Authenticate"], None)
res = Generator.get_filters(args)
assert res == set(
[
"CSeq",
"WWW-Authenticate",
"Via",
"Route",
"Call-ID",
"Expires",
"Max-Forwards",
]
)
assert Generator.common_hdrs == set(
["Via", "Route", "Call-ID", "Expires", "Max-Forwards"]
)
args = Args(None, None, None, None)
res = Generator.get_filters(args)
assert Generator.common_hdrs == set(
["Via", "Route", "Call-ID", "Expires", "Max-Forwards"]
)
def test_ok(generate_test_tt2_file, caplog):
caplog.set_level(logging.DEBUG)
res = generate_test_tt2_file(IDS_FILE, MSG_FILE)
assert check_filecontent(TT2_FILE, res.out_file)
assert res.returncode == 0
Loading…
Cancel
Save