You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
229 lines
8.6 KiB
229 lines
8.6 KiB
|
|
use strict;
|
|
use warnings;
|
|
|
|
use threads qw();
|
|
use Thread::Queue qw();
|
|
|
|
use File::Basename;
|
|
use Cwd;
|
|
use lib Cwd::abs_path(File::Basename::dirname(__FILE__));
|
|
|
|
use Utils::Api qw();
|
|
use Utils::Rateomat qw();
|
|
use Test::More;
|
|
|
|
### testcase outline:
|
|
### test to check if cdr ids generated by concurrent bulk
|
|
### inserts form a consecutive sequence each
|
|
|
|
my $init_secs = 60;
|
|
my $follow_secs = 30;
|
|
|
|
my $number_of_callers = 1;
|
|
my $number_of_callees = 1;
|
|
my @number_of_cdrs = ( 1, 10, 500, 1000, 5000 ); #per thread
|
|
my $iterations_per_thread = 100;
|
|
my @call_durations = ( 3, 5, 10, 30, 60, 120, 200, 600 );
|
|
my $num_of_threads = 4;
|
|
|
|
my $date = Utils::Api::get_now()->ymd();
|
|
my $t = Utils::Api::datetime_from_string($date . ' 07:30:00');
|
|
Utils::Api::set_time($t);
|
|
my $provider = create_provider('test.com');
|
|
my $profile = $provider->{subscriber_fees}->[0]->{profile};
|
|
my $balance = 0;
|
|
my @callers = map { Utils::Api::setup_subscriber($provider,$profile,$balance,{ cc => 888, ac => $_.'<n>', sn => '<t>' }); } (1..$number_of_callers);
|
|
my @callees = map { Utils::Api::setup_subscriber($provider,$profile,$balance,{ cc => 989, ac => $_.'<n>', sn => '<t>' }); } (1..$number_of_callees);
|
|
my $thread_sleep_secs = 0.1;
|
|
|
|
{
|
|
my $queue = Thread::Queue->new();
|
|
my %workers = ();
|
|
for (my $i = 0; $i < $num_of_threads; $i++) {
|
|
my $processor = threads->create(sub {
|
|
foreach (1..$iterations_per_thread) {
|
|
my $label = '[' . threads->tid() . ']';
|
|
my $number_of_cdrs = $call_durations[int(rand(scalar @number_of_cdrs))];
|
|
my $results = {};
|
|
(my $cols,my $vals,$results->{call_ids}) = create_test_set($number_of_cdrs);
|
|
my $ary_vals = [ map { @{$_}; } @$vals ];
|
|
my $dbh;
|
|
eval {
|
|
$dbh = Utils::Rateomat::_connect_accounting_db();
|
|
my $sth = get_sth($dbh,'cdr',$cols,$number_of_cdrs);
|
|
my $t1 = time();
|
|
$sth->execute(@$ary_vals);
|
|
$results->{first} = $dbh->last_insert_id('accounting','accounting','cdr','id');
|
|
diag("$label: $number_of_cdrs inserted (" . (time() - $t1) . ' secs)');
|
|
$sth->finish();
|
|
};
|
|
if ($@) {
|
|
diag($@);
|
|
}
|
|
Utils::Rateomat::_disconnect_db($dbh);
|
|
$queue->enqueue($results);
|
|
}
|
|
});
|
|
ok(defined $processor,'inserter thread ' . ($i + 1) . " of $num_of_threads started, tid " . $processor->tid());
|
|
$workers{$processor->tid()} = $processor;
|
|
}
|
|
my $tester = threads->create(sub {
|
|
my $label = '[' . threads->tid() . ']';
|
|
while (defined(my $results = $queue->dequeue_timed(20))) {
|
|
my @got; my @expected;
|
|
my $dbh;
|
|
eval {
|
|
$dbh = Utils::Rateomat::_connect_accounting_db();
|
|
my $sth = $dbh->prepare('SELECT id FROM cdr WHERE call_id = ?');
|
|
my $offset = 0;
|
|
foreach my $call_id (@{$results->{call_ids}}) {
|
|
$sth->execute($call_id);
|
|
push(@got,$sth->fetchrow_array());
|
|
push(@expected,$results->{first} + $offset);
|
|
$offset += 1;
|
|
}
|
|
$sth->finish();
|
|
};
|
|
if ($@) {
|
|
diag($@);
|
|
}
|
|
Utils::Rateomat::_disconnect_db($dbh);
|
|
is_deeply(\@got,\@expected,"$label: " . (scalar @got) . ' cdr ids ok');
|
|
}
|
|
});
|
|
ok(defined $tester,'tester thread started, tid ' . $tester->tid());
|
|
$workers{$tester->tid()} = $tester;
|
|
while ((scalar keys %workers) > 0) {
|
|
foreach my $worker (values %workers) {
|
|
if (defined $worker and $worker->is_joinable()) {
|
|
$worker->join();
|
|
delete $workers{$worker->tid()};
|
|
diag('thread tid ' . $worker->tid() . ' joined');
|
|
}
|
|
sleep($thread_sleep_secs);
|
|
}
|
|
}
|
|
is((scalar keys %workers), 0,'all threads joined');
|
|
|
|
}
|
|
|
|
done_testing();
|
|
exit;
|
|
|
|
sub get_sth {
|
|
my ($dbh,$table,$keys,$groups) = @_;
|
|
$groups //= 1;
|
|
my $tuple = ',(' . substr(',?' x scalar @$keys,1) . ')';
|
|
return $dbh->prepare('INSERT INTO ' . $table . ' (' .
|
|
join(',', @$keys) .
|
|
') VALUES ' . substr($tuple x $groups,1));
|
|
}
|
|
|
|
sub create_test_set {
|
|
my ($number_of_cdrs) = @_;
|
|
my @cdr_test_data = ();
|
|
my @keys;
|
|
my @call_ids = ();
|
|
foreach my $i (1..$number_of_cdrs) {
|
|
my $caller = $callers[int(rand $number_of_callers)];
|
|
my $callee = $callees[int(rand $number_of_callees)];
|
|
my %cdr = %{ Utils::Rateomat::prepare_cdr($caller->{subscriber},undef,$caller->{reseller},
|
|
$callee->{subscriber},undef,$callee->{reseller},
|
|
'192.168.0.1',$t->epoch + $i,$call_durations[int(rand scalar @call_durations )]) };
|
|
@keys = keys %cdr unless scalar @keys;
|
|
push(@cdr_test_data,[ @cdr{@keys} ]);
|
|
push(@call_ids,$cdr{call_id});
|
|
}
|
|
#diag("test set created");
|
|
return (\@keys,\@cdr_test_data,\@call_ids);
|
|
}
|
|
|
|
sub create_provider {
|
|
my $domain = shift;
|
|
return Utils::Api::setup_provider($domain,
|
|
[ #subscriber rates:
|
|
{ prepaid => 0,
|
|
fees => [{ #outgoing:
|
|
direction => 'out',
|
|
destination => '^8882.+',
|
|
onpeak_init_rate => 6,
|
|
onpeak_init_interval => $init_secs,
|
|
onpeak_follow_rate => 1,
|
|
onpeak_follow_interval => $follow_secs,
|
|
offpeak_init_rate => 6,
|
|
offpeak_init_interval => $init_secs,
|
|
offpeak_follow_rate => 1,
|
|
offpeak_follow_interval => $follow_secs,
|
|
},
|
|
{ #incoming:
|
|
direction => 'in',
|
|
destination => '.',
|
|
source => '^8881.+',
|
|
onpeak_init_rate => 5,
|
|
onpeak_init_interval => $init_secs,
|
|
onpeak_follow_rate => 1,
|
|
onpeak_follow_interval => $follow_secs,
|
|
offpeak_init_rate => 5,
|
|
offpeak_init_interval => $init_secs,
|
|
offpeak_follow_rate => 1,
|
|
offpeak_follow_interval => $follow_secs,
|
|
}]},
|
|
{ prepaid => 1,
|
|
fees => [{ #outgoing:
|
|
direction => 'out',
|
|
destination => '^8882.+',
|
|
onpeak_init_rate => 4,
|
|
onpeak_init_interval => $init_secs,
|
|
onpeak_follow_rate => 1,
|
|
onpeak_follow_interval => $follow_secs,
|
|
offpeak_init_rate => 4,
|
|
offpeak_init_interval => $init_secs,
|
|
offpeak_follow_rate => 1,
|
|
offpeak_follow_interval => $follow_secs,
|
|
},
|
|
{ #incoming:
|
|
direction => 'in',
|
|
destination => '.',
|
|
source => '^8881.+',
|
|
onpeak_init_rate => 3,
|
|
onpeak_init_interval => $init_secs,
|
|
onpeak_follow_rate => 1,
|
|
onpeak_follow_interval => $follow_secs,
|
|
offpeak_init_rate => 3,
|
|
offpeak_init_interval => $init_secs,
|
|
offpeak_follow_rate => 1,
|
|
offpeak_follow_interval => $follow_secs,
|
|
}]},
|
|
],
|
|
undef, # no billing networks in this test suite
|
|
# provider rate:
|
|
{ prepaid => 0,
|
|
fees => [{ #outgoing:
|
|
direction => 'out',
|
|
destination => '^888.+',
|
|
onpeak_init_rate => 2,
|
|
onpeak_init_interval => $init_secs,
|
|
onpeak_follow_rate => 1,
|
|
onpeak_follow_interval => $follow_secs,
|
|
offpeak_init_rate => 2,
|
|
offpeak_init_interval => $init_secs,
|
|
offpeak_follow_rate => 1,
|
|
offpeak_follow_interval => $follow_secs,
|
|
},
|
|
{ #incoming:
|
|
direction => 'in',
|
|
destination => '.',
|
|
source => '^888.+',
|
|
onpeak_init_rate => 1,
|
|
onpeak_init_interval => $init_secs,
|
|
onpeak_follow_rate => 1,
|
|
onpeak_follow_interval => $follow_secs,
|
|
offpeak_init_rate => 1,
|
|
offpeak_init_interval => $init_secs,
|
|
offpeak_follow_rate => 1,
|
|
offpeak_follow_interval => $follow_secs,
|
|
}]},
|
|
);
|
|
}
|