use strict;
use warnings;

use threads qw();
use Thread::Queue qw();
use Time::HiRes qw();

use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__));

use Utils::Api qw();
use Utils::Rateomat qw();
use Test::More;

### testcase outline:
### test to check if cdr ids generated by concurrent bulk
### inserts form a consecutive sequence each

# Interval lengths used by every fee definition in create_provider().
my $init_secs = 60;
my $follow_secs = 30;

my $number_of_callers = 1;
my $number_of_callees = 1;
# Candidate bulk sizes; each inserter thread picks one at random per iteration.
my @number_of_cdrs = ( 1, 10, 500, 1000, 5000 ); #per thread
my $iterations_per_thread = 100;
# Candidate call durations; one is picked at random per generated cdr.
my @call_durations = ( 3, 5, 10, 30, 60, 120, 200, 600 );
my $num_of_threads = 4;

my $date = Utils::Api::get_now()->ymd();
my $t = Utils::Api::datetime_from_string($date . ' 07:30:00');
Utils::Api::set_time($t);

my $provider = create_provider('test.com');
my $profile = $provider->{subscriber_fees}->[0]->{profile};
my $balance = 0;

my @callers = map {
    Utils::Api::setup_subscriber($provider,$profile,$balance,{ cc => 888, ac => $_.'', sn => '' });
} (1..$number_of_callers);
my @callees = map {
    Utils::Api::setup_subscriber($provider,$profile,$balance,{ cc => 989, ac => $_.'', sn => '' });
} (1..$number_of_callees);

# Pause between two polls of the worker threads in the join loop (fractional seconds).
my $thread_sleep_secs = 0.1;

{
    # Inserter threads push one result record per bulk insert; the tester thread consumes them.
    my $queue = Thread::Queue->new();
    my %workers = ();

    for (my $i = 0; $i < $num_of_threads; $i++) {
        my $processor = threads->create(sub {
            foreach (1..$iterations_per_thread) {
                my $label = '[' . threads->tid() . ']';
                # Bulk size for this iteration. This must be drawn from @number_of_cdrs;
                # indexing @call_durations here would silently cap the bulk size.
                my $bulk_size = $number_of_cdrs[int(rand(scalar @number_of_cdrs))];
                my $results = {};
                (my $cols,my $vals,$results->{call_ids}) = create_test_set($bulk_size);
                # Flatten the per-row value lists into one bind list for the multi-row insert.
                my $ary_vals = [ map { @{$_}; } @$vals ];
                my $dbh;
                eval {
                    $dbh = Utils::Rateomat::_connect_accounting_db();
                    my $sth = get_sth($dbh,'cdr',$cols,$bulk_size);
                    my $t1 = time();
                    $sth->execute(@$ary_vals);
                    # The tester counts upwards from this id, i.e. it is taken as the
                    # id of the first row written by the bulk insert.
                    $results->{first} = $dbh->last_insert_id('accounting','accounting','cdr','id');
                    diag("$label: $bulk_size inserted (" . (time() - $t1) . ' secs)');
                    $sth->finish();
                };
                if ($@) {
                    diag($@);
                }
                Utils::Rateomat::_disconnect_db($dbh);
                $queue->enqueue($results);
            }
        });
        ok(defined $processor,'inserter thread ' . ($i + 1) . " of $num_of_threads started, tid " . $processor->tid());
        $workers{$processor->tid()} = $processor;
    }

    my $tester = threads->create(sub {
        my $label = '[' . threads->tid() . ']';
        # Ends once no result record arrives within 20 seconds.
        while (defined(my $results = $queue->dequeue_timed(20))) {
            my @got;
            my @expected;
            my $dbh;
            eval {
                $dbh = Utils::Rateomat::_connect_accounting_db();
                my $sth = $dbh->prepare('SELECT id FROM cdr WHERE call_id = ?');
                my $offset = 0;
                # Call ids are visited in creation order, so the expected ids are
                # first, first + 1, first + 2, ...
                foreach my $call_id (@{$results->{call_ids}}) {
                    $sth->execute($call_id);
                    push(@got,$sth->fetchrow_array());
                    push(@expected,$results->{first} + $offset);
                    $offset += 1;
                }
                $sth->finish();
            };
            if ($@) {
                diag($@);
            }
            Utils::Rateomat::_disconnect_db($dbh);
            is_deeply(\@got,\@expected,"$label: " . (scalar @got) . ' cdr ids ok');
        }
    });
    ok(defined $tester,'tester thread started, tid ' . $tester->tid());
    $workers{$tester->tid()} = $tester;

    # Poll until every worker has been joined.
    while ((scalar keys %workers) > 0) {
        foreach my $worker (values %workers) {
            if (defined $worker and $worker->is_joinable()) {
                $worker->join();
                delete $workers{$worker->tid()};
                diag('thread tid ' . $worker->tid() . ' joined');
            }
        }
        # The builtin sleep() only handles whole seconds, so a 0.1 second pause
        # needs Time::HiRes; otherwise this loop spins without pausing.
        Time::HiRes::sleep($thread_sleep_secs) if (scalar keys %workers) > 0;
    }

    is((scalar keys %workers), 0,'all threads joined');
}

done_testing();
exit;

# Prepares a multi-row INSERT statement.
#   $dbh    - database handle to prepare the statement on
#   $table  - target table name
#   $keys   - array ref of column names
#   $groups - number of value tuples (rows) in the statement, defaults to 1
# Returns the prepared statement handle; it expects
# (number of columns) x $groups bind values on execute.
sub get_sth {

    my ($dbh,$table,$keys,$groups) = @_;
    $groups //= 1;
    # One ",(?,?,...)" tuple; the leading comma of the repeated string is cut off below.
    my $tuple = ',(' . substr(',?' x scalar @$keys,1) . ')';
    return $dbh->prepare('INSERT INTO ' . $table . ' (' . join(',', @$keys) . ') VALUES ' .
        substr($tuple x $groups,1));

}

# Builds $number_of_cdrs cdr records between randomly picked callers and callees.
# Returns a list of three array refs:
#   - the column names (hash keys of the first prepared cdr),
#   - one array ref of column values per cdr, in the same column order,
#   - the call_id of each cdr, in creation order.
sub create_test_set {

    my ($number_of_cdrs) = @_;
    my @cdr_test_data = ();
    my @keys;
    my @call_ids = ();
    foreach my $i (1..$number_of_cdrs) {
        my $caller = $callers[int(rand $number_of_callers)];
        my $callee = $callees[int(rand $number_of_callees)];
        my %cdr = %{ Utils::Rateomat::prepare_cdr($caller->{subscriber},undef,$caller->{reseller},
                $callee->{subscriber},undef,$callee->{reseller},
                '192.168.0.1',$t->epoch + $i,$call_durations[int(rand scalar @call_durations )]) };
        # Fix the column order once, so every row lists its values in the same order.
        @keys = keys %cdr unless scalar @keys;
        push(@cdr_test_data,[ @cdr{@keys} ]);
        push(@call_ids,$cdr{call_id});
    }
    #diag("test set created");
    return (\@keys,\@cdr_test_data,\@call_ids);

}

# Sets up the provider for $domain through Utils::Api::setup_provider with
# two subscriber rate definitions (postpaid and prepaid), no billing networks,
# and one provider rate definition. Returns whatever setup_provider returns.
sub create_provider {

    my $domain = shift;
    return Utils::Api::setup_provider($domain,
        [ #subscriber rates:
            { prepaid => 0,
              fees => [{ #outgoing:
                direction => 'out',
                destination => '^8882.+',
                onpeak_init_rate => 6,
                onpeak_init_interval => $init_secs,
                onpeak_follow_rate => 1,
                onpeak_follow_interval => $follow_secs,
                offpeak_init_rate => 6,
                offpeak_init_interval => $init_secs,
                offpeak_follow_rate => 1,
                offpeak_follow_interval => $follow_secs,
            },
            { #incoming:
                direction => 'in',
                destination => '.',
                source => '^8881.+',
                onpeak_init_rate => 5,
                onpeak_init_interval => $init_secs,
                onpeak_follow_rate => 1,
                onpeak_follow_interval => $follow_secs,
                offpeak_init_rate => 5,
                offpeak_init_interval => $init_secs,
                offpeak_follow_rate => 1,
                offpeak_follow_interval => $follow_secs,
            }]},
            { prepaid => 1,
              fees => [{ #outgoing:
                direction => 'out',
                destination => '^8882.+',
                onpeak_init_rate => 4,
                onpeak_init_interval => $init_secs,
                onpeak_follow_rate => 1,
                onpeak_follow_interval => $follow_secs,
                offpeak_init_rate => 4,
                offpeak_init_interval => $init_secs,
                offpeak_follow_rate => 1,
                offpeak_follow_interval => $follow_secs,
            },
            { #incoming:
                direction => 'in',
                destination => '.',
                source => '^8881.+',
                onpeak_init_rate => 3,
                onpeak_init_interval => $init_secs,
                onpeak_follow_rate => 1,
                onpeak_follow_interval => $follow_secs,
                offpeak_init_rate => 3,
                offpeak_init_interval => $init_secs,
                offpeak_follow_rate => 1,
                offpeak_follow_interval => $follow_secs,
            }]},
        ],
        undef, # no billing networks in this test suite
        # provider rate:
        { prepaid => 0,
          fees => [{ #outgoing:
            direction => 'out',
            destination => '^888.+',
            onpeak_init_rate => 2,
            onpeak_init_interval => $init_secs,
            onpeak_follow_rate => 1,
            onpeak_follow_interval => $follow_secs,
            offpeak_init_rate => 2,
            offpeak_init_interval => $init_secs,
            offpeak_follow_rate => 1,
            offpeak_follow_interval => $follow_secs,
        },
        { #incoming:
            direction => 'in',
            destination => '.',
            source => '^888.+',
            onpeak_init_rate => 1,
            onpeak_init_interval => $init_secs,
            onpeak_follow_rate => 1,
            onpeak_follow_interval => $follow_secs,
            offpeak_init_rate => 1,
            offpeak_init_interval => $init_secs,
            offpeak_follow_rate => 1,
            offpeak_follow_interval => $follow_secs,
        }]},
    );

}