kamailio-config-tests/bin/generate_test_tt2.py

525 lines
17 KiB

#!/usr/bin/python3
#
# Copyright: 2021 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This package is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
import sys
from pathlib import Path
import os
import logging
import re
import argparse
import subprocess
import json
from yaml import load
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
def load_yaml(filepath):
    """Parse the YAML file at *filepath* and return the loaded document.

    NOTE(review): this goes through ``yaml.load`` with the module-level
    ``Loader`` (CLoader when available), i.e. not the "safe" loader.
    Presumably only trusted scenario files are passed in; confirm before
    feeding it anything else.
    """
    with open(filepath, "r") as stream:
        return load(stream, Loader=Loader)
def load_sipp_msg(filepath):
    """Extract the received messages from a sipp message log.

    The log is scanned line by line:

    * a separator line (dashes followed by a space and more text) closes
      the message being captured, if any, and stops capturing;
    * a ``TCP|UDP message received ...`` line starts capturing and the
      line right after it is skipped;
    * any other line is kept only while capturing.

    Returns a list of messages, each one a list of raw lines (line endings
    included). Messages that collected no line are not returned.
    """
    separator_re = re.compile(r"^-+ .+")
    received_re = re.compile(r"^(TCP|UDP) message received .+")
    messages = []
    current = []
    capturing = False
    with open(filepath, "r") as handle:
        lines = iter(handle)
        for raw in lines:
            if separator_re.match(raw):
                if capturing and current:
                    messages.append(current)
                    current = []
                capturing = False
                continue
            if received_re.match(raw):
                # eat next empty line
                next(lines, None)
                capturing = True
                continue
            if capturing:
                current.append(raw)
    # the log may end without a trailing separator
    if capturing and current:
        messages.append(current)
    return messages
def load_json(filepath):
    """Return the decoded content of the JSON file at *filepath*."""
    with open(filepath, "r") as stream:
        return json.load(stream)
def is_zero(matchobj):
    """Replacement callback for ``re.sub`` on ``: <number>`` header values.

    A value of exactly "0" is kept as a literal zero in the produced
    pattern; any other number is generalised to ``\\d+``.
    """
    return r":\s+0" if matchobj.group(1) == "0" else r":\s+\d+"
class Generator:
    """Rewrite captured SIP message lines into template/regex lines.

    Each line is first escaped (see ``escape_line``), dropped if its header
    is in the filter list, then passed through two rule sets applied with
    ``re.sub``: the id rules built once from the scenario ids
    (``generate_rules``) and the per-header common rules
    (``generate_common_rules``).
    """

    # headers filtered out by default
    common_hdrs = set(["Via", "Route", "Call-ID", "Expires", "Max-Forwards"])
    hdr_re = re.compile(r"^([\w-]+): .+")

    def __init__(self, _hdrs, _ids):
        """Store the headers to drop and build the id substitution rules.

        _hdrs -- iterable of header names to remove from the output
        _ids  -- scenario ids mapping used by ``generate_rules``
        """
        # ignore case for headers
        self.hdrs = [hdr.lower() for hdr in _hdrs]
        self.ids = _ids
        self.ids_rules = self.generate_rules(_ids)

    @staticmethod
    def _as_header_set(value):
        """Return *value* as a set of header names.

        A single string is one header name; passing it to ``set()`` directly
        would split it into characters.
        """
        if isinstance(value, str):
            return {value}
        return set(value)

    @classmethod
    def get_filters(cls, args):
        """Return the set of header names to filter for the parsed *args*.

        ``args.filter_common`` (when set) replaces the default common
        headers for this call; ``args.filter`` (when set) is added on top.
        Both accept a single header name or an iterable of names. The class
        attribute ``common_hdrs`` is left untouched.
        """
        common = cls.common_hdrs
        if args.filter_common:
            common = cls._as_header_set(args.filter_common)
        extra = cls._as_header_set(args.filter) if args.filter else set()
        return common | extra

    @classmethod
    def generate_rules(cls, ids) -> list:
        """Build the ``(pattern, replacement)`` rules derived from *ids*.

        Reads ``ids["domains"][0]``, ``ids["scenarios"]`` (with their
        ``responders``), the subscribers stored under the first domain and,
        when present, ``server_ip`` and ``extra_info.phone_numbers``.
        Rules are applied in list order, so the order of the appends below
        matters.

        NOTE(review): id values are interpolated into the patterns as-is
        (no ``re.escape``), so e.g. the dots of an IP match any character.
        """
        rules = []
        id_dom = ids["domains"][0]
        # server_ip is optional
        server_ip = ids.get("server_ip", False)

        def sip_rule(subs, tt, field):
            value = subs[field]
            if str(value).startswith("00"):
                # value already carries the 00 prefix: no optional prefix
                rule = (
                    r"(sip|tel):{}([^\d])".format(value),
                    r"\1:[% {}.{} %]\2".format(tt, field),
                )
                rules.append(rule)
                logging.info("rule:[{0}]=>[{1}]".format(rule[0], rule[1]))
            else:
                # lines are escaped before matching, hence the literal "\+"
                rules.append(
                    (
                        r"(sip|tel):(\\\+|00)?{}([^\d])".format(value),
                        r"\1:\2[% {}.{} %]\3".format(tt, field),
                    )
                )
            rules.append(
                (
                    r"\"{}\"".format(value),
                    '"[% {}.{} %]"'.format(tt, field),
                )
            )
            rules.append(
                (
                    r"primary={}([^0-9]+|$)".format(value),
                    r"primary=[% {}.{} %]\1".format(tt, field),
                )
            )
            rules.append(
                (
                    r"alias={}([^0-9]+|$)".format(value),
                    r"alias=[% {}.{} %]\1".format(tt, field),
                )
            )
            rules.append(
                (
                    r"P-First-Calle(e|r)-(U|N)PN: {}".format(value),
                    r"P-First-Calle\1-\2PN: [% {}.{} %]".format(tt, field),
                )
            )

        def sdp_rule(val, tt):
            rules.append(
                (
                    r"^c=IN IP(\d) {}".format(val),
                    r"c=IN IP\1 [% {} %]".format(tt),
                )
            )
            rules.append(
                (
                    r"^o=(.+) \d+ \d+ IN IP(\d) {}".format(val),
                    r"o=\1 \\d+ \\d+ IN IP\2 [% {} %]".format(tt),
                )
            )

        def add_sip(subs, tt):
            sip_rule(subs, tt, "username")
            # devid only needs its own rules when it differs from username
            if "devid" in subs and subs["username"] != subs["devid"]:
                sip_rule(subs, tt, "devid")

        def add_sip_full(subs, tt):
            rules.append(
                (
                    r"(sip|tel):(\\\+|00)?{}@{}:{}".format(
                        subs["username"], subs["ip"], subs["port"]
                    ),
                    r"\1:\2[% {0}.username %]@"
                    "[% {0}.ip %]:[% {0}.port %]".format(tt),
                )
            )
            rules.append(
                (
                    r"sip:([^@]+@)?{}:{}([^\d])".format(
                        subs["ip"], subs["port"]
                    ),
                    r"sip:\1[% {0}.ip %]:[% {0}.port %]\2".format(tt),
                )
            )
            rules.append(
                (
                    r"sip:([^@]+@)?{}([^:])".format(subs["ip"]),
                    r"sip:\1[% {0}.ip %]\2".format(tt),
                )
            )

        def add_socket(scen, tt):
            rules.append(
                (
                    r";socket=(udp|tcp):{}:{}([^;>]+)".format(
                        scen["ip"], scen["port"]
                    ),
                    r";socket=\1:[% {0}.ip %]:[% {0}.port %]\2".format(tt),
                )
            )
            sdp_rule(scen["ip"], f"{tt}.ip")
            rules.append(
                (
                    r"^m=audio {} (.+)".format(scen["mport"]),
                    r"m=audio [% {}.mport %] \1".format(tt),
                )
            )
            rules.append(
                (
                    r";ip={};port={}(;?)".format(scen["ip"], scen["port"]),
                    r";ip=[% {0}.ip %];port=[% {0}.port %]\1".format(tt),
                )
            )

        if server_ip:
            rules.append(
                (
                    r";socket=(sip|udp|tcp):{}:5060".format(server_ip),
                    r";socket=\1:[% server_ip %]:5060",
                )
            )
            rules.append(
                (r"sip:([^@]+@){}".format(server_ip), r"sip:\1[% server_ip %]")
            )
            rules.append((r"sip:{}".format(server_ip), "sip:[% server_ip %]"))
            rules.append(
                (
                    r"(udp|tcp):{}:5060".format(server_ip),
                    r"\1:[% server_ip %]:5060",
                )
            )
            sdp_rule(server_ip, "server_ip")

        # priority on full match
        for idx, scen in enumerate(ids["scenarios"]):
            add_sip_full(scen, f"scenarios.{idx}")
            add_socket(scen, f"scenarios.{idx}")
            for jdx, resp in enumerate(scen["responders"]):
                add_sip_full(resp, f"scenarios.{idx}.responders.{jdx}")
                add_socket(resp, f"scenarios.{idx}.responders.{jdx}")
        for idx, scen in enumerate(ids["scenarios"]):
            add_sip(scen, f"scenarios.{idx}")
            for jdx, resp in enumerate(scen["responders"]):
                add_sip(resp, f"scenarios.{idx}.responders.{jdx}")

        for key in ids[id_dom]:
            subs = ids[id_dom][key]
            sip_rule(subs, f"{id_dom}.{key}", "phone_number")
            if "pbx_extension" in subs:
                sip_rule(subs, f"{id_dom}.{key}", "pbx_extension")
                rules.append(
                    (
                        r"^(P-.+-PBX-Ext): (.+)",
                        r"\1: [% {} %]".format(
                            f"{id_dom}.{key}.pbx_extension"
                        ),
                    )
                )
            if "pbx_phone_number" in subs:
                sip_rule(subs, f"{id_dom}.{key}", "pbx_phone_number")
            if "alias_numbers" in subs:
                for idx, alias in enumerate(subs["alias_numbers"]):
                    sip_rule(
                        alias,
                        f"{id_dom}.{key}.alias_numbers.{idx}",
                        "phone_number",
                    )

        # extra_info is treated as optional, like server_ip
        extra_info = ids.get("extra_info") or {}
        if "phone_numbers" in extra_info:
            numbers = extra_info["phone_numbers"]
            for idx, _ in enumerate(numbers):
                # the list index plays the role of the "field"
                sip_rule(numbers, "extra_info.phone_numbers", idx)
        return rules

    @classmethod
    def get_header(cls, line):
        """Return the lower-cased header name of *line*, or None."""
        m = cls.hdr_re.match(line)
        if m:
            # ignore case for header
            return m.group(1).lower()
        return None

    def filter_hdr(self, line):
        """Return ``(drop, hdr)`` for *line*.

        ``drop`` is True when the line's header is one of ``self.hdrs``;
        ``hdr`` is the lower-cased header name or None.
        """
        hdr = self.get_header(line)
        if hdr and hdr in self.hdrs:
            return True, hdr
        return False, hdr

    def generate_common_rules(self, hdr):
        """Return the generic ``(pattern, replacement)`` rules for *hdr*.

        *hdr* is the lower-cased header name, or None for lines that are
        not recognised as a header. A replacement may be a string or a
        callable accepted by ``re.sub``.
        """
        rules = []
        if hdr is None:
            rules.append((r"^a=rtcp:\d+", r"a=rtcp:\\d+"))
            rules.append((r"^m=audio \d+ (.+)", r"m=audio \\d+ \1"))
            rules.append(
                (r"<\\\?xml version=\"[^\"]+\"\\\?><dialog-info .+", "")
            )
            rules.append((r";alias=[^; ]+", r";alias=[^; ]+"))
        elif hdr in ["from", "to"]:
            rules.append((r";tag=[^;>]+", r";tag=[\\w-]+"))
        elif hdr in [
            "www-authenticate",
            "authorization",
            "proxy-authenticate",
            "proxy-authorization",
        ]:
            rules.append((r"nonce=\"[^\"]+", 'nonce="[^"]+'))
            rules.append((r"response=\"[^\"]+", 'response="[^"]+'))
        elif hdr in ["server", "user-agent"]:
            rules.append(
                (
                    r": Sipwise NGCP (Proxy|Application|PBX|LB).+",
                    r": Sipwise NGCP \1",
                )
            )
        elif hdr == "content-length":
            rules.append((r":\s+(\d+)", is_zero))
        elif hdr == "record-route":
            rules.append((r";did=[^;>]+", r";did=[^;]+"))
            rules.append((r";ftag=[^;>]+", r";ftag=[^;]+"))
            rules.append((r";aset=[^;>]+", r";aset=\\d+"))
            rules.append((r";vsf=[^;>]+", r";vsf=[^;]+"))
            rules.append((r";vst=[^;>]+", r";vst=[^;]+"))
        elif hdr == "contact":
            rules.append((r"expires=[1-9]\d*", r"expires=\\d+"))
            rules.append((r";ngcpct=[^;>]+", r";ngcpct=[^;]+"))
            rules.append((r";alias=[^;>]+", r";alias=[^;]+"))
        elif hdr == "cseq":
            rules.append((r":\s+\d+", r": \\d+"))
        elif hdr == "subscription-state":
            rules.append((r";expires=[1-9]\d*", r";expires=\\d+"))
        elif hdr == "sip-if-match":
            rules.append((r": (.+)", ":"))
        return rules

    def subst_common(self, line: str, hdr: str) -> str:
        """Apply the common rules for *hdr* to *line* (case-insensitive)."""
        for pattern, repl in self.generate_common_rules(hdr):
            line = re.sub(pattern, repl, line, flags=re.IGNORECASE)
        return line

    def subst_ids(self, line: str, hdr: str) -> str:
        """Apply every id rule to *line* (case-insensitive).

        *hdr* is accepted for symmetry with ``subst_common`` and not used.
        """
        for pattern, repl in self.ids_rules:
            line = re.sub(pattern, repl, line, flags=re.IGNORECASE)
        return line

    @classmethod
    def escape_line(cls, line):
        """Prefix ``* + ? ( ) [ ]`` in *line* with a backslash."""
        for char in ["*", "+", "?", "(", ")", "[", "]"]:
            line = line.replace(f"{char}", r"\{}".format(char))
        return line

    def process_msg(self, msg):
        """Return the rewritten lines of one message.

        Empty lines, lines whose header is filtered and lines that end up
        empty after substitution are left out.
        """
        res = []
        for line in msg:
            line = line.replace("\n", "")
            if not line:
                continue
            # escape special characters
            line = self.escape_line(line)
            drop, hdr = self.filter_hdr(line)
            if drop:
                continue
            rewritten = self.subst_common(self.subst_ids(line, hdr), hdr)
            # remove empty lines
            if not rewritten:
                continue
            if line != rewritten:
                logging.info("line:{} => {}".format(line, rewritten))
            res.append(rewritten)
        return res

    def run(self, info: list):
        """Process every message of *info*; one list of lines per message."""
        return [self.process_msg(msg) for msg in info]
def sipp_process(args):
    """Print the ``messages:`` YAML section for a sipp message log.

    Loads the received messages from ``args.input`` and the scenario ids
    from ``args.scen_ids``, rewrites them with ``Generator`` and writes one
    nested sequence per message to stdout.
    """
    msgs = load_sipp_msg(args.input)
    ids = load_yaml(args.scen_ids)
    gen = Generator(Generator.get_filters(args), ids)
    out = sys.stdout
    print("messages:")
    for msg in gen.run(msgs):
        if not msg:
            # every line was filtered out: keep the slot so that message
            # positions still line up, instead of failing on msg[0]
            out.write("- []\n")
            continue
        # a single quote inside a YAML single-quoted scalar must be doubled
        quoted = ["'{}'".format(line.replace("'", "''")) for line in msg]
        out.write("- - {}\n".format(quoted[0]))
        for item in quoted[1:]:
            # two spaces: aligns with the nested sequence opened by "- - "
            out.write("  - {}\n".format(item))
def get_msgs(info: list):
    """Split each JSON SIP message of *info* into its CRLF-separated lines."""
    return [sip_msg.split("\r\n") for sip_msg in info]
class CFGTGenerator(Generator):
    """Generator variant used for cfgt JSON input.

    Extends the parent's id rules with rules for P-NGCP-*, P-*-UUID and
    customer-id values, and the common rules with P-LB-Uptime handling.
    """

    def generate_rules(self, ids) -> list:
        """Return the parent's id rules followed by the cfgt-specific ones."""
        id_dom = ids["domains"][0]
        rules = super().generate_rules(ids)

        def ngcp_endpoint_rules(endpoint, tt):
            # source/auth ip and source port of a scenario or responder
            rules.extend(
                [
                    (
                        r"^P-NGCP-(Auth|Src)-I(p|P): {}".format(endpoint["ip"]),
                        r"P-NGCP-\1-I\2: [% {0}.ip %]".format(tt),
                    ),
                    (
                        r"^P-NGCP-Src-Port: {}".format(endpoint["port"]),
                        r"P-NGCP-Src-Port: [% {0}.port %]".format(tt),
                    ),
                ]
            )

        def subscriber_uuid_rule(subscriber, tt):
            rules.append(
                (
                    r"^P(-Prev)?-Calle(r|e)-U(UID): {}".format(
                        subscriber["uuid"]
                    ),
                    r"P\1-Calle\2-U\3: [% {0}.uuid %]".format(tt),
                )
            )

        def customer_id_rules(cust, tt):
            rules.extend(
                [
                    (
                        r"(a|b)_park_domain={}([^\d])?".format(cust["id"]),
                        r"\1_park_domain=[% {0}.id %]\2".format(tt),
                    ),
                    (
                        r"^P-([^:]+): {}$".format(cust["id"]),
                        r"P-\1: [% {0}.id %]".format(tt),
                    ),
                ]
            )

        for s_idx, scenario in enumerate(ids["scenarios"]):
            ngcp_endpoint_rules(scenario, f"scenarios.{s_idx}")
            for r_idx, responder in enumerate(scenario["responders"]):
                ngcp_endpoint_rules(
                    responder, f"scenarios.{s_idx}.responders.{r_idx}"
                )

        for key, subscriber in ids[id_dom].items():
            subscriber_uuid_rule(subscriber, f"{id_dom}.{key}")

        if "customers" in ids:
            # "customers" lists the top-level keys holding customer data
            for customer_key in ids["customers"]:
                customer_id_rules(ids[customer_key], f"{customer_key}")
        return rules

    def generate_common_rules(self, hdr: str) -> list:
        """Return the parent's common rules plus the P-LB-Uptime rule."""
        rules = super().generate_common_rules(hdr)
        if hdr == "p-lb-uptime":
            rules.append((r":\s+(\d+)", is_zero))
        return rules
def cfgt_process(args):
    """Print the ``flow``, ``sip_in`` and ``sip_out`` sections for a cfgt file.

    Loads the JSON document from ``args.input`` and the scenario ids from
    ``args.scen_ids``; the SIP messages are rewritten with
    ``CFGTGenerator`` before being printed.
    """

    def yaml_quote(text):
        # a single quote inside a YAML single-quoted scalar must be doubled
        return text.replace("'", "''")

    info = load_json(args.input)
    ids = load_yaml(args.scen_ids)
    gen = CFGTGenerator(Generator.get_filters(args), ids)

    print("flow:")
    for flow in info["flow"]:
        for key in flow:
            print(f" - {key}:")

    print("sip_in:")
    for msg in gen.run(get_msgs(info["sip_in"])):
        for line in msg:
            print(f" - '{yaml_quote(line)}'")

    if not info["sip_out"]:
        print("sip_out: []")
        return
    print("sip_out:")
    for msg in gen.run(get_msgs(info["sip_out"])):
        # one flow-style list per outgoing message
        print(" - [")
        for line in msg:
            print(f" '{yaml_quote(line)}',")
        print(" ]")
def main(args):
    """Run the processor selected by ``args.type``.

    "sipp" selects ``sipp_process``; any other value selects
    ``cfgt_process``.
    """
    handler = sipp_process if args.type == "sipp" else cfgt_process
    handler(args)
# Command-line entry point: parse the arguments, set up logging, run main().
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="generate test_yml.tt2 files")
    parser.add_argument(
        "scen_ids", help="path to the scenario_ids.yml file", type=Path
    )
    parser.add_argument("input", help="path to the input file", type=Path)
    # action="append": every occurrence contributes one header name, so the
    # filters reach Generator.get_filters as a list of names. A bare string
    # would be turned into a set of single characters by set().
    parser.add_argument(
        "--filter", action="append", help="remove this header"
    )
    parser.add_argument(
        "--filter-common", action="append", help="filter common headers"
    )
    parser.add_argument(
        "--type",
        "-t",
        choices=["sipp", "cfgt"],
        default="sipp",
        help="type of input file",
    )
    # without -v, loglevel stays None and basicConfig keeps the default level
    parser.add_argument(
        "--verbose",
        "-v",
        help="verbose mode",
        action="store_const",
        dest="loglevel",
        const=logging.INFO,
    )
    args = parser.parse_args()
    logging.basicConfig(level=args.loglevel)
    main(args)