MT#11921 fix tap-test errors and move unit-tests to tests dir

Change-Id: I5a76db17a3d9178dca42440f50b5c9af1837d80c
changes/54/1354/3
Victor Seva 11 years ago
parent 473cee37f3
commit 2383cafe85

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# #
# Copyright: 2013 Sipwise Development Team <support@sipwise.com> # Copyright: 2013-2015 Sipwise Development Team <support@sipwise.com>
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
@ -18,262 +18,291 @@
# On Debian systems, the complete text of the GNU General # On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3". # Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
# #
import io, sys, re, getopt import io
import sys
import re
import getopt
from yaml import load from yaml import load
from pprint import pprint from pprint import pprint
try: try:
from yaml import CLoader as Loader from yaml import CLoader as Loader
except: except:
from yaml import Loader from yaml import Loader
class XAvp: class XAvp:
""" Class to simulate the xavp """
def __init__(self, name, data): """ Class to simulate the xavp """
result = re.match('\$xavp\((\w+)\)', name)
try: def __init__(self, name, data):
self._name = result.group(1) result = re.match('\$xavp\((\w+)\)', name)
except: try:
raise Exception('not a xavp') self._name = result.group(1)
self._data = data except:
raise Exception('not a xavp')
def get(self, str): self._data = data
info = XAvp.parse(str)
def get(self, str):
if self._name != info['name']: info = XAvp.parse(str)
raise KeyError(
'diferent name. name:%s != %s' % (self._name, info['name']) if self._name != info['name']:
) raise KeyError(
'diferent name. name:%s != %s' % (self._name, info['name'])
nsize = len(self._data) )
if nsize <= info['nindx']:
raise IndexError('%s has %d elements' % (self._name, nsize)) nsize = len(self._data)
if nsize <= info['nindx']:
if self._data[info['nindx']].has_key(info['key']): raise IndexError('%s has %d elements' % (self._name, nsize))
values = self._data[info['nindx']][info['key']]
else: if info['key'] in self._data[info['nindx']]:
raise KeyError('no %s key found' % info['key'] ) values = self._data[info['nindx']][info['key']]
if info['kindx'] == '*':
return values;
ksize = len(values)
if ksize <= info['kindx']:
raise IndexError('%s has %d elements not %s' % (info['key'], ksize, info['kindx']))
return values[info['kindx']]
@classmethod
def parse(cls, str):
pattern_nindx = '(\[(?P<%s>\d+)\])?' % 'nindx'
pattern_kindx = '(\[(?P<%s>\d+|\*+)\])?' % 'kindx'
pattern = '\$xavp\((?P<name>\w+)%s(=>(?P<key>\w+)%s)?\)' % (pattern_nindx, pattern_kindx)
result = re.match(pattern, str)
if result is not None:
try:
nindx = int(result.group('nindx'))
except:
nindx = 0
try:
kindx = int(result.group('kindx'))
except:
if result.group('kindx') == '*':
kindx = '*'
else: else:
kindx = 0 raise KeyError('no %s key found' % info['key'])
return {'name': result.group('name'), 'nindx': nindx, 'key': result.group('key'), 'kindx': kindx }
else: if info['kindx'] == '*':
raise Exception('no xavp') return values
ksize = len(values)
if ksize <= info['kindx']:
raise IndexError('%s has %d elements not %s' %
(info['key'], ksize, info['kindx']))
return values[info['kindx']]
@classmethod
def parse(cls, str):
pattern_nindx = '(\[(?P<%s>\d+)\])?' % 'nindx'
pattern_kindx = '(\[(?P<%s>\d+|\*+)\])?' % 'kindx'
pattern = '\$xavp\((?P<name>\w+)%s(=>(?P<key>\w+)%s)?\)' % (
pattern_nindx, pattern_kindx)
result = re.match(pattern, str)
if result is not None:
try:
nindx = int(result.group('nindx'))
except:
nindx = 0
try:
kindx = int(result.group('kindx'))
except:
if result.group('kindx') == '*':
kindx = '*'
else:
kindx = 0
return {
'name': result.group('name'),
'nindx': nindx,
'key': result.group('key'),
'kindx': kindx}
else:
raise Exception('no xavp')
class Test: class Test:
""" Class to create TAP output """
def __init__(self): """ Class to create TAP output """
self._step = []
self._errflag = False def __init__(self):
self._step = []
def comment(self, msg): self._errflag = False
""" Add a comment """
self._step.append({'result': None, 'msg_ok': msg}) def comment(self, msg):
""" Add a comment """
def ok(self, msg = None): self._step.append({'result': None, 'msg_ok': msg})
""" Add a ok result """
self._step.append({'result': True, 'msg_ok': msg}) def ok(self, msg=None):
""" Add a ok result """
def error(self, msg_err): self._step.append({'result': True, 'msg_ok': msg})
""" Add an error result"""
self._step.append({'result': False, 'msg_err': msg_err}) def error(self, msg_err):
self._errflag = True """ Add an error result"""
self._step.append({'result': False, 'msg_err': msg_err})
@classmethod self._errflag = True
def compare(cls, val0, val1):
if isinstance(val0, basestring): @classmethod
result = re.search(val0, str(val1)) def compare(cls, val0, val1):
else: if isinstance(val0, basestring):
result = (val0 == val1) result = re.search(val0, str(val1))
return result
def test(self, value_expected, value, msg_err, msg_ok = None):
""" Test two values and add the result"""
result = Test.compare(value_expected, value)
self._step.append({'result': result, 'msg_err': msg_err, 'msg_ok': msg_ok})
if not result:
self._errflag = True
def isError(self):
return self._errflag
def _num_tests(self):
"""get the num of tests"""
test = 0
for s in self._step:
if (s['result'] is not None):
test = test + 1
return test
def __str__(self):
"""get the TAP output"""
output = "1..%s\n" % self._num_tests()
test = 1
for s in self._step:
if (s['result'] is None):
output += '# %s\n' % s['msg_ok']
continue
elif (s['result']):
if (s['msg_ok'] is not None):
output += "ok %d - %s\n" % (test, s['msg_ok'])
else: else:
output += "ok %d\n" % test result = (val0 == val1)
else: return result
output += "not ok %d - ERROR: %s\n" % (test, s['msg_err'])
test = test + 1 def test(self, value_expected, value, msg_err, msg_ok=None):
return output """ Test two values and add the result"""
result = Test.compare(value_expected, value)
self._step.append(
{'result': result, 'msg_err': msg_err, 'msg_ok': msg_ok})
if not result:
self._errflag = True
def isError(self):
return self._errflag
def _num_tests(self):
"""get the num of tests"""
test = 0
for s in self._step:
if (s['result'] is not None):
test = test + 1
return test
def __str__(self):
"""get the TAP output"""
output = "1..%s\n" % self._num_tests()
test = 1
for s in self._step:
if (s['result'] is None):
output += '# %s\n' % s['msg_ok']
continue
elif (s['result']):
if (s['msg_ok'] is not None):
output += "ok %d - %s\n" % (test, s['msg_ok'])
else:
output += "ok %d\n" % test
else:
output += "not ok %d - ERROR: %s\n" % (test, s['msg_err'])
test = test + 1
return output
def check_flow_vars(sk, sv, cv, test): def check_flow_vars(sk, sv, cv, test):
""" check the vars on a flow level""" """ check the vars on a flow level"""
for k in sv.iterkeys(): for k in sv.iterkeys():
if(not cv.has_key(k)): if(k not in cv):
try: try:
info = XAvp.parse(k) info = XAvp.parse(k)
search_key = '$xavp(%s)' % info['name'] search_key = '$xavp(%s)' % info['name']
if(not cv.has_key(search_key)): if(search_key not in cv):
raise Exception("search_key: %s info:%s" % (search_key, info)) raise Exception("search_key: %s info:%s" %
xavp = XAvp(search_key, cv[search_key]) (search_key, info))
val = xavp.get(k) xavp = XAvp(search_key, cv[search_key])
#print "testing %s == %s" % (sv[k], val) val = xavp.get(k)
test.test(sv[k], val, 'flow[%s] expected %s == %s but is %s' % (sk, k, sv[k], val), 'flow[%s] %s' % (sk, k)) # print "testing %s == %s" % (sv[k], val)
except LookupError as err: test.test(sv[k], val,
if(sv[k] == 'None'): 'flow[%s] expected %s == %s but is %s' %
test.ok('flow[%s] %s is not there' % (sk, k)) (sk, k, sv[k], val),
'flow[%s] %s' % (sk, k))
except LookupError as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
else:
test.error('LookupError with %s. Error:%s' % (k, err))
except Exception as err:
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
else:
test.error(
'Expected var %s on flow[%s]. %s' % (k, sk, err))
else: else:
test.error('LookupError with %s. Error:%s' % (k, err)) test.test(sv[k], cv[k], 'flow[%s] expected %s == %s but is %s' % (
except Exception as err: sk, k, sv[k], cv[k]), 'flow[%s] %s' % (sk, k))
if(sv[k] == 'None'):
test.ok('flow[%s] %s is not there' % (sk, k))
else:
test.error('Expected var %s on flow[%s]. %s' % (k,sk, err))
else:
test.test(sv[k], cv[k], 'flow[%s] expected %s == %s but is %s' % (sk, k, sv[k], cv[k]), 'flow[%s] %s' % (sk, k))
def check_flow(scen, check, test): def check_flow(scen, check, test):
""" checks the flow and the vars inside""" """ checks the flow and the vars inside"""
for i in range(len(scen)): for i in range(len(scen)):
(sk, sv) = scen[i].popitem() (sk, sv) = scen[i].popitem()
try: try:
(ck, cv) = check[i].popitem() (ck, cv) = check[i].popitem()
except: except:
test.error('wrong flow. Expected: %s but is nothing there' % sk) test.error('wrong flow. Expected: %s but is nothing there' % sk)
continue continue
if(sk != ck): if(sk != ck):
test.error('wrong flow. Expected: %s but is %s' % (sk, ck)) test.error('wrong flow. Expected: %s but is %s' % (sk, ck))
continue continue
if sv is None: if sv is None:
test.ok('flow[%s] no var to check' % sk) test.ok('flow[%s] no var to check' % sk)
continue continue
else: else:
test.ok('flow[%s]' % sk) test.ok('flow[%s]' % sk)
check_flow_vars(sk, sv, cv, test) check_flow_vars(sk, sv, cv, test)
if(len(check)>len(scen)): if(len(check) > len(scen)):
l = [] l = []
for i in check: for i in check:
for k in i.keys(): for k in i.keys():
l.append(k) l.append(k)
test.error('Expected to end but there are more flows %s' % l) test.error('Expected to end but there are more flows %s' % l)
def check_sip(scen, msg, test): def check_sip(scen, msg, test):
for rule in scen: for rule in scen:
if rule.startswith('_:NOT:_'): if rule.startswith('_:NOT:_'):
flag = False flag = False
rule = rule[7:] rule = rule[7:]
msg_ok = '%s not match' msg_ok = '%s not match'
msg_ko = '%s match' msg_ko = '%s match'
else: else:
flag = True flag = True
msg_ok = '%s match' msg_ok = '%s match'
msg_ko = '%s not match' msg_ko = '%s not match'
result = re.search(rule, msg) result = re.search(rule, msg)
if (result is not None) == flag: if (result is not None) == flag:
test.ok(msg_ok % rule) test.ok(msg_ok % rule)
continue continue
test.comment('result:%s' % result) test.comment('result:%s' % result)
test.error(msg_ko % rule) test.error(msg_ko % rule)
def check_sip_out(scen, msgs, test): def check_sip_out(scen, msgs, test):
num_msgs = len(msgs) num_msgs = len(msgs)
num_scen = len(scen) num_scen = len(scen)
for i in (range(num_scen)): for i in (range(num_scen)):
test.comment("sip_out %d" % i) test.comment("sip_out %d" % i)
if(i<num_msgs): if(i < num_msgs):
check_sip(scen[i], msgs[i], test) check_sip(scen[i], msgs[i], test)
else: else:
test.error("sip_out[%d] does not exist" % i) test.error("sip_out[%d] does not exist" % i)
if ( num_scen != num_msgs ): if (num_scen != num_msgs):
test.error("we expected %d out messages but we have %d" % ( num_scen, num_msgs )) test.error("we expected %d out messages but we have %d" %
(num_scen, num_msgs))
def usage(): def usage():
print 'Usage: check.py [-h] scenario.yml test.yml' print 'Usage: check.py [-h] scenario.yml test.yml'
print '-h: this help' print '-h: this help'
def main(): def main():
try: try:
opts, args = getopt.getopt(sys.argv[1:], "h", ["help"]) opts, args = getopt.getopt(sys.argv[1:], "h", ["help"])
except getopt.GetoptError as err: except getopt.GetoptError as err:
# print help information and exit: # print help information and exit:
print str(err) # will print something like "option -a not recognized" print str(err) # will print something like "option -a not recognized"
usage() usage()
sys.exit(2) sys.exit(2)
for o, a in opts: for o, a in opts:
if o in ("-h", "--help"): if o in ("-h", "--help"):
usage() usage()
sys.exit() sys.exit()
else: else:
assert False, "unhandled option" assert False, "unhandled option"
if(len(args)!=2): if(len(args) != 2):
usage() usage()
sys.exit(1) sys.exit(1)
with io.open(args[0], 'r') as file: with io.open(args[0], 'r') as file:
scen = load(file, Loader=Loader) scen = load(file, Loader=Loader)
file.close()
test = Test()
try:
with io.open(args[1], 'r') as file:
check = load(file, Loader=Loader)
file.close() file.close()
except:
check = {'flow': [], 'sip_in': '', 'sip_out': []} test = Test()
test.error("Error loading file:%s" % args[1])
try:
test.comment('check flow') with io.open(args[1], 'r') as file:
check_flow(scen['flow'], check['flow'], test) check = load(file, Loader=Loader)
test.comment('check sip_in') file.close()
check_sip(scen['sip_in'], check['sip_in'], test) except:
test.comment('check sip_out') check = {'flow': [], 'sip_in': '', 'sip_out': []}
check_sip_out(scen['sip_out'], check['sip_out'], test) test.error("Error loading file:%s" % args[1])
print test
if test.isError(): test.comment('check flow')
sys.exit(1) check_flow(scen['flow'], check['flow'], test)
test.comment('check sip_in')
check_sip(scen['sip_in'], check['sip_in'], test)
test.comment('check sip_out')
check_sip_out(scen['sip_out'], check['sip_out'], test)
print test
if test.isError():
sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

@ -18,13 +18,14 @@
# On Debian systems, the complete text of the GNU General # On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3". # Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
# #
use File::Spec; use English;
use File::Copy; use File::Copy;
use Tie::File; use File::Spec;
use Getopt::Long;
use strict; use strict;
use Tie::File;
use warnings; use warnings;
use YAML::Tiny; use YAML::Tiny;
use Getopt::Long;
sub usage sub usage
{ {
@ -87,12 +88,13 @@ if (lc($action) eq "off")
} }
else else
{ {
copy($file, $file.".orig") or die "Copy failed: $!" unless(-e $file.".orig"); copy($file, $file.".orig") or die "Copy failed: $ERRNO" unless(-e $file.".orig");
$yaml = YAML::Tiny->read($file) or die "File $file could not be read"; $yaml = YAML::Tiny->read($file) or die "File $file could not be read";
$yaml->[0]->{kamailio}{lb}{debug} = 'yes'; $yaml->[0]->{kamailio}{lb}{debug} = 'yes';
$yaml->[0]->{kamailio}{lb}{use_dns_cache} = 'off'; $yaml->[0]->{kamailio}{lb}{use_dns_cache} = 'off';
$yaml->[0]->{kamailio}{proxy}{debug} = 'yes'; $yaml->[0]->{kamailio}{proxy}{debug} = 'yes';
$yaml->[0]->{kamailio}{proxy}{presence}{enable} = 'yes'; $yaml->[0]->{kamailio}{proxy}{presence}{enable} = 'yes';
$yaml->[0]->{kamailio}{proxy}{fritzbox_prefixes} = [ '112', '110', '118[0-9]{2}' ];
$yaml->[0]->{sems}{debug} = 'yes'; $yaml->[0]->{sems}{debug} = 'yes';
$yaml->[0]->{checktools}{sip_check_enable} = 0; $yaml->[0]->{checktools}{sip_check_enable} = 0;

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use Getopt::Std; use Getopt::Std;
use Cwd 'abs_path'; use Cwd 'abs_path';
use YAML; use YAML;
@ -109,11 +110,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring; die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Voip\::$function failed: $@"; die "Voip\::$function failed: $EVAL_ERROR";
} }
} }
@ -121,5 +122,5 @@ sub call_prov {
} }
sub usage { sub usage {
return "Usage:\n$0 ncos.yml\n"; return "Usage:\n$PROGRAM_NAME ncos.yml\n";
} }

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use Getopt::Std; use Getopt::Std;
use Cwd 'abs_path'; use Cwd 'abs_path';
use YAML; use YAML;
@ -116,7 +117,7 @@ sub do_create {
foreach (@{$peer->{groups}}) foreach (@{$peer->{groups}})
{ {
$_->{peering_contract_id} = $contract_id; $_->{peering_contract_id} = $contract_id;
call_prov( $vprov, 'create_peer_group', $_ ); call_prov( $vprov, 'create_peer_group', $_ );
} }
my $result = call_prov( $vprov, 'get_peer_groups'); my $result = call_prov( $vprov, 'get_peer_groups');
foreach (@{$result}) foreach (@{$result})
@ -147,7 +148,6 @@ sub do_create {
$param = { id => $_->{id}, preferences => $host->{$_->{name}}->{preferences}}; $param = { id => $_->{id}, preferences => $host->{$_->{name}}->{preferences}};
call_prov($vprov, 'set_peer_preferences', $param); call_prov($vprov, 'set_peer_preferences', $param);
} }
} }
} }
} }
@ -171,11 +171,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring; die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Voip\::$function failed: $@"; die "Voip\::$function failed: $EVAL_ERROR";
} }
} }
@ -183,5 +183,5 @@ sub call_prov {
} }
sub usage { sub usage {
return "Usage:\n$0 peer.yml\n"; return "Usage:\n$PROGRAM_NAME peer.yml\n";
} }

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use Getopt::Std; use Getopt::Std;
use Cwd 'abs_path'; use Cwd 'abs_path';
use YAML; use YAML;
@ -130,11 +131,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring; die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Voip\::$function failed: $@"; die "Voip\::$function failed: $EVAL_ERROR";
} }
} }
@ -142,5 +143,5 @@ sub call_prov {
} }
sub usage { sub usage {
return "Usage:\n$0 scenario.yml\n"; return "Usage:\n$PROGRAM_NAME scenario.yml\n";
} }

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use YAML; use YAML;
use Getopt::Long; use Getopt::Long;
use Sipwise::Provisioning::Billing; use Sipwise::Provisioning::Billing;
@ -40,7 +41,7 @@ our %BILLING = (
); );
sub usage { sub usage {
die "Usage:\n$0 scenario.yml\n". die "Usage:\n$PROGRAM_NAME scenario.yml\n".
"Options:\n". "Options:\n".
" -h this help\n"; " -h this help\n";
} }
@ -72,6 +73,7 @@ sub main
call_prov( $bprov, 'create_voip_account', { data => { %BILLING, subscribers => \@subs }}); call_prov( $bprov, 'create_voip_account', { data => { %BILLING, subscribers => \@subs }});
if($debug) { print("created ".($#subs+1)." subscribers"); } if($debug) { print("created ".($#subs+1)." subscribers"); }
} }
return;
} }
sub call_prov { sub call_prov {
@ -91,11 +93,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Billing\::$function failed: ". $@->faultstring; die "Billing\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Billing\::$function failed: $@"; die "Billing\::$function failed: $EVAL_ERROR";
} }
} }

@ -18,23 +18,29 @@
# On Debian systems, the complete text of the GNU General # On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3". # Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
# #
import sys, os, csv, argparse import argparse
import csv
import os
import sys
import xmlrpclib import xmlrpclib
KAM_URL='http://127.0.0.1:5062' KAM_URL = 'http://127.0.0.1:5062'
KAM_LINES=10 KAM_LINES = 10
proxy = xmlrpclib.ServerProxy(KAM_URL) proxy = xmlrpclib.ServerProxy(KAM_URL)
def get_headers(l, prefix): def get_headers(l, prefix):
res = [] res = []
for i in l: for i in l:
res.append("%s%s" % (prefix, i)) res.append("%s%s" % (prefix, i))
return res return res
def sum_row(s, r): def sum_row(s, r):
for i in ['used', 'real_used', 'free']: for i in ['used', 'real_used', 'free']:
s[i] = s[i] + r[i]; s[i] = s[i] + r[i]
def save_data(datafile, headers, prefix, res): def save_data(datafile, headers, prefix, res):
show_headers = get_headers(headers, prefix) show_headers = get_headers(headers, prefix)
@ -42,12 +48,13 @@ def save_data(datafile, headers, prefix, res):
spamwriter = csv.writer(csvfile) spamwriter = csv.writer(csvfile)
spamwriter.writerow(show_headers) spamwriter.writerow(show_headers)
spamwriter = csv.DictWriter(csvfile, spamwriter = csv.DictWriter(csvfile,
headers, extrasaction='ignore') headers, extrasaction='ignore')
for row in res: for row in res:
spamwriter.writerow(row) spamwriter.writerow(row)
def get_pvm(private_file): def get_pvm(private_file):
res = [{'pid':'SUM', 'used':0, 'real_used':0, 'free':0},] res = [{'pid': 'SUM', 'used': 0, 'real_used': 0, 'free': 0}, ]
headers = ['pid', 'used', 'real_used', 'free'] headers = ['pid', 'used', 'real_used', 'free']
prefix = 'rss_' prefix = 'rss_'
try: try:
@ -72,10 +79,11 @@ def get_pvm(private_file):
private_file_pp = '%s_per_pid%s' % (fileName, fileExtension) private_file_pp = '%s_per_pid%s' % (fileName, fileExtension)
save_data(private_file_pp, headers, prefix, res[2:]) save_data(private_file_pp, headers, prefix, res[2:])
def get_shm(share_file): def get_shm(share_file):
headers = ['used', 'real_used', 'max_used', headers = ['used', 'real_used', 'max_used',
'free', 'fragments', 'total' 'free', 'fragments', 'total'
] ]
prefix = 'shared_' prefix = 'shared_'
try: try:
res = proxy.core.shmmem('b') res = proxy.core.shmmem('b')
@ -93,18 +101,20 @@ def get_shm(share_file):
sys.exit(-2) sys.exit(-2)
save_data(share_file, headers, prefix, [res, ]) save_data(share_file, headers, prefix, [res, ])
def main(): def main():
parser = argparse.ArgumentParser(description='export to csv kamailio proxy stats.') parser = argparse.ArgumentParser(
description='export to csv kamailio proxy stats.')
parser.add_argument('--private_file', '-P', nargs='?', default='pvm.csv', parser.add_argument('--private_file', '-P', nargs='?', default='pvm.csv',
help='path to the private csv file') help='path to the private csv file')
parser.add_argument('--share_file', '-S', nargs='?', default='shm.csv', parser.add_argument('--share_file', '-S', nargs='?', default='shm.csv',
help='path to the share csv file') help='path to the share csv file')
parser.add_argument('--rpc_url', nargs='?', default=KAM_URL, parser.add_argument('--rpc_url', nargs='?', default=KAM_URL,
help='rpc URL of kamailio. Default:%s' % KAM_URL) help='rpc URL of kamailio. Default:%s' % KAM_URL)
args = parser.parse_args() args = parser.parse_args()
get_pvm(args.private_file) get_pvm(args.private_file)
get_shm(args.share_file) get_shm(args.share_file)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

@ -18,16 +18,18 @@
On Debian systems, the complete text of the GNU General On Debian systems, the complete text of the GNU General
Public License version 3 can be found in "/usr/share/common-licenses/GPL-3". Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
""" """
import sys
import os.path
import pyinotify
import logging import logging
import os import os
import os.path
import pyinotify
import sys
BASE_DIR = "/usr/share/kamailio-config-tests" BASE_DIR = "/usr/share/kamailio-config-tests"
if os.environ.has_key('BASE_DIR'): if 'BASE_DIR' in os.environ:
BASE_DIR = os.environ['BASE_DIR'] BASE_DIR = os.environ['BASE_DIR']
filelog = os.path.join(BASE_DIR, 'log', 'pid_watcher.log') filelog = os.path.join(BASE_DIR, 'log', 'pid_watcher.log')
logging.basicConfig(filename=filelog, level=logging.DEBUG, format='%(asctime)s %(message)s') logging.basicConfig(
filename=filelog, level=logging.DEBUG, format='%(asctime)s %(message)s')
base_path = "/var/run" base_path = "/var/run"
@ -49,31 +51,33 @@ watched_dirs = [
watched = {} watched = {}
class Handler(pyinotify.ProcessEvent): class Handler(pyinotify.ProcessEvent):
def my_init(self, watched): def my_init(self, watched):
self.watched = watched self.watched = watched
def check_all(self): def check_all(self):
all = True all = True
for k,v in self.watched.iteritems(): for k, v in self.watched.iteritems():
all = (all and (v['created'] or v['modified'])) all = (all and (v['created'] or v['modified']))
logging.info("checking: %s[%s] all:%s" % (k,v, all)) logging.info("checking: %s[%s] all:%s" % (k, v, all))
return all return all
def process_IN_CREATE(self, event): def process_IN_CREATE(self, event):
if watched.has_key(event.pathname): if event.pathname in watched:
watched[event.pathname]['created'] = True watched[event.pathname]['created'] = True
logging.info("created %s" % event.pathname) logging.info("created %s" % event.pathname)
if self.check_all(): if self.check_all():
sys.exit(0) sys.exit(0)
def process_IN_IGNORED(self, event): def process_IN_IGNORED(self, event):
if watched.has_key(event.pathname): if event.pathname in watched:
watched[event.pathname]['deleted'] = True watched[event.pathname]['deleted'] = True
logging.info("deleted %s" % event.pathname) logging.info("deleted %s" % event.pathname)
def process_IN_MODIFY(self, event): def process_IN_MODIFY(self, event):
if watched.has_key(event.pathname): if event.pathname in watched:
watched[event.pathname]['modified'] = True watched[event.pathname]['modified'] = True
logging.info("modified %s" % event.pathname) logging.info("modified %s" % event.pathname)
# for debug # for debug
@ -88,7 +92,8 @@ notifier = pyinotify.Notifier(wm, default_proc_fun=handler)
for service in services: for service in services:
service_pid = os.path.join(base_path, service) service_pid = os.path.join(base_path, service)
logging.info("Watching %s" % service_pid) logging.info("Watching %s" % service_pid)
watched[service_pid] = {'deleted': False, 'created': False, 'modified': False } watched[service_pid] = {
'deleted': False, 'created': False, 'modified': False}
wm.add_watch(service_pid, pyinotify.IN_IGNORED) wm.add_watch(service_pid, pyinotify.IN_IGNORED)
for d in watched_dirs: for d in watched_dirs:
wm.add_watch(d, pyinotify.ALL_EVENTS) wm.add_watch(d, pyinotify.ALL_EVENTS)

@ -94,6 +94,7 @@ sub get_subs_info
{ {
die("domain:".$data->{domain}." not defined in subscribers"); die("domain:".$data->{domain}." not defined in subscribers");
} }
return;
} }
sub generate sub generate
@ -133,7 +134,7 @@ sub generate
{ {
# by default foreign is no # by default foreign is no
$_->{foreign} = "no" unless defined($_->{foreign}); $_->{foreign} = "no" unless defined($_->{foreign});
if(!defined($_->{peer_host}) and $_->{foreign} ne "yes") if(not defined($_->{peer_host}) and $_->{foreign} ne "yes")
{ {
get_subs_info($data->{subscribers}, $_); get_subs_info($data->{subscribers}, $_);
} }
@ -161,6 +162,7 @@ sub generate
} }
$id++; $id++;
} }
return;
} }
sub generate_reg sub generate_reg
@ -169,6 +171,7 @@ sub generate_reg
my $vars = { line => $num }; my $vars = { line => $num };
my $fn = File::Spec->catfile($base_check_dir, "sipp_scenario_responder".(sprintf "%02i", $num)."_reg.xml"); my $fn = File::Spec->catfile($base_check_dir, "sipp_scenario_responder".(sprintf "%02i", $num)."_reg.xml");
$tt->process($template_reg, $vars, $fn) or die($tt->error(), "\n"); $tt->process($template_reg, $vars, $fn) or die($tt->error(), "\n");
return;
} }
sub generate_presence sub generate_presence
@ -198,6 +201,7 @@ sub generate_presence
chmod(0755, $fn); chmod(0755, $fn);
undef $fn; undef $fn;
} }
return;
} }
sub generate_foreign_dom sub generate_foreign_dom
@ -207,6 +211,7 @@ sub generate_foreign_dom
my $fn = IO::File->new($file, "w") or die("can't create $file"); my $fn = IO::File->new($file, "w") or die("can't create $file");
print {$fn} "$ip $domain\n"; print {$fn} "$ip $domain\n";
undef $fn; undef $fn;
return;
} }
generate($cf); generate($cf);

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use Getopt::Std; use Getopt::Std;
use Cwd 'abs_path'; use Cwd 'abs_path';
use YAML; use YAML;
@ -134,11 +135,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Billing\::$function failed: ". $@->faultstring; die "Billing\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Billing\::$function failed: $@"; die "Billing\::$function failed: $EVAL_ERROR";
} }
} }
@ -146,5 +147,5 @@ sub call_prov {
} }
sub usage { sub usage {
die "Usage:\n$0 prefs.yml\n"; die "Usage:\n$PROGRAM_NAME prefs.yml\n";
} }

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use Getopt::Std; use Getopt::Std;
use Cwd 'abs_path'; use Cwd 'abs_path';
use YAML; use YAML;
@ -55,7 +56,7 @@ sub main {
my $param; my $param;
# cf_set # cf_set
my $set = {}; # name -> set_id my $set = {}; # name -> set_id
my $time = {}; # name -> time_id my $time = {}; # name -> time_id
foreach my $s (@{$cf->{$key}->{'destination_set'}}) foreach my $s (@{$cf->{$key}->{'destination_set'}})
{ {
$param = { username => $fields[0], domain => $fields[1], data => $s }; $param = { username => $fields[0], domain => $fields[1], data => $s };
@ -119,11 +120,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring; die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Voip\::$function failed: $@"; die "Voip\::$function failed: $EVAL_ERROR";
} }
} }
@ -131,5 +132,5 @@ sub call_prov {
} }
sub usage { sub usage {
die "Usage:\n$0 callforward.yml\n"; die "Usage:\n$PROGRAM_NAME callforward.yml\n";
} }

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use Getopt::Std; use Getopt::Std;
use Cwd 'abs_path'; use Cwd 'abs_path';
use YAML; use YAML;
@ -80,11 +81,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring; die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Voip\::$function failed: $@"; die "Voip\::$function failed: $EVAL_ERROR";
} }
} }
@ -92,5 +93,5 @@ sub call_prov {
} }
sub usage { sub usage {
die "Usage:\n$0 speeddial.yml\n"; die "Usage:\n$PROGRAM_NAME speeddial.yml\n";
} }

@ -1,217 +0,0 @@
#!/usr/bin/env python
#
# Copyright: 2013 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This package is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
from check import XAvp, Test, check_flow, check_flow_vars, check_sip, check_sip_out
import yaml
import unittest
class TestXAvp(unittest.TestCase):
def setUp(self):
self.name = 'test'
self.data = [
{'koko': [1]},
{'koko': [1, 2]},
{'lolo': [3], 'lola': [7]},
]
self.xavp = XAvp('$xavp(%s)' % self.name, self.data)
def test_init(self):
self.assertEqual(self.name, self.xavp._name)
self.assertItemsEqual(self.data, self.xavp._data)
def test_init_wrong_type(self):
self.assertRaises(Exception, self.xavp, '$var(whatever)', None)
def test_parse_type(self):
self.assertRaisesRegexp(Exception, 'no xavp', XAvp.parse, '$var(whatever)')
self.assertRaisesRegexp(Exception, 'no xavp', XAvp.parse, '$fU')
def test_get_wrong_name(self):
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0])')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro=>whatever)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0]=>whatever)')
def test_get_wrong_nindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko)')
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko[1])')
def test_get_wrong_kindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[0]=>koko[1])')
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[1]=>koko[10])')
def test_get_value(self):
self.assertEqual(self.xavp.get('$xavp(test=>koko)'), 1)
self.assertEqual(self.xavp.get('$xavp(test[1]=>koko[1])'), 2)
self.assertEqual(self.xavp.get('$xavp(test[2]=>lola[0])'), 7)
def test_get_value_all(self):
self.assertItemsEqual(self.xavp.get('$xavp(test[1]=>koko[*])'), [1,2])
class TestCheckFlowVars(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.check_ok = [
{ 'R0': { '$xavp(v0)':
[{
'k0': [1],
'k1': ['a', 'b', 'fuckthisshit']
},
{
'k0': [1,2],
'k1': ['a',]
},
],
'fU': 'testpep',
},
},
{ 'R1': { '$xavp(v0)': [{'k0': [1,2]}] }},
]
self.scen_noxavp = [
{ 'R0': {'fU': 'testpep'} },
{ 'R1': {} },
]
self.scen = [
{ 'R0': { '$xavp(v0[0]=>k0[0])': 1,
'$xavp(v0[0]=>k1[0])': 'a',
'$xavp(v0[0]=>k1[2])': '^f',
'$var(no)': 'None',
'$xavp(nono=>koko)': 'None',
'$xavp(v0=>k10)': 'None',
'$xavp(v0[1]=>k0[1])': '\d+'}
},
{ 'R1': {'$xavp(v0[1]=>k0[0])': 1} },
]
def testXAvp(self):
data = self.check_ok[0]['R0']['$xavp(v0)']
xavp = XAvp('$xavp(v0)', data)
self.assertEqual(xavp.get('$xavp(v0=>k0)'), 1)
self.assertEqual(xavp.get('$xavp(v0=>k1[*])'), ['a','b', 'fuckthisshit'])
self.assertEqual(xavp.get('$xavp(v0[1]=>k0[1])'), 2)
self.assertEqual(xavp.get('$xavp(v0[1]=>k1[*])'), ['a'])
def testFlow_noxavp(self):
check_flow(self.scen_noxavp, self.check_ok, self.ctest)
self.assertFalse(self.ctest.isError())
def testFlowVars_noxavp(self):
check_flow_vars('RO', self.scen_noxavp[0]['R0'], self.check_ok[0]['R0'], self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
def testFlowVars_xavp(self):
check_flow_vars('RO', self.scen[0]['R0'], self.check_ok[0]['R0'], self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipIn(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg ="""REGISTER sip:testuser1003@spce.test SIP/2.0
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
Authorization: Digest username="testuser1003",realm="spce.test",uri="sip:127.0.0.1:5060",nonce="Una661J2ub8hRdmjTwus8Habu3n6G/By",response="c1b8da80c3d05f8d4da478128f94907a",algorithm=MD5
Contact: "TestBria" <sip:testuser1003@127.126.0.1:50602;ob>;reg-id=1;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>"
Contact: "TestBria" <sip:testuser1003@127.126.0.1:6666;ob>;expires=0
Expires: 600
Max-Forwards: 16
Content-Length: 0
P-NGCP-Src-Ip: 127.126.0.1
P-NGCP-Src-Port: 50602
P-NGCP-Src-Proto: udp
P-NGCP-Src-Af: 4
P-Sock-Info: udp:127.0.0.1:5060
Path: <sip:lb@127.0.0.1;lr;socket=sip:127.0.0.1:5060>"""
def testSipIn(self):
sip_in = yaml.load("""
- '^REGISTER'
- 'Contact: "TestBria" <sip:testuser1003@127.126.0.1:50602;ob>;reg-id=1;\+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>"'
- 'Contact: "TestBria" <sip:testuser1003@127.126.0.1:6666;ob>;expires=0'
- 'Content-Length: 0'
- 'Expires: \d+'
- '_:NOT:_Expires: 0'
- 'Authorization: Digest username="testuser1003"'""")
check_sip(sip_in, self.msg, self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipOut(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = yaml.load("""
- |+
SIP/2.0 100 Trying
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0
- |+
SIP/2.0 200 OK
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0;rport=5060
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>;tag=1d24a28a0bded6c40d31e6db8aab9ac6.504e
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
P-NGCP-Authorization: testuser1003@spce.test
P-NGCP-Authorized: 1
P-Caller-UUID: 2cc06244-5a63-46cd-aee9-1d804d314371
Contact: <sip:testuser1003@127.126.0.1:50602;ob>;expires=600;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>";reg-id=1
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0""")
def testSipOut(self):
sip_out = yaml.load("""
- [
'^SIP/2.0 100 Trying',
'Content-Length: 0'
]
- [
'^SIP/2.0 200 OK',
'Contact: <sip:testuser1003@127.126.0.1:50602;ob>;expires=600;\+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>";reg-id=1',
'Content-Length: 0',
'P-NGCP-Authorization: testuser1003@',
'P-NGCP-Authorized: 1',
'_:NOT:_Contact: <sip:testuser1003@127.126.0.1:6666;ob>;expires=\d+'
]""")
check_sip_out(sip_out, self.msg, self.ctest)
print self.ctest
self.assertFalse(self.ctest.isError())
if __name__ == '__main__':
unittest.main()

@ -75,7 +75,7 @@ sub save_data
YAML::DumpFile($path, $data); YAML::DumpFile($path, $data);
#print "$data->{'msgid'} saved\n"; #print "$data->{'msgid'} saved\n";
# This tries to fix problems with string values '-' being saved # This tries to fix problems with string values '-' being saved
# without quotes. # without quotes.
tie my @array, 'Tie::File', $path or die ('Can not open $path'); tie my @array, 'Tie::File', $path or die ('Can not open $path');
for (@array) for (@array)
{ {
@ -90,6 +90,7 @@ sub save_data
sip_in => '', sip_in => '',
sip_out => [], sip_out => [],
}; };
return;
} }
my $pid; my $pid;
@ -154,7 +155,7 @@ given($#ARGV)
$filename = abs_path($filename); $filename = abs_path($filename);
$output_dir = abs_path($output_dir); $output_dir = abs_path($output_dir);
my $out; my $out;
open($log, "<$filename") or die "Couldn't open kamailio log, $!"; open($log, '<', "$filename") or die "Couldn't open kamailio log, $ERRNO";
first_line(); first_line();
do do

@ -21,6 +21,7 @@
use strict; use strict;
use warnings; use warnings;
use English;
use Cwd 'abs_path'; use Cwd 'abs_path';
use YAML; use YAML;
use Getopt::Long; use Getopt::Long;
@ -113,11 +114,11 @@ sub call_prov {
}); });
}; };
if($@) { if($EVAL_ERROR) {
if(ref $@ eq 'SOAP::Fault') { if(ref $EVAL_ERROR eq 'SOAP::Fault') {
die "Voip\::$function failed: ". $@->faultstring; die "Voip\::$function failed: ". $EVAL_ERROR->faultstring;
} else { } else {
die "Voip\::$function failed: $@"; die "Voip\::$function failed: $EVAL_ERROR";
} }
} }
@ -125,5 +126,6 @@ sub call_prov {
} }
sub usage { sub usage {
return "Usage:\n$0 [-h] [-i IP] [-p PORT] peer_host_name peer.yml\n"; return "Usage:\n$PROGRAM_NAME [-h] [-i IP] [-p PORT]" .
" peer_host_name peer.yml\n";
} }

@ -0,0 +1,19 @@
REGISTER sip:testuser1003@spce.test SIP/2.0
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
Authorization: Digest username="testuser1003",realm="spce.test",uri="sip:127.0.0.1:5060",nonce="Una661J2ub8hRdmjTwus8Habu3n6G/By",response="c1b8da80c3d05f8d4da478128f94907a",algorithm=MD5
Contact: "TestBria" <sip:testuser1003@127.126.0.1:50602;ob>;reg-id=1;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>"
Contact: "TestBria" <sip:testuser1003@127.126.0.1:6666;ob>;expires=0
Expires: 600
Max-Forwards: 16
Content-Length: 0
P-NGCP-Src-Ip: 127.126.0.1
P-NGCP-Src-Port: 50602
P-NGCP-Src-Proto: udp
P-NGCP-Src-Af: 4
P-Sock-Info: udp:127.0.0.1:5060
Path: <sip:lb@127.0.0.1;lr;socket=sip:127.0.0.1:5060>

@ -0,0 +1,27 @@
- |+
SIP/2.0 100 Trying
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0
- |+
SIP/2.0 200 OK
Via: SIP/2.0/UDP 127.0.0.1;branch=z9hG4bK3969.2f1956baf914fd272ada0363d9577b29.0;rport=5060
Via: SIP/2.0/UDP 127.126.0.1:50602;rport=50602;branch=z9hG4bK-24881-1-3
From: <sip:testuser1003@spce.test>;tag=24881SIPpTag001
To: "TestBria" <sip:testuser1003@spce.test>;tag=1d24a28a0bded6c40d31e6db8aab9ac6.504e
Call-ID: 1-24881@127.126.0.1
CSeq: 4 REGISTER
P-Out-Socket: udp:127.0.0.1:5060
P-NGCP-Authorization: testuser1003@spce.test
P-NGCP-Authorized: 1
P-Caller-UUID: 2cc06244-5a63-46cd-aee9-1d804d314371
Contact: <sip:testuser1003@127.126.0.1:50602;ob>;expires=600;+sip.instance="<urn:uuid:C3DD6013-20E8-40E3-8EA2-5849B02ED0C4>";reg-id=1
Server: Sipwise NGCP Proxy 2.X
Content-Length: 0

@ -0,0 +1,163 @@
#!/usr/bin/env python
#
# Copyright: 2013-2015 Sipwise Development Team <support@sipwise.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This package is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# On Debian systems, the complete text of the GNU General
# Public License version 3 can be found in "/usr/share/common-licenses/GPL-3".
#
import os
import sys
lib_path = os.path.abspath('bin')
sys.path.append(lib_path)
from check import check_sip, check_sip_out
from check import XAvp, Test, check_flow, check_flow_vars
import unittest
import yaml
class TestXAvp(unittest.TestCase):
def setUp(self):
self.name = 'test'
self.data = [
{'koko': [1]},
{'koko': [1, 2]},
{'lolo': [3], 'lola': [7]},
]
self.xavp = XAvp('$xavp(%s)' % self.name, self.data)
def test_init(self):
self.assertEqual(self.name, self.xavp._name)
self.assertItemsEqual(self.data, self.xavp._data)
def test_init_wrong_type(self):
self.assertRaises(Exception, self.xavp, '$var(whatever)', None)
def test_parse_type(self):
self.assertRaisesRegexp(
Exception, 'no xavp', XAvp.parse, '$var(whatever)')
self.assertRaisesRegexp(Exception, 'no xavp', XAvp.parse, '$fU')
def test_get_wrong_name(self):
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0])')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro=>whatever)')
self.assertRaises(KeyError, self.xavp.get, '$xavp(otro[0]=>whatever)')
def test_get_wrong_nindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[10]=>koko)')
self.assertRaises(
IndexError, self.xavp.get, '$xavp(test[10]=>koko[1])')
def test_get_wrong_kindx(self):
self.assertRaises(IndexError, self.xavp.get, '$xavp(test[0]=>koko[1])')
self.assertRaises(
IndexError, self.xavp.get, '$xavp(test[1]=>koko[10])')
def test_get_value(self):
self.assertEqual(self.xavp.get('$xavp(test=>koko)'), 1)
self.assertEqual(self.xavp.get('$xavp(test[1]=>koko[1])'), 2)
self.assertEqual(self.xavp.get('$xavp(test[2]=>lola[0])'), 7)
def test_get_value_all(self):
self.assertItemsEqual(self.xavp.get('$xavp(test[1]=>koko[*])'), [1, 2])
class TestCheckFlowVars(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.check_ok = [
{'R0': {'$xavp(v0)': [{'k0': [1],
'k1': ['a', 'b', 'fuckthisshit']},
{'k0': [1, 2], 'k1': ['a', ]},
],
'fU': 'testpep',
}},
{'R1': {'$xavp(v0)': [{'k0': [1, 2]}]}},
]
self.scen_noxavp = [
{'R0': {'fU': 'testpep'}},
{'R1': {}},
]
self.scen = [
{'R0': {'$xavp(v0[0]=>k0[0])': 1,
'$xavp(v0[0]=>k1[0])': 'a',
'$xavp(v0[0]=>k1[2])': '^f',
'$var(no)': 'None',
'$xavp(nono=>koko)': 'None',
'$xavp(v0=>k10)': 'None',
'$xavp(v0[1]=>k0[1])': '\d+'}
},
{'R1': {'$xavp(v0[1]=>k0[0])': 1}},
]
def testXAvp(self):
data = self.check_ok[0]['R0']['$xavp(v0)']
xavp = XAvp('$xavp(v0)', data)
self.assertEqual(xavp.get('$xavp(v0=>k0)'), 1)
self.assertEqual(
xavp.get('$xavp(v0=>k1[*])'), ['a', 'b', 'fuckthisshit'])
self.assertEqual(xavp.get('$xavp(v0[1]=>k0[1])'), 2)
self.assertEqual(xavp.get('$xavp(v0[1]=>k1[*])'), ['a'])
def testFlow_noxavp(self):
check_flow(self.scen_noxavp, self.check_ok, self.ctest)
self.assertFalse(self.ctest.isError())
def testFlowVars_noxavp(self):
check_flow_vars(
'RO', self.scen_noxavp[0]['R0'],
self.check_ok[0]['R0'], self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
def testFlowVars_xavp(self):
check_flow_vars(
'RO', self.scen[0]['R0'],
self.check_ok[0]['R0'], self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipIn(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = open('tests/fixtures/sip_in.txt', 'r').read()
def testSipIn(self):
sip_in = yaml.load(open('tests/fixtures/sip_in_test.yml', 'r'))
check_sip(sip_in, self.msg, self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
class TestCheckSipOut(unittest.TestCase):
def setUp(self):
self.ctest = Test()
self.msg = yaml.load(open('tests/fixtures/sip_out.yml', 'r'))
def testSipOut(self):
sip_out = yaml.load(open('tests/fixtures/sip_out_test.yml', 'r'))
check_sip_out(sip_out, self.msg, self.ctest)
# print self.ctest
self.assertFalse(self.ctest.isError())
if __name__ == '__main__':
unittest.main()
Loading…
Cancel
Save