MT#11921 fix tap-test errors and move unit-tests to tests dir

Change-Id: I5a76db17a3d9178dca42440f50b5c9af1837d80c
changes/54/1354/3
Victor Seva 11 years ago
parent 473cee37f3
commit 2383cafe85

@ -1,6 +1,6 @@
#!/usr/bin/env python
#
# Copyright: 2013 Sipwise Development Team <support@sipwise.com>
# Copyright: 2013-2015 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -18,262 +18,291 @@
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
import io, sys, re, getopt
import io
import sys
import re
import getopt
from yaml import load
from pprint import pprint
try:
from yaml import CLoader as Loader
from yaml import CLoader as Loader
except:
from yaml import Loader
from yaml import Loader
class XAvp:
""" Class to simulate the xavp """
def __init__(self, name, data):
result = re.match('\$xavp\((\w+)\)', name)
try:
self._name = result.group(1)
except:
raise Exception('not a xavp')
self._data = data
def get(self, str):
info = XAvp.parse(str)
if self._name != info['name']:
raise KeyError(
'diferent name. name:%s != %s' % (self._name, info['name'])
)
nsize = len(self._data)
if nsize <= info['nindx']:
raise IndexError('%s has %d elements' % (self._name, nsize))
if self._data[info['nindx']].has_key(info['key']):
values = self._data[info['nindx']][info['key']]
else:
raise KeyError('no %s key found' % info['key'] )
if info['kindx'] == '*':
return values;
ksize = len(values)
if ksize <= info['kindx']:
raise IndexError('%s has %d elements not %s' % (info['key'], ksize, info['kindx']))
return values[info['kindx']]
@classmethod
def parse(cls, str):
pattern_nindx = '(\[(?P<%s>\d+)\])?' % 'nindx'
pattern_kindx = '(\[(?P<%s>\d+|\*+)\])?' % 'kindx'
pattern = '\$xavp\((?P<name>\w+)%s(=>(?P<key>\w+)%s)?\)' % (pattern_nindx, pattern_kindx)
result = re.match(pattern, str)
if result is not None:
try:
nindx = int(result.group('nindx'))
except:
nindx = 0
try:
kindx = int(result.group('kindx'))
except:
if result.group('kindx') == '*':
kindx = '*'
""" Class to simulate the xavp """
def __init__(self, name, data):
result = re.match('\$xavp\((\w+)\)', name)
try:
self._name = result.group(1)
except:
raise Exception('not a xavp')
self._data = data
def get(self, str):
info = XAvp.parse(str)
if self._name != info['name']:
raise KeyError(
'diferent name. name:%s != %s' % (self._name, info['name'])
)
nsize = len(self._data)
if nsize <= info['nindx']:
raise IndexError('%s has %d elements' % (self._name, nsize))
if info['key'] in self._data[info['nindx']]:
values = self._data[info['nindx']][info['key']]
else:
kindx = 0
return {'name': result.group('name'), 'nindx': nindx, 'key': result.group('key'), 'kindx': kindx }
else:
raise Exception('no xavp')
raise KeyError('no %s key found' % info['key'])
if info['kindx'] == '*':
return values
ksize = len(values)
if ksize <= info['kindx']:
raise IndexError('%s has %d elements not %s' %
(info['key'], ksize, info['kindx']))
return values[info['kindx']]
@classmethod
def parse(cls, str):
pattern_nindx = '(\[(?P<%s>\d+)\])?' % 'nindx'
pattern_kindx = '(\[(?P<%s>\d+|\*+)\])?' % 'kindx'
pattern = '\$xavp\((?P<name>\w+)%s(=>(?P<key>\w+)%s)?\)' % (
pattern_nindx, pattern_kindx)
result = re.match(pattern, str)
if result is not None:
try:
nindx = int(result.group('nindx'))
except:
nindx = 0
try:
kindx = int(result.group('kindx'))
except:
if result.group('kindx') == '*':
kindx = '*'
else:
kindx = 0
return {
'name': result.group('name'),
'nindx': nindx,
'key': result.group('key'),
'kindx': kindx}
else:
raise Exception('no xavp')
class Test:
""" Class to create TAP output """
def __init__(self):
self._step = []
self._errflag = False
def comment(self, msg):
""" Add a comment """
self._step.append({'result': None, 'msg_ok': msg})
def ok(self, msg = None):
""" Add a ok result """
self._step.append({'result': True, 'msg_ok': msg})
def error(self, msg_err):
""" Add an error result"""
self._step.append({'result': False, 'msg_err': msg_err})
self._errflag = True
@classmethod
def compare(cls, val0, val1):
if isinstance(val0, basestring):
result = re.search(val0, str(val1))
else:
result = (val0 == val1)
return result
def test(self, value_expected, value, msg_err, msg_ok = None):
""" Test two values and add the result"""
result = Test.compare(value_expected, value)
self._step.append({'result': result, 'msg_err': msg_err, 'msg_ok': msg_ok})
if not result:
self._errflag = True
def isError(self):
return self._errflag
def _num_tests(self):
"""get the num of tests"""
test = 0
for s in self._step:
if (s['result'] is not None):
test = test + 1
return test
def __str__(self):
"""get the TAP output"""
output = "1..%s\n" % self._num_tests()
test = 1
for s in self._step:
if (s['result'] is None):
output += '# %s\n' % s['msg_ok']
continue
elif (s['result']):
if (s['msg_ok'] is not None):
output += "ok %d - %s\n" % (test, s['msg_ok'])
""" Class to create TAP output """
def __init__(self):
self._step = []
self._errflag = False
def comment(self, msg):
""" Add a comment """
self._step.append({'result': None, 'msg_ok': msg})
def ok(self, msg=None):
""" Add a ok result """
self._step.append({'result': True, 'msg_ok': msg})
def error(self, msg_err):
""" Add an error result"""
self._step.append({'result': False, 'msg_err': msg_err})
self._errflag = True
@classmethod
def compare(cls, val0, val1):
if isinstance(val0, basestring):
result = re.search(val0, str(val1))
else:
output += "ok %d\n" % test
else:
output += "not ok %d - ERROR: %s\n" % (test, s['msg_err'])
test = test + 1
return output
result = (val0 == val1)
return result
def test(self, value_expected, value, msg_err, msg_ok=None):
""" Test two values and add the result"""
result = Test.compare(value_expected, value)
self._step.append(
{'result': result, 'msg_err': msg_err, 'msg_ok': msg_ok})
if not result:
self._errflag = True
def isError(self):
return self._errflag
def _num_tests(self):
"""get the num of tests"""
test = 0
for s in self._step:
if (s['result'] is not None):
test = test + 1
return test
def __str__(self):
"""get the TAP output"""
output = "1..%s\n" % self._num_tests()
test = 1
for s in self._step:
if (s['result'] is None):
output += '# %s\n' % s['msg_ok']
continue
elif (s['result']):
if (s['msg_ok'] is not None):
output += "ok %d - %s\n" % (test, s['msg_ok'])
else:
output += "ok %d\n" % test
else:
output += "not ok %d - ERROR: %s\n" % (test, s['msg_err'])
test = test + 1
return output
def check_flow_vars(sk, sv, cv, test):
""" check the vars on a flow level"""
for k in sv.iterkeys():
if(not cv.has_key(k)):
try:
info = XAvp.parse(k)
search_key = '$xavp(%s)' % info['name']
if(not cv.has_key(search_key)):
raise Exception("search_key: %s info:%s" % (search_key, info))
xavp = XAvp(search_key, cv[search_key])
val = xavp.get(k)
#print "testing %s == %s" % (sv[k], val)
test.test(sv[k], val, 'flow[%s] expected %s == %s but is %s' % (sk, k, sv[k], val), 'flow[%s] %s' % (sk, k))
except LookupError as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
""" check the vars on a flow level"""
for k in sv.iterkeys():
if(k not in cv):
try:
info = XAvp.parse(k)
search_key = '$xavp(%s)' % info['name']
if(search_key not in cv):
raise Exception("search_key: %s info:%s" %
(search_key, info))
xavp = XAvp(search_key, cv[search_key])
val = xavp.get(k)
# print "testing %s == %s" % (sv[k], val)
test.test(sv[k], val,
'flow[%s] expected %s == %s but is %s' %
(sk, k, sv[k], val),
'flow[%s] %s' % (sk, k))
except LookupError as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
else:
test.error('LookupError with %s. Error:%s' % (k, err))
except Exception as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
else:
test.error(
'Expected var %s on flow[%s]. %s' % (k, sk, err))
else:
test.error('LookupError with %s. Error:%s' % (k, err))
except Exception as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
else:
test.error('Expected var %s on flow[%s]. %s' % (k,sk, err))
else:
test.test(sv[k], cv[k], 'flow[%s] expected %s == %s but is %s' % (sk, k, sv[k], cv[k]), 'flow[%s] %s' % (sk, k))
test.test(sv[k], cv[k], 'flow[%s] expected %s == %s but is %s' % (
sk, k, sv[k], cv[k]), 'flow[%s] %s' % (sk, k))
def check_flow(scen, check, test):
""" checks the flow and the vars inside"""
for i in range(len(scen)):
(sk, sv) = scen[i].popitem()
try:
(ck, cv) = check[i].popitem()
except:
test.error('wrong flow. Expected: %s but is nothing there' % sk)
continue
if(sk != ck):
test.error('wrong flow. Expected: %s but is %s' % (sk, ck))
continue
if sv is None:
test.ok('flow[%s] no var to check' % sk)
continue
else:
test.ok('flow[%s]' % sk)
check_flow_vars(sk, sv, cv, test)
if(len(check)>len(scen)):
l = []
for i in check:
for k in i.keys():
l.append(k)
test.error('Expected to end but there are more flows %s' % l)
""" checks the flow and the vars inside"""
for i in range(len(scen)):
(sk, sv) = scen[i].popitem()
try:
(ck, cv) = check[i].popitem()
except:
test.error('wrong flow. Expected: %s but is nothing there' % sk)
continue
if(sk != ck):
test.error('wrong flow. Expected: %s but is %s' % (sk, ck))
continue
if sv is None:
test.ok('flow[%s] no var to check' % sk)
continue
else:
test.ok('flow[%s]' % sk)
check_flow_vars(sk, sv, cv, test)
if(len(check) > len(scen)):
l = []
for i in check:
for k in i.keys():
l.append(k)
test.error('Expected to end but there are more flows %s' % l)
def check_sip(scen, msg, test):
for rule in scen:
if rule.startswith('_:NOT:_'):
flag = False
rule = rule[7:]
msg_ok = '%s not match'
msg_ko = '%s match'
else:
flag = True
msg_ok = '%s match'
msg_ko = '%s not match'
result = re.search(rule, msg)
if (result is not None) == flag:
test.ok(msg_ok % rule)
continue
test.comment('result:%s' % result)
test.error(msg_ko % rule)
for rule in scen:
if rule.startswith('_:NOT:_'):
flag = False
rule = rule[7:]
msg_ok = '%s not match'
msg_ko = '%s match'
else:
flag = True
msg_ok = '%s match'
msg_ko = '%s not match'
result = re.search(rule, msg)
if (result is not None) == flag:
test.ok(msg_ok % rule)
continue
test.comment('result:%s' % result)
test.error(msg_ko % rule)
def check_sip_out(scen, msgs, test):
num_msgs = len(msgs)
num_scen = len(scen)
for i in (range(num_scen)):
test.comment("sip_out %d" % i)
if(i<num_msgs):
check_sip(scen[i], msgs[i], test)
else:
test.error("sip_out[%d] does not exist" % i)
if ( num_scen != num_msgs ):
test.error("we expected %d out messages but we have %d" % ( num_scen, num_msgs ))
num_msgs = len(msgs)
num_scen = len(scen)
for i in (range(num_scen)):
test.comment("sip_out %d" % i)
if(i < num_msgs):
check_sip(scen[i], msgs[i], test)
else:
test.error("sip_out[%d] does not exist" % i)
if (num_scen != num_msgs):
test.error("we expected %d out messages but we have %d" %
(num_scen, num_msgs))
def usage():
print 'Usage: check.py [-h] scenario.yml test.yml'
print '-h: this help'
def main():
try:
opts, args = getopt.getopt(sys.argv[1:], "h", ["help"])
except getopt.GetoptError as err:
# print help information and exit:
print str(err) # will print something like "option -a not recognized"
usage()
sys.exit(2)
for o, a in opts:
if o in ("-h", "--help"):
usage()
sys.exit()
else:
assert False, "unhandled option"
if(len(args)!=2):
usage()
sys.exit(1)
with io.open(args[0], 'r') as file:
scen = load(file, Loader=Loader)
file.close()
test = Test()
try:
with io.open(args[1], 'r') as file:
check = load(file, Loader=Loader)
try:
opts, args = getopt.getopt(sys.argv[1:], "h", ["help"])
except getopt.GetoptError as err:
# print help information and exit:
print str(err) # will print something like "option -a not recognized"
usage()
sys.exit(2)
for o, a in opts:
if o in ("-h", "--help"):
usage()
sys.exit()
else:
assert False, "unhandled option"
if(len(args) != 2):
usage()
sys.exit(1)
with io.open(args[0], 'r') as file:
scen = load(file, Loader=Loader)
file.close()
except:
check = {'flow': [], 'sip_in': '', 'sip_out': []}
test.error("Error loading file:%s" % args[1])
test.comment('check flow')
check_flow(scen['flow'], check['flow'], test)
test.comment('check sip_in')
check_sip(scen['sip_in'], check['sip_in'], test)
test.comment('check sip_out')
check_sip_out(scen['sip_out'], check['sip_out'], test)
print test
if test.isError():
sys.exit(1)
test = Test()
try:
with io.open(args[1], 'r') as file:
check = load(file, Loader=Loader)
file.close()
except:
check = {'flow': [], 'sip_in': '', 'sip_out': []}
test.error("Error loading file:%s" % args[1])
test.comment('check flow')
check_flow(scen['flow'], check['flow'], test)
test.comment('check sip_in')
check_sip(scen['sip_in'], check['sip_in'], test)
test.comment('check sip_out')
check_sip_out(scen['sip_out'], check['sip_out'], test)
print test
if test.isError():
sys.exit(1)
if __name__ == "__main__":
main()
main()

@ -18,13 +18,14 @@
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
use File::Spec;
use English;
use File::Copy;
use Tie::File;
use File::Spec;
use Getopt::Long;
use strict;
use Tie::File;
use warnings;
use YAML::Tiny;
use Getopt::Long;
sub usage
{
@ -87,12 +88,13 @@ if (lc($action) eq "off")
}
else
{
copy($file, $file.".orig") or die "Copy failed: $!" unless(-e $file.".orig");
copy($file, $file.".orig") or die "Copy failed: $ERRNO" unless(-e $file.".orig");
$yaml = YAML::Tiny->read($file) or die "File $file could not be read";
$yaml->[0]->{kamailio}{lb}{debug} = 'yes';
$yaml->[0]->{kamailio}{lb}{use_dns_cache} = 'off';
$yaml->[0]->{kamailio}{proxy}{debug} = 'yes';
$yaml->[0]->{kamailio}{proxy}{presence}{enable} = 'yes';
$yaml->[0]->{kamailio}{proxy}{fritzbox_prefixes} = [ '112', '110', '118[0-9]{2}' ];
$yaml->[0]->{sems}{debug} = 'yes';
$yaml->[0]->{checktools}{sip_check_enable} = 0;

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use Getopt::Std;
use Cwd 'abs_path';
use YAML;
@ -109,11 +110,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Voip\::$function failed: $@";
die "Voip\::$function failed: $EVAL_ERROR";
}
}
@ -121,5 +122,5 @@ sub call_prov {
}
sub usage {
return "Usage:\n$0 ncos.yml\n";
return "Usage:\n$PROGRAM_NAME ncos.yml\n";
}

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use Getopt::Std;
use Cwd 'abs_path';
use YAML;
@ -116,7 +117,7 @@ sub do_create {
foreach (@{$peer->{groups}})
{
$_->{peering_contract_id} = $contract_id;
call_prov( $vprov, 'create_peer_group', $_ );
call_prov( $vprov, 'create_peer_group', $_ );
}
my $result = call_prov( $vprov, 'get_peer_groups');
foreach (@{$result})
@ -147,7 +148,6 @@ sub do_create {
$param = { id => $_->{id}, preferences => $host->{$_->{name}}->{preferences}};
call_prov($vprov, 'set_peer_preferences', $param);
}
}
}
}
@ -171,11 +171,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Voip\::$function failed: $@";
die "Voip\::$function failed: $EVAL_ERROR";
}
}
@ -183,5 +183,5 @@ sub call_prov {
}
sub usage {
return "Usage:\n$0 peer.yml\n";
return "Usage:\n$PROGRAM_NAME peer.yml\n";
}

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use Getopt::Std;
use Cwd 'abs_path';
use YAML;
@ -130,11 +131,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Voip\::$function failed: $@";
die "Voip\::$function failed: $EVAL_ERROR";
}
}
@ -142,5 +143,5 @@ sub call_prov {
}
sub usage {
return "Usage:\n$0 scenario.yml\n";
return "Usage:\n$PROGRAM_NAME scenario.yml\n";
}

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use YAML;
use Getopt::Long;
use Sipwise::Provisioning::Billing;
@ -40,7 +41,7 @@ our %BILLING = (
);
sub usage {
die "Usage:\n$0 scenario.yml\n".
die "Usage:\n$PROGRAM_NAME scenario.yml\n".
"Options:\n".
" -h this help\n";
}
@ -72,6 +73,7 @@ sub main
call_prov( $bprov, 'create_voip_account', { data => { %BILLING, subscribers => \@subs }});
if($debug) { print("created ".($#subs+1)." subscribers"); }
}
return;
}
sub call_prov {
@ -91,11 +93,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Billing\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Billing\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Billing\::$function failed: $@";
die "Billing\::$function failed: $EVAL_ERROR";
}
}

@ -18,23 +18,29 @@
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
import sys, os, csv, argparse
import argparse
import csv
import os
import sys
import xmlrpclib
KAM_URL='http://127.0.0.1:5062'
KAM_LINES=10
KAM_URL = 'http://127.0.0.1:5062'
KAM_LINES = 10
proxy = xmlrpclib.ServerProxy(KAM_URL)
def get_headers(l, prefix):
res = []
for i in l:
res.append("%s%s" % (prefix, i))
return res
def sum_row(s, r):
for i in ['used', 'real_used', 'free']:
s[i] = s[i] + r[i];
s[i] = s[i] + r[i]
def save_data(datafile, headers, prefix, res):
show_headers = get_headers(headers, prefix)
@ -42,12 +48,13 @@ def save_data(datafile, headers, prefix, res):
spamwriter = csv.writer(csvfile)
spamwriter.writerow(show_headers)
spamwriter = csv.DictWriter(csvfile,
headers, extrasaction='ignore')
headers, extrasaction='ignore')
for row in res:
spamwriter.writerow(row)
def get_pvm(private_file):
res = [{'pid':'SUM', 'used':0, 'real_used':0, 'free':0},]
res = [{'pid': 'SUM', 'used': 0, 'real_used': 0, 'free': 0}, ]
headers = ['pid', 'used', 'real_used', 'free']
prefix = 'rss_'
try:
@ -72,10 +79,11 @@ def get_pvm(private_file):
private_file_pp = '%s_per_pid%s' % (fileName, fileExtension)
save_data(private_file_pp, headers, prefix, res[2:])
def get_shm(share_file):
headers = ['used', 'real_used', 'max_used',
'free', 'fragments', 'total'
]
'free', 'fragments', 'total'
]
prefix = 'shared_'
try:
res = proxy.core.shmmem('b')
@ -93,18 +101,20 @@ def get_shm(share_file):
sys.exit(-2)
save_data(share_file, headers, prefix, [res, ])
def main():
parser = argparse.ArgumentParser(description='export to csv kamailio proxy stats.')
parser = argparse.ArgumentParser(
description='export to csv kamailio proxy stats.')
parser.add_argument('--private_file', '-P', nargs='?', default='pvm.csv',
help='path to the private csv file')
help='path to the private csv file')
parser.add_argument('--share_file', '-S', nargs='?', default='shm.csv',
help='path to the share csv file')
help='path to the share csv file')
parser.add_argument('--rpc_url', nargs='?', default=KAM_URL,
help='rpc URL of kamailio. Default:%s' % KAM_URL)
help='rpc URL of kamailio. Default:%s' % KAM_URL)
args = parser.parse_args()
get_pvm(args.private_file)
get_shm(args.share_file)
if __name__ == "__main__":
main()
main()

@ -18,16 +18,18 @@
On Debian systems, the complete text of the GNU General
Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
"""
import sys
import os.path
import pyinotify
import logging
import os
import os.path
import pyinotify
import sys
BASE_DIR = "/usr/share/kamailio-config-tests"
if os.environ.has_key('BASE_DIR'):
if 'BASE_DIR' in os.environ:
BASE_DIR = os.environ['BASE_DIR']
filelog = os.path.join(BASE_DIR, 'log', 'pid_watcher.log')
logging.basicConfig(filename=filelog, level=logging.DEBUG, format='%(asctime)s %(message)s')
logging.basicConfig(
filename=filelog, level=logging.DEBUG, format='%(asctime)s %(message)s')
base_path = "/var/run"
@ -49,31 +51,33 @@ watched_dirs = [
watched = {}
class Handler(pyinotify.ProcessEvent):
def my_init(self, watched):
self.watched = watched
def check_all(self):
all = True
for k,v in self.watched.iteritems():
for k, v in self.watched.iteritems():
all = (all and (v['created'] or v['modified']))
logging.info("checking: %s[%s] all:%s" % (k,v, all))
logging.info("checking: %s[%s] all:%s" % (k, v, all))
return all
def process_IN_CREATE(self, event):
if watched.has_key(event.pathname):
if event.pathname in watched:
watched[event.pathname]['created'] = True
logging.info("created %s" % event.pathname)
if self.check_all():
sys.exit(0)
def process_IN_IGNORED(self, event):
if watched.has_key(event.pathname):
if event.pathname in watched:
watched[event.pathname]['deleted'] = True
logging.info("deleted %s" % event.pathname)
def process_IN_MODIFY(self, event):
if watched.has_key(event.pathname):
if event.pathname in watched:
watched[event.pathname]['modified'] = True
logging.info("modified %s" % event.pathname)
# for debug
@ -88,7 +92,8 @@ notifier = pyinotify.Notifier(wm, default_proc_fun=handler)
for service in services:
service_pid = os.path.join(base_path, service)
logging.info("Watching %s" % service_pid)
watched[service_pid] = {'deleted': False, 'created': False, 'modified': False }
watched[service_pid] = {
'deleted': False, 'created': False, 'modified': False}
wm.add_watch(service_pid, pyinotify.IN_IGNORED)
for d in watched_dirs:
wm.add_watch(d, pyinotify.ALL_EVENTS)

@ -94,6 +94,7 @@ sub get_subs_info
{
die("domain:".$data->{domain}." not defined in subscribers");
}
return;
}
sub generate
@ -133,7 +134,7 @@ sub generate
{
# by default foreign is no
$_->{foreign} = "no" unless defined($_->{foreign});
if(!defined($_->{peer_host}) and $_->{foreign} ne "yes")
if(not defined($_->{peer_host}) and $_->{foreign} ne "yes")
{
get_subs_info($data->{subscribers}, $_);
}
@ -161,6 +162,7 @@ sub generate
}
$id++;
}
return;
}
sub generate_reg
@ -169,6 +171,7 @@ sub generate_reg
my $vars = { line => $num };
my $fn = File::Spec->catfile($base_check_dir, "sipp_scenario_responder".(sprintf "%02i", $num)."_reg.xml");
$tt->process($template_reg, $vars, $fn) or die($tt->error(), "\n");
return;
}
sub generate_presence
@ -198,6 +201,7 @@ sub generate_presence
chmod(0755, $fn);
undef $fn;
}
return;
}
sub generate_foreign_dom
@ -207,6 +211,7 @@ sub generate_foreign_dom
my $fn = IO::File->new($file, "w") or die("can't create $file");
print {$fn} "$ip $domain\n";
undef $fn;
return;
}
generate($cf);

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use Getopt::Std;
use Cwd 'abs_path';
use YAML;
@ -134,11 +135,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Billing\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Billing\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Billing\::$function failed: $@";
die "Billing\::$function failed: $EVAL_ERROR";
}
}
@ -146,5 +147,5 @@ sub call_prov {
}
sub usage {
die "Usage:\n$0 prefs.yml\n";
die "Usage:\n$PROGRAM_NAME prefs.yml\n";
}

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use Getopt::Std;
use Cwd 'abs_path';
use YAML;
@ -55,7 +56,7 @@ sub main {
my $param;
# cf_set
my $set = {}; # name -> set_id
my $time = {}; # name -> time_id
my $time = {}; # name -> time_id
foreach my $s (@{$cf->{$key}->{'destination_set'}})
{
$param = { username => $fields[0], domain => $fields[1], data => $s };
@ -119,11 +120,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Voip\::$function failed: $@";
die "Voip\::$function failed: $EVAL_ERROR";
}
}
@ -131,5 +132,5 @@ sub call_prov {
}
sub usage {
die "Usage:\n$0 callforward.yml\n";
die "Usage:\n$PROGRAM_NAME callforward.yml\n";
}

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use Getopt::Std;
use Cwd 'abs_path';
use YAML;
@ -80,11 +81,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Voip\::$function failed: $@";
die "Voip\::$function failed: $EVAL_ERROR";
}
}
@ -92,5 +93,5 @@ sub call_prov {
}
sub usage {
die "Usage:\n$0 speeddial.yml\n";
die "Usage:\n$PROGRAM_NAME speeddial.yml\n";
}

@ -1,217 +0,0 @@
#!/usr/bin/env python
#
# Copyright: 2013 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This package is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
from check import XAvp, Test, check_flow, check_flow_vars, check_sip, check_sip_out
import yaml
import unittest
class TestXAvp(unittest.TestCase):
def setUp(self):
self.name = 'test'
self.data = [
{'koko': [1]},
{'koko': [1, 2]},
{'lolo': [3], 'lola': [7]},
]
self.xavp = XAvp('$xavp(%s)' % self.name, self.data)
def test_init(self):
self.assertEqual(self.name, self.xavp._name)
self.assertItemsEqual(self.data, self.xavp._data)
def test_init_wrong_type(self):
self.assertRaises(Exception, self.xavp, '$var(whatever)', None)
def test_parse_type(self):
self.assertRaisesRegexp(Exception, 'no xavp', XAvp.parse, '$var(whatever)')
self.assertRaisesRegexp(Exception, 'no xavp', XAvp.parse, '$fU')
def test_get_wrong_name(self):
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0])')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro=>whatever)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0]=>whatever)')
def test_get_wrong_nindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko)')
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko[1])')
def test_get_wrong_kindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[0]=>koko[1])')
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[1]=>koko[10])')
def test_get_value(self):
self.assertEqual(self.xavp.get('$xavp(test=>koko)'), 1)
self.assertEqual(self.xavp.get('$xavp(test[1]=>koko[1])'), 2)
self.assertEqual(self.xavp.get('$xavp(test[2]=>lola[0])'), 7)
def test_get_value_all(self):
self.assertItemsEqual(self.xavp.get('$xavp(test[1]=>koko[*])'), [1,2])
class TestCheckFlowVars(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.check_ok = [
{ 'R0': { '$xavp(v0)':
[{
'k0': [1],
'k1': ['a', 'b', 'fuckthisshit']
},
{
'k0': [1,2],
'k1': ['a',]
},
],
'fU': 'testpep',
},
},
{ 'R1': { '$xavp(v0)': [{'k0': [1,2]}] }},
]
self.scen_noxavp = [
{ 'R0': {'fU': 'testpep'} },
{ 'R1': {} },
]
self.scen = [
{ 'R0': { '$xavp(v0[0]=>k0[0])': 1,
'$xavp(v0[0]=>k1[0])': 'a',
'$xavp(v0[0]=>k1[2])': '^f',
'$var(no)': 'None',
'$xavp(nono=>koko)': 'None',
'$xavp(v0=>k10)': 'None',
'$xavp(v0[1]=>k0[1])': '\d+'}
},
{ 'R1': {'$xavp(v0[1]=>k0[0])': 1} },
]
def testXAvp(self):
data = self.check_ok[0]['R0']['$xavp(v0)']
xavp = XAvp('$xavp(v0)', data)
self.assertEqual(xavp.get('$xavp(v0=>k0)'), 1)
self.assertEqual(xavp.get('$xavp(v0=>k1[*])'), ['a','b', 'fuckthisshit'])
self.assertEqual(xavp.get('$xavp(v0[1]=>k0[1])'), 2)
self.assertEqual(xavp.get('$xavp(v0[1]=>k1[*])'), ['a'])
def testFlow_noxavp(self):
check_flow(self.scen_noxavp, self.check_ok, self.ctest)
self.assertFalse(self.ctest.isError())
def testFlowVars_noxavp(self):
check_flow_vars('RO', self.scen_noxavp[0]['R0'], self.check_ok[0]['R0'], self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
def testFlowVars_xavp(self):
check_flow_vars('RO', self.scen[0]['R0'], self.check_ok[0]['R0'], self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipIn(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg ="""REGISTER sip:testuser1003@spce.test SIP/2.0
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
Authorization: Digest username="testuser1003",realm="spce.test",uri="sip:127.0.0.1:5060",nonce="Una661J2ub8hRdmjTwus8Habu3n6G/By",response="c1b8da80c3d05f8d4da478128f94907a",algorithm=MD5
Contact: "TestBria" <sip:testuser1003@127.126.0.1:50602;ob>;reg-id=1;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>"
Contact: "TestBria" <sip:testuser1003@127.126.0.1:6666;ob>;expires=0
Expires: 600
Max-Forwards: 16
Content-Length: 0
P-NGCP-Src-Ip: 127.126.0.1
P-NGCP-Src-Port: 50602
P-NGCP-Src-Proto: udp
P-NGCP-Src-Af: 4
P-Sock-Info: udp:127.0.0.1:5060
Path: <sip:lb@127.0.0.1;lr;socket=sip:127.0.0.1:5060>"""
def testSipIn(self):
sip_in = yaml.load("""
- '^REGISTER'
- 'Contact: "TestBria" <sip:testuser1003@127.126.0.1:50602;ob>;reg-id=1;\+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>"'
- 'Contact: "TestBria" <sip:testuser1003@127.126.0.1:6666;ob>;expires=0'
- 'Content-Length: 0'
- 'Expires: \d+'
- '_:NOT:_Expires: 0'
- 'Authorization: Digest username="testuser1003"'""")
check_sip(sip_in, self.msg, self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipOut(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = yaml.load("""
- |+
SIP/2.0 100 Trying
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0
- |+
SIP/2.0 200 OK
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0;rport=5060
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>;tag=1d24a28a0bded6c40d31e6db8aab9ac6.504e
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
P-NGCP-Authorization: testuser1003@spce.test
P-NGCP-Authorized: 1
P-Caller-UUID: 2cc06244-5a63-46cd-aee9-1d804d314371
Contact: <sip:testuser1003@127.126.0.1:50602;ob>;expires=600;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>";reg-id=1
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0""")
def testSipOut(self):
sip_out = yaml.load("""
- [
'^SIP/2.0 100 Trying',
'Content-Length: 0'
]
- [
'^SIP/2.0 200 OK',
'Contact: <sip:testuser1003@127.126.0.1:50602;ob>;expires=600;\+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>";reg-id=1',
'Content-Length: 0',
'P-NGCP-Authorization: testuser1003@',
'P-NGCP-Authorized: 1',
'_:NOT:_Contact: <sip:testuser1003@127.126.0.1:6666;ob>;expires=\d+'
]""")
check_sip_out(sip_out, self.msg, self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
if __name__ == '__main__':
unittest.main()

@ -75,7 +75,7 @@ sub save_data
YAML::DumpFile($path, $data);
#print "$data->{'msgid'} saved\n";
# This tries to fix problems with string values '-' being saved
# without quotes.
# without quotes.
tie my @array, 'Tie::File', $path or die ('Can not open $path');
for (@array)
{
@ -90,6 +90,7 @@ sub save_data
sip_in => '',
sip_out => [],
};
return;
}
my $pid;
@ -154,7 +155,7 @@ given($#ARGV)
$filename = abs_path($filename);
$output_dir = abs_path($output_dir);
my $out;
open($log, "<$filename") or die "Couldn't open kamailio log, $!";
open($log, '<', "$filename") or die "Couldn't open kamailio log, $ERRNO";
first_line();
do

@ -21,6 +21,7 @@
use strict;
use warnings;
use English;
use Cwd 'abs_path';
use YAML;
use Getopt::Long;
@ -113,11 +114,11 @@ sub call_prov {
});
};
if($@) {
if(ref $@ eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring;
if($EVAL_ERROR) {
if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else {
die "Voip\::$function failed: $@";
die "Voip\::$function failed: $EVAL_ERROR";
}
}
@ -125,5 +126,6 @@ sub call_prov {
}
sub usage {
return "Usage:\n$0 [-h] [-i IP] [-p PORT] peer_host_name peer.yml\n";
return "Usage:\n$PROGRAM_NAME [-h] [-i IP] [-p PORT]" .
" peer_host_name peer.yml\n";
}

@ -0,0 +1,19 @@
REGISTER sip:testuser1003@spce.test SIP/2.0
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
Authorization: Digest username="testuser1003",realm="spce.test",uri="sip:127.0.0.1:5060",nonce="Una661J2ub8hRdmjTwus8Habu3n6G/By",response="c1b8da80c3d05f8d4da478128f94907a",algorithm=MD5
Contact: "TestBria" <sip:testuser1003@127.126.0.1:50602;ob>;reg-id=1;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>"
Contact: "TestBria" <sip:testuser1003@127.126.0.1:6666;ob>;expires=0
Expires: 600
Max-Forwards: 16
Content-Length: 0
P-NGCP-Src-Ip: 127.126.0.1
P-NGCP-Src-Port: 50602
P-NGCP-Src-Proto: udp
P-NGCP-Src-Af: 4
P-Sock-Info: udp:127.0.0.1:5060
Path: <sip:lb@127.0.0.1;lr;socket=sip:127.0.0.1:5060>

@ -0,0 +1,27 @@
- |+
SIP/2.0 100 Trying
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0
- |+
SIP/2.0 200 OK
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0;rport=5060
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>;tag=1d24a28a0bded6c40d31e6db8aab9ac6.504e
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
P-NGCP-Authorization: testuser1003@spce.test
P-NGCP-Authorized: 1
P-Caller-UUID: 2cc06244-5a63-46cd-aee9-1d804d314371
Contact: <sip:testuser1003@127.126.0.1:50602;ob>;expires=600;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>";reg-id=1
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0

@ -0,0 +1,163 @@
#!/usr/bin/env python
#
# Copyright: 2013-2015 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This package is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
import os
import sys
lib_path = os.path.abspath('bin')
sys.path.append(lib_path)
from check import check_sip, check_sip_out
from check import XAvp, Test, check_flow, check_flow_vars
import unittest
import yaml
class TestXAvp(unittest.TestCase):
def setUp(self):
self.name = 'test'
self.data = [
{'koko': [1]},
{'koko': [1, 2]},
{'lolo': [3], 'lola': [7]},
]
self.xavp = XAvp('$xavp(%s)' % self.name, self.data)
def test_init(self):
self.assertEqual(self.name, self.xavp._name)
self.assertItemsEqual(self.data, self.xavp._data)
def test_init_wrong_type(self):
self.assertRaises(Exception, self.xavp, '$var(whatever)', None)
def test_parse_type(self):
self.assertRaisesRegexp(
Exception, 'no xavp', XAvp.parse, '$var(whatever)')
self.assertRaisesRegexp(Exception, 'no xavp', XAvp.parse, '$fU')
def test_get_wrong_name(self):
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0])')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro=>whatever)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0]=>whatever)')
def test_get_wrong_nindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko)')
self.assertRaises(
IndexError, self.xavp.get, '$xavp(test[10]=>koko[1])')
def test_get_wrong_kindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[0]=>koko[1])')
self.assertRaises(
IndexError, self.xavp.get, '$xavp(test[1]=>koko[10])')
def test_get_value(self):
self.assertEqual(self.xavp.get('$xavp(test=>koko)'), 1)
self.assertEqual(self.xavp.get('$xavp(test[1]=>koko[1])'), 2)
self.assertEqual(self.xavp.get('$xavp(test[2]=>lola[0])'), 7)
def test_get_value_all(self):
self.assertItemsEqual(self.xavp.get('$xavp(test[1]=>koko[*])'), [1, 2])
class TestCheckFlowVars(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.check_ok = [
{'R0': {'$xavp(v0)': [{'k0': [1],
'k1': ['a', 'b', 'fuckthisshit']},
{'k0': [1, 2], 'k1': ['a', ]},
],
'fU': 'testpep',
}},
{'R1': {'$xavp(v0)': [{'k0': [1, 2]}]}},
]
self.scen_noxavp = [
{'R0': {'fU': 'testpep'}},
{'R1': {}},
]
self.scen = [
{'R0': {'$xavp(v0[0]=>k0[0])': 1,
'$xavp(v0[0]=>k1[0])': 'a',
'$xavp(v0[0]=>k1[2])': '^f',
'$var(no)': 'None',
'$xavp(nono=>koko)': 'None',
'$xavp(v0=>k10)': 'None',
'$xavp(v0[1]=>k0[1])': '\d+'}
},
{'R1': {'$xavp(v0[1]=>k0[0])': 1}},
]
def testXAvp(self):
data = self.check_ok[0]['R0']['$xavp(v0)']
xavp = XAvp('$xavp(v0)', data)
self.assertEqual(xavp.get('$xavp(v0=>k0)'), 1)
self.assertEqual(
xavp.get('$xavp(v0=>k1[*])'), ['a', 'b', 'fuckthisshit'])
self.assertEqual(xavp.get('$xavp(v0[1]=>k0[1])'), 2)
self.assertEqual(xavp.get('$xavp(v0[1]=>k1[*])'), ['a'])
def testFlow_noxavp(self):
check_flow(self.scen_noxavp, self.check_ok, self.ctest)
self.assertFalse(self.ctest.isError())
def testFlowVars_noxavp(self):
check_flow_vars(
'RO', self.scen_noxavp[0]['R0'],
self.check_ok[0]['R0'], self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
def testFlowVars_xavp(self):
check_flow_vars(
'RO', self.scen[0]['R0'],
self.check_ok[0]['R0'], self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipIn(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = open('tests/fixtures/sip_in.txt', 'r').read()
def testSipIn(self):
sip_in = yaml.load(open('tests/fixtures/sip_in_test.yml', 'r'))
check_sip(sip_in, self.msg, self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipOut(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = yaml.load(open('tests/fixtures/sip_out.yml', 'r'))
def testSipOut(self):
sip_out = yaml.load(open('tests/fixtures/sip_out_test.yml', 'r'))
check_sip_out(sip_out, self.msg, self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save