You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rate-o-mat/t/rateomat-91-cdr-insert-para...

229 lines
8.6 KiB

use strict;
use warnings;
use threads qw();
use Thread::Queue qw();
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__));
use Utils::Api qw();
use Utils::Rateomat qw();
use Test::More;
### testcase outline:
### test to check if cdr ids generated by concurrent bulk
### inserts form a consecutive sequence each
my $init_secs = 60;
my $follow_secs = 30;
my $number_of_callers = 1;
my $number_of_callees = 1;
my @number_of_cdrs = ( 1, 10, 500, 1000, 5000 ); #per thread
my $iterations_per_thread = 100;
my @call_durations = ( 3, 5, 10, 30, 60, 120, 200, 600 );
my $num_of_threads = 4;
my $date = Utils::Api::get_now()->ymd();
my $t = Utils::Api::datetime_from_string($date . ' 07:30:00');
Utils::Api::set_time($t);
my $provider = create_provider('test.com');
my $profile = $provider->{subscriber_fees}->[0]->{profile};
my $balance = 0;
my @callers = map { Utils::Api::setup_subscriber($provider,$profile,$balance,{ cc => 888, ac => $_.'<n>', sn => '<t>' }); } (1..$number_of_callers);
my @callees = map { Utils::Api::setup_subscriber($provider,$profile,$balance,{ cc => 989, ac => $_.'<n>', sn => '<t>' }); } (1..$number_of_callees);
my $thread_sleep_secs = 0.1;
{
my $queue = Thread::Queue->new();
my %workers = ();
for (my $i = 0; $i < $num_of_threads; $i++) {
my $processor = threads->create(sub {
foreach (1..$iterations_per_thread) {
my $label = '[' . threads->tid() . ']';
my $number_of_cdrs = $call_durations[int(rand(scalar @number_of_cdrs))];
my $results = {};
(my $cols,my $vals,$results->{call_ids}) = create_test_set($number_of_cdrs);
my $ary_vals = [ map { @{$_}; } @$vals ];
my $dbh;
eval {
$dbh = Utils::Rateomat::_connect_accounting_db();
my $sth = get_sth($dbh,'cdr',$cols,$number_of_cdrs);
my $t1 = time();
$sth->execute(@$ary_vals);
$results->{first} = $dbh->last_insert_id('accounting','accounting','cdr','id');
diag("$label: $number_of_cdrs inserted (" . (time() - $t1) . ' secs)');
$sth->finish();
};
if ($@) {
diag($@);
}
Utils::Rateomat::_disconnect_db($dbh);
$queue->enqueue($results);
}
});
ok(defined $processor,'inserter thread ' . ($i + 1) . " of $num_of_threads started, tid " . $processor->tid());
$workers{$processor->tid()} = $processor;
}
my $tester = threads->create(sub {
my $label = '[' . threads->tid() . ']';
while (defined(my $results = $queue->dequeue_timed(20))) {
my @got; my @expected;
my $dbh;
eval {
$dbh = Utils::Rateomat::_connect_accounting_db();
my $sth = $dbh->prepare('SELECT id FROM cdr WHERE call_id = ?');
my $offset = 0;
foreach my $call_id (@{$results->{call_ids}}) {
$sth->execute($call_id);
push(@got,$sth->fetchrow_array());
push(@expected,$results->{first} + $offset);
$offset += 1;
}
$sth->finish();
};
if ($@) {
diag($@);
}
Utils::Rateomat::_disconnect_db($dbh);
is_deeply(\@got,\@expected,"$label: " . (scalar @got) . ' cdr ids ok');
}
});
ok(defined $tester,'tester thread started, tid ' . $tester->tid());
$workers{$tester->tid()} = $tester;
while ((scalar keys %workers) > 0) {
foreach my $worker (values %workers) {
if (defined $worker and $worker->is_joinable()) {
$worker->join();
delete $workers{$worker->tid()};
diag('thread tid ' . $worker->tid() . ' joined');
}
sleep($thread_sleep_secs);
}
}
is((scalar keys %workers), 0,'all threads joined');
}
done_testing();
exit;
sub get_sth {
my ($dbh,$table,$keys,$groups) = @_;
$groups //= 1;
my $tuple = ',(' . substr(',?' x scalar @$keys,1) . ')';
return $dbh->prepare('INSERT INTO ' . $table . ' (' .
join(',', @$keys) .
') VALUES ' . substr($tuple x $groups,1));
}
sub create_test_set {
my ($number_of_cdrs) = @_;
my @cdr_test_data = ();
my @keys;
my @call_ids = ();
foreach my $i (1..$number_of_cdrs) {
my $caller = $callers[int(rand $number_of_callers)];
my $callee = $callees[int(rand $number_of_callees)];
my %cdr = %{ Utils::Rateomat::prepare_cdr($caller->{subscriber},undef,$caller->{reseller},
$callee->{subscriber},undef,$callee->{reseller},
'192.168.0.1',$t->epoch + $i,$call_durations[int(rand scalar @call_durations )]) };
@keys = keys %cdr unless scalar @keys;
push(@cdr_test_data,[ @cdr{@keys} ]);
push(@call_ids,$cdr{call_id});
}
#diag("test set created");
return (\@keys,\@cdr_test_data,\@call_ids);
}
sub create_provider {
my $domain = shift;
return Utils::Api::setup_provider($domain,
[ #subscriber rates:
{ prepaid => 0,
fees => [{ #outgoing:
direction => 'out',
destination => '^8882.+',
onpeak_init_rate => 6,
onpeak_init_interval => $init_secs,
onpeak_follow_rate => 1,
onpeak_follow_interval => $follow_secs,
offpeak_init_rate => 6,
offpeak_init_interval => $init_secs,
offpeak_follow_rate => 1,
offpeak_follow_interval => $follow_secs,
},
{ #incoming:
direction => 'in',
destination => '.',
source => '^8881.+',
onpeak_init_rate => 5,
onpeak_init_interval => $init_secs,
onpeak_follow_rate => 1,
onpeak_follow_interval => $follow_secs,
offpeak_init_rate => 5,
offpeak_init_interval => $init_secs,
offpeak_follow_rate => 1,
offpeak_follow_interval => $follow_secs,
}]},
{ prepaid => 1,
fees => [{ #outgoing:
direction => 'out',
destination => '^8882.+',
onpeak_init_rate => 4,
onpeak_init_interval => $init_secs,
onpeak_follow_rate => 1,
onpeak_follow_interval => $follow_secs,
offpeak_init_rate => 4,
offpeak_init_interval => $init_secs,
offpeak_follow_rate => 1,
offpeak_follow_interval => $follow_secs,
},
{ #incoming:
direction => 'in',
destination => '.',
source => '^8881.+',
onpeak_init_rate => 3,
onpeak_init_interval => $init_secs,
onpeak_follow_rate => 1,
onpeak_follow_interval => $follow_secs,
offpeak_init_rate => 3,
offpeak_init_interval => $init_secs,
offpeak_follow_rate => 1,
offpeak_follow_interval => $follow_secs,
}]},
],
undef, # no billing networks in this test suite
# provider rate:
{ prepaid => 0,
fees => [{ #outgoing:
direction => 'out',
destination => '^888.+',
onpeak_init_rate => 2,
onpeak_init_interval => $init_secs,
onpeak_follow_rate => 1,
onpeak_follow_interval => $follow_secs,
offpeak_init_rate => 2,
offpeak_init_interval => $init_secs,
offpeak_follow_rate => 1,
offpeak_follow_interval => $follow_secs,
},
{ #incoming:
direction => 'in',
destination => '.',
source => '^888.+',
onpeak_init_rate => 1,
onpeak_init_interval => $init_secs,
onpeak_follow_rate => 1,
onpeak_follow_interval => $follow_secs,
offpeak_init_rate => 1,
offpeak_init_interval => $init_secs,
offpeak_follow_rate => 1,
offpeak_follow_interval => $follow_secs,
}]},
);
}