TT#26466 cdr test data generator

Change-Id: I456da4465daa1df3acac717afddb0872003b1d65
changes/40/17640/3
Rene Krenn 8 years ago
parent 5ab47b7ba7
commit b797a9bacc

@ -14,7 +14,8 @@ use NGCP::BulkProcessor::ConnectorPool qw(
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
update_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
@ -25,7 +26,11 @@ our @EXPORT_OK = qw(
gettablename
check_table
update_row
insert_row
delete_callids
countby_ratingstatus
);
#process_records
@ -164,6 +169,90 @@ sub delete_callids {
}
sub countby_ratingstatus {
my ($rating_status) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
my @terms = ();
if (defined $rating_status) {
push(@terms,$db->columnidentifier('rating_status') . ' = ?');
push(@params,$rating_status);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
#my %params = @_;
#my ($contract_id,
# $domain_id,
# $username,
# $uuid) = @params{qw/
# contract_id
# domain_id
# username
# uuid
# /};
#
#if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
# $db->columnidentifier('contact_id') . ', ' .
# $db->columnidentifier('contract_id') . ', ' .
# $db->columnidentifier('domain_id') . ', ' .
# $db->columnidentifier('external_id') . ', ' .
# $db->columnidentifier('primary_number_id') . ', ' .
# $db->columnidentifier('status') . ', ' .
# $db->columnidentifier('username') . ', ' .
# $db->columnidentifier('uuid') . ') VALUES (' .
# 'NULL, ' .
# '?, ' .
# '?, ' .
# 'NULL, ' .
# 'NULL, ' .
# '\'' . $ACTIVE_STATE . '\', ' .
# '?, ' .
# '?)',
# $contract_id,
# $domain_id,
# $username,
# $uuid,
# )) {
# rowinserted($db,$tablename,getlogger(__PACKAGE__));
# return $xa_db->db_last_insert_id();
#}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -29,6 +29,7 @@ our @EXPORT_OK = qw(
update_row
findby_reselleridfields
findby_id
);
my $tablename = 'contacts';
@ -106,6 +107,23 @@ sub findby_reselleridfields {
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub update_row {
my ($xa_db,$data) = @_;

@ -31,6 +31,7 @@ our @EXPORT_OK = qw(
countby_status_resellerid
findby_contactid
findby_id
process_records
@ -99,6 +100,23 @@ sub findby_contactid {
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_status_resellerid {
my ($status,$reseller_id) = @_;
@ -116,8 +134,13 @@ sub countby_status_resellerid {
push(@params,$status);
}
if ($reseller_id) {
push(@terms,'contact.reseller_id = ?');
push(@params,$reseller_id);
if ('ARRAY' eq ref $reseller_id) {
push(@terms,'contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')');
push(@params,@$reseller_id);
} else {
push(@terms,'contact.reseller_id = ?');
push(@params,$reseller_id);
}
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);

@ -22,6 +22,7 @@ our @EXPORT_OK = qw(
findby_domain
findby_id
findall
);
my $tablename = 'domains';
@ -46,6 +47,21 @@ sub new {
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_domain {
my ($domain,$load_recursive) = @_;

@ -22,6 +22,7 @@ our @EXPORT_OK = qw(
findby_name
findby_id
findall
);
my $tablename = 'resellers';
@ -48,6 +49,21 @@ sub new {
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_name {
my ($name,$load_recursive) = @_;

@ -34,6 +34,8 @@ our @EXPORT_OK = qw(
forupdate_cc_ac_sn_subscriberid
release_subscriber_numbers
countby_ccacsn
$ACTIVE_STATE
);
@ -89,6 +91,25 @@ sub findby_subscriberid {
}
sub countby_ccacsn {
my ($xa_db,$cc,$ac,$sn) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE ' .
$db->columnidentifier('cc') . ' = ?' .
' AND ' . $db->columnidentifier('ac') . ' = ?' .
' AND ' . $db->columnidentifier('sn') . ' = ?';
my @params = ($cc // '',$ac // '',$sn // '');
return $db->db_get_value($stmt,@params);
}
sub forupdate_cc_ac_sn_subscriberid {
my ($xa_db,$cc,$ac,$sn,$subscriber_id,$load_recursive) = @_;

@ -16,6 +16,7 @@ use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
update_record
process_table
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
@ -33,6 +34,9 @@ our @EXPORT_OK = qw(
findby_domainid_username_states
countby_status_resellerid
process_records
find_minmaxid
find_random
$TERMINATED_STATE
$ACTIVE_STATE
@ -101,6 +105,103 @@ sub findby_domainid_username_states {
}
sub find_minmaxid {
my ($xa_db,$states,$reseller_id) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my @ids = ();
foreach my $func ('MIN','MAX') {
my @params = ();
my $stmt = 'SELECT ' . $func . '(r1.id) FROM ' . $table . ' AS r1';
if ($reseller_id) {
$stmt .= ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contracts::gettablename()) . ' AS contract ON r1.contract_id = contract.id' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id';
}
$stmt .= ' WHERE 1=1';
if ($reseller_id) {
if ('ARRAY' eq ref $reseller_id) {
$stmt .= ' AND contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')';
push(@params,@$reseller_id);
} else {
$stmt .= ' AND contact.reseller_id = ?';
push(@params,$reseller_id);
}
}
if (defined $states and 'HASH' eq ref $states) {
foreach my $in (keys %$states) {
my @values = (defined $states->{$in} and 'ARRAY' eq ref $states->{$in} ? @{$states->{$in}} : ($states->{$in}));
$stmt .= ' AND r1.status ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $states and length($states) > 0) {
$stmt .= ' AND r1.status = ?';
push(@params,$states);
}
push(@ids,$db->db_get_value($stmt,@params));
}
return @ids;
}
sub find_random {
my ($xa_db,$excluding_id,$states,$reseller_id,$min_id,$max_id,$load_recursive) = @_;
if (not defined $min_id or not defined $max_id) {
($min_id,$max_id) = find_minmaxid($xa_db,$states,$reseller_id);
}
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT r1.* FROM ' . $table . ' AS r1' .
' JOIN (SELECT ? + RAND() * ? AS id) AS r2';
my @params = ();
push(@params,$min_id,$max_id - $min_id);
if ($reseller_id) {
$stmt .= ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contracts::gettablename()) . ' AS contract ON r1.contract_id = contract.id' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id';
}
$stmt .= ' WHERE r1.id >= r2.id';
if (defined $states and 'HASH' eq ref $states) {
foreach my $in (keys %$states) {
my @values = (defined $states->{$in} and 'ARRAY' eq ref $states->{$in} ? @{$states->{$in}} : ($states->{$in}));
$stmt .= ' AND r1.status ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $states and length($states) > 0) {
$stmt .= ' AND r1.status = ?';
push(@params,$states);
}
if (defined $excluding_id) {
$stmt .= ' AND r1.id != ?';
push(@params,$excluding_id);
}
if ($reseller_id) {
if ('ARRAY' eq ref $reseller_id) {
$stmt .= ' AND contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')';
push(@params,@$reseller_id);
} else {
$stmt .= ' AND contact.reseller_id = ?';
push(@params,$reseller_id);
}
}
$stmt .= ' ORDER BY r1.id ASC LIMIT 1';
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_status_resellerid {
my ($status,$reseller_id) = @_;
@ -119,8 +220,13 @@ sub countby_status_resellerid {
push(@params,$status);
}
if ($reseller_id) {
push(@terms,'contact.reseller_id = ?');
push(@params,$reseller_id);
if ('ARRAY' eq ref $reseller_id) {
push(@terms,'contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')');
push(@params,@$reseller_id);
} else {
push(@terms,'contact.reseller_id = ?');
push(@params,$reseller_id);
}
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
@ -191,6 +297,50 @@ sub insert_row {
}
sub process_records {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$blocksize,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
blocksize
numofthreads
load_recursive
/};
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
blocksize => $blocksize,
tableprocessing_threads => $numofthreads,
'select' => 'SELECT * FROM ' . $table . ' WHERE ' . $db->columnidentifier('status') . ' != "' . $TERMINATED_STATE . '"',
'selectcount' => 'SELECT COUNT(*) FROM ' . $table . ' WHERE ' . $db->columnidentifier('status') . ' != "' . $TERMINATED_STATE . '"',
);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -32,6 +32,7 @@ our @EXPORT_OK = qw(
findby_subscriberid_username
findby_domainid_username
countby_subscriberidisprimary
findby_subscriberidisprimary
);
my $tablename = 'voip_dbaliases';
@ -103,6 +104,24 @@ sub findby_domainid_username {
}
sub findby_subscriberidisprimary {
my ($subscriber_id,$is_primary,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?' .
' AND ' . $db->columnidentifier('is_primary') . ' = ?';
my @params = ($subscriber_id,$is_primary);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_subscriberidisprimary {
my ($subscriber_id,$is_primary) = @_;

@ -57,7 +57,29 @@ sub setup_provider {
type
/};
my $provider = {};
if (not _load_provider($provider,$reseller_name,$domain_name) and not $dry) {
$provider->{reseller} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Resellers',
name => $reseller_name,
);
my $new_reseller = 0;
if (defined $provider->{reseller}) {
_info("reseller '$reseller_name' found");
$provider->{contract} = NGCP::BulkProcessor::RestRequests::Trunk::Contracts::get_item($provider->{reseller}->{contract_id});
if (defined $provider->{contract}) {
_info("contract ID $provider->{reseller}->{contract_id} found");
} else {
_info("contract ID $provider->{reseller}->{contract_id} not found");
return undef;
}
$provider->{contact} = NGCP::BulkProcessor::RestRequests::Trunk::SystemContacts::get_item($provider->{contract}->{contact_id});
if (defined $provider->{contact}) {
_info("contact ID $provider->{contract}->{contact_id} found");
} else {
_info("contact ID $provider->{contract}->{contact_id} not found");
return undef;
}
} elsif (not $dry) {
$provider->{contact} = _create_systemcontact();
_info("contact ID $provider->{contact}->{id} created");
$provider->{contract} = _create_contract(
@ -71,6 +93,40 @@ sub setup_provider {
name => $reseller_name, #"test <t> <n>",
);
_info("reseller '$reseller_name' created");
$new_reseller = 1;
} else {
_info("reseller '$reseller_name' not found");
return undef;
}
$provider->{domain} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Domains',
domain => $domain_name,
);
if (defined $provider->{domain}) {
_info("domain '$domain_name' found");
} elsif (not $dry) {
$provider->{domain} = _create_domain(
reseller_id => $provider->{reseller}->{id},
#domain => $domain_name.'.<t>',
domain => $domain_name,
);
_info("domain '$domain_name' created");
} else {
_info("domain '$domain_name' not found");
return undef;
}
my $provider_profile = NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::get_item($provider->{contract}->{billing_profile_id});
if (not $new_reseller and defined $provider_profile) {
_info("provider billing profile ID $provider_profile->{id} found");
my $profile_fee = {};
($profile_fee->{profile},
$profile_fee->{zone},
$profile_fee->{fee},
$profile_fee->{fees}) = _load_fees($provider_profile);
$provider->{profile} = $profile_fee->{profile};
$provider->{provider_fee} = $profile_fee;
} elsif (not $dry) {
if (defined $provider_rate) {
my $profile_fee = {};
($profile_fee->{profile},
@ -87,91 +143,41 @@ sub setup_provider {
);
_info("contract ID $provider->{contract}->{id} updated");
}
$provider->{domain} = _create_domain(
reseller_id => $provider->{reseller}->{id},
#domain => $domain_name.'.<t>',
domain => $domain_name,
);
_info("domain '$domain_name' created");
$provider->{subscriber_fees} = [];
foreach my $rate (@$subscriber_rates) {
my $profile_fee = {};
($profile_fee->{profile},
$profile_fee->{zone},
$profile_fee->{fee},
$profile_fee->{fees}) = _setup_fees($provider->{reseller},
%$rate
);
push(@{$provider->{subscriber_fees}},$profile_fee);
}
}
return $provider;
}
sub _load_provider {
my ($provider,$reseller_name,$domain_name) = @_;
$provider->{reseller} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Resellers',
name => $reseller_name,
);
if (defined $provider->{reseller}) {
_info("reseller '$reseller_name' found");
} else {
return 0;
}
$provider->{contract} = NGCP::BulkProcessor::RestRequests::Trunk::Contracts::get_item($provider->{reseller}->{contract_id});
if (defined $provider->{contract}) {
_info("contract ID $provider->{reseller}->{contract_id} found");
} else {
return 0;
_info("provider billing profile ID $provider->{contract}->{billing_profile_id} not found");
return undef;
}
$provider->{contact} = NGCP::BulkProcessor::RestRequests::Trunk::SystemContacts::get_item($provider->{contract}->{contact_id});
if (defined $provider->{contact}) {
_info("contact ID $provider->{contract}->{contact_id} found");
} else {
return 0;
}
$provider->{domain} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Domains',
domain => $domain_name,
);
if (defined $provider->{domain}) {
_info("domain '$domain_name' found");
} else {
return 0;
$provider->{subscriber_fees} = [];
foreach my $subscriber_profile (@{NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::findby_resellerid($provider->{reseller}->{id})}) {
next if (defined $provider_profile and $provider_profile->{id} == $subscriber_profile->{id});
_info("subscriber billing profile ID $subscriber_profile->{id} found");
my $profile_fee = {};
($profile_fee->{profile},
$profile_fee->{zone},
$profile_fee->{fee},
$profile_fee->{fees}) = _load_fees($subscriber_profile);
push(@{$provider->{subscriber_fees}},$profile_fee);
}
my $provider_profile = NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::get_item($provider->{contract}->{billing_profile_id});
if (defined $provider_profile) {
_info("provider billing profile ID $provider_profile->{id} found");
}
if (defined $provider_profile) {
my $profile_fee = {};
if ((scalar @{$provider->{subscriber_fees}}) == 0 and defined $subscriber_rates and (scalar @$subscriber_rates) > 0) {
if (not $dry) {
foreach my $rate (@$subscriber_rates) {
my $profile_fee = {};
($profile_fee->{profile},
$profile_fee->{zone},
$profile_fee->{fee},
$profile_fee->{fees}) = _load_fees($provider_profile);
$provider->{profile} = $profile_fee->{profile};
$provider->{provider_fee} = $profile_fee;
$profile_fee->{fees}) = _setup_fees($provider->{reseller},
%$rate
);
push(@{$provider->{subscriber_fees}},$profile_fee);
}
} else {
_info("no subscriber billing profile(s) found");
return undef;
}
}
$provider->{subscriber_fees} = [];
foreach my $subscriber_profile (@{NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::findby_resellerid($provider->{reseller}->{id})}) {
next if (defined $provider_profile and $provider_profile->{id} == $subscriber_profile->{id});
_info("subscriber billing profile ID $subscriber_profile->{id} found");
my $profile_fee = {};
($profile_fee->{profile},
$profile_fee->{zone},
$profile_fee->{fee},
$profile_fee->{fees}) = _load_fees($subscriber_profile);
push(@{$provider->{subscriber_fees}},$profile_fee);
}
return 1;
return $provider;
}
sub _load_fees {

@ -0,0 +1,528 @@
package NGCP::BulkProcessor::Projects::Massive::Generator::CDR;
use strict;
## no critic
use threads::shared qw();
use Time::HiRes qw(sleep);
use String::MkPasswd qw();
#use List::Util qw();
use Data::Rmap qw();
use Tie::IxHash;
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
);
use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw(
$dry
$skip_errors
$deadlock_retries
$generate_cdr_multithreading
$generate_cdr_numofthreads
$generate_cdr_count
@providers
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
fileerror
);
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
ping_dbs
destroy_dbs
);
use NGCP::BulkProcessor::Utils qw(threadid timestamp); # stringtobool check_ipnet trim);
#use NGCP::BulkProcessor::DSSorter qw(sort_by_configs);
#use NGCP::BulkProcessor::RandomString qw(createtmpstring);
use NGCP::BulkProcessor::Array qw(array_to_map);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
generate_cdrs
);
my $thread_sleep_secs = 0.1;
my $RUNNING = 1;
my $COMPLETED = 2;
my $ERROR = 4;
my $STOP = 8;
my $total_count :shared = 0;
my $t = time;
my %offnet_domain_subscriber_map : shared = ();
sub generate_cdrs {
my $context = {};
my $result = _generate_cdrs_create_context($context);
destroy_dbs();
if ($result) {
if ($enablemultithreading and $generate_cdr_multithreading and $generate_cdr_count > 1) {
$context->{cdr_count} = int($generate_cdr_count / $generate_cdr_numofthreads);
#$context->{sn_increment} = $generate_cdr_numofthreads;
my %processors = ();
for (my $i = 0; $i < $generate_cdr_numofthreads; $i++) {
$context->{cdr_count} += ($generate_cdr_count - $context->{cdr_count} * $generate_cdr_numofthreads) if $i == 0;
_info($context,'starting generator thread ' . ($i + 1) . ' of ' . $generate_cdr_numofthreads);
$context->{sn_offset} = $i;
my $processor = threads->create(\&_generate_cdr,$context);
if (!defined $processor) {
_info($context,'generator thread ' . ($i + 1) . ' of ' . $generate_cdr_numofthreads . ' NOT started');
}
$processors{$processor->tid()} = $processor;
}
local $SIG{'INT'} = sub {
_info($context,"interrupt signal received");
$result = 0;
lock $context->{errorstates};
$context->{errorstates}->{$context->{tid}} = $STOP;
};
while ((scalar keys %processors) > 0) {
foreach my $processor (values %processors) {
if (defined $processor and $processor->is_joinable()) {
$processor->join();
delete $processors{$processor->tid()};
_info($context,'generator thread tid ' . $processor->tid() . ' joined');
}
}
sleep($thread_sleep_secs);
}
$result &= (_get_threads_state($context->{errorstates},$context->{tid}) & $COMPLETED) == $COMPLETED;
} else {
$context->{cdr_count} = $generate_cdr_count;
#$context->{sn_increment} = 1;
#$context->{sn_offset} = 0;
local $SIG{'INT'} = sub {
_info($context,"interrupt signal received");
$context->{errorstates}->{$context->{tid}} = $STOP;
};
$result = _generate_cdr($context);
}
}
return $result;
}
sub _generate_cdr {
my $context = shift;
my $tid = threadid();
{
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $RUNNING;
}
$context->{tid} = $tid;
$context->{db} = &get_xa_db();
my $cdr_count = 0;
my $broadcast_state;
while (($broadcast_state = _get_threads_state($context->{errorstates})) == 0
or
(($broadcast_state & $ERROR) == 0
and ($broadcast_state & $STOP) == 0)) {
last if $cdr_count >= $context->{cdr_count};
$cdr_count += 1;
eval {
next unless _generate_cdr_init_context($context);
};
if ($@ and not $skip_errors) {
undef $context->{db};
destroy_dbs();
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $ERROR;
return 0;
}
my $retry = 1;
while ($retry > 0) {
eval {
$context->{db}->db_begin();
_create_cdr($context);
{
#lock $db_lock; #concurrent writes to voip_numbers causes deadlocks
lock $total_count;
$total_count += 1;
_info($context,"$total_count CDRs created",($total_count % 10) > 0);
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
if ($err =~ /deadlock/gi and $retry < $deadlock_retries) {
my $sleep = 0.01 * 2**$retry;
_info($context,"retrying in $sleep secs");
sleep($sleep);
$retry += 1;
} elsif (not $skip_errors) {
undef $context->{db};
destroy_dbs();
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $ERROR;
return 0;
}
} else {
$retry = 0;
}
}
}
undef $context->{db};
destroy_dbs();
if (($broadcast_state & $ERROR) == $ERROR) {
_info($context,"shutting down (error broadcast)");
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $STOP;
return 0;
} elsif (($broadcast_state & $STOP) == $STOP) {
_info($context,"shutting down (stop broadcast)");
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $STOP;
return 0;
} else {
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $COMPLETED;
return 1;
}
}
sub _generate_cdrs_create_context {
my ($context) = @_;
my $result = 1;
my %errorstates :shared = ();
my $tid = threadid();
$context->{tid} = $tid;
$context->{now} = timestamp();
$context->{errorstates} = \%errorstates;
{
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $RUNNING;
}
$context->{error_count} = 0;
$context->{warning_count} = 0;
my $result = 1;
my @reseller_ids = map { $_->{reseller}->{id}; } @providers;
$context->{reseller_ids} = \@reseller_ids;
my $domain_count = 0;
eval {
($context->{domain_map},my $ids,my $domains) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::domains::findall(),
sub { return shift->{id}; }, sub { return shift; }, 'first' );
$domain_count = (scalar keys %{$context->{domain_map}});
};
if ($@ or $domain_count == 0) {
_error($context,"cannot find any domains");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$domain_count domains cached");
}
my $reseller_count = 0;
eval {
($context->{reseller_map},my $ids,my $resellers) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::resellers::findall(),
sub { return shift->{id}; }, sub { return shift; }, 'first' );
$reseller_count = (scalar keys %{$context->{reseller_map}});
};
if ($@ or $reseller_count == 0) {
_error($context,"cannot find any resellers");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$reseller_count resellers cached");
}
my $active_count = 0;
eval {
$active_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(
$NGCP::BulkProcessor::Dao::Trunk::billing::contracts::ACTIVE_STATE,
((scalar @{$context->{reseller_ids}}) > 0 ? $context->{reseller_ids} : undef)
);
($context->{min_id},$context->{max_id}) = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::find_minmaxid(undef,
{ 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE },
((scalar @{$context->{reseller_ids}}) > 0 ? $context->{reseller_ids} : undef)
);
};
if ($@ or $active_count == 0) {
_error($context,"cannot find active subscribers");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$active_count active subscribers found");
}
return $result;
}
sub _create_cdr {
my ($context) = @_;
NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::insert_row($context->{db},$context->{cdr});
return 1;
}
sub _generate_cdr_init_context {
my ($context) = @_;
my $result = 1;
#my $provider = $providers[rand @providers];
my $offnet_in;
my $offnet_out;
my $source_subscriber;
$source_subscriber = _get_random_subscriber($context) unless $offnet_in;
my $dest_subscriber;
$dest_subscriber = _get_random_subscriber($context,(defined $source_subscriber ? $source_subscriber->{id} : undef)) unless $offnet_out;
my $source_peering_subscriber_info;
my $source_reseller;
if ($source_subscriber) {
$source_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($source_subscriber->{contract_id});
$source_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($source_subscriber->{contract}->{contact_id});
$source_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$source_subscriber->{uuid});
$source_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($source_subscriber->{contract}->{prov_subscriber}->{id},1);
$source_subscriber->{domain} = $context->{domain_map}->{$source_subscriber->{domain_id}}->{domain};
$source_reseller = $context->{reseller_map}->{$source_subscriber->{contract}->{contact}->{reseller_id}};
} else {
$source_peering_subscriber_info = _prepare_offnet_subscriber_info();
}
my $dest_peering_subscriber_info;
my $dest_reseller;
if ($dest_subscriber) {
$dest_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($dest_subscriber->{contract_id});
$dest_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($dest_subscriber->{contract}->{contact_id});
$dest_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$dest_subscriber->{uuid});
$dest_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($dest_subscriber->{contract}->{prov_subscriber}->{id},1);
$dest_subscriber->{domain} = $context->{domain_map}->{$dest_subscriber->{domain_id}}->{domain};
$dest_reseller = $context->{reseller_map}->{$dest_subscriber->{contract}->{contact}->{reseller_id}};
} else {
$dest_peering_subscriber_info = _prepare_offnet_subscriber_info();
}
my $source_ip = '192.168.0.1';
my $time = time();
my $duration = 120;
$context->{cdr} = {
#id => ,
#update_time => ,
source_user_id => ($source_subscriber ? $source_subscriber->{uuid} : '0'),
source_provider_id => ($source_reseller ? $source_reseller->{contract_id} : '0'),
#source_external_subscriber_id => ,
#source_external_contract_id => ,
source_account_id => ($source_subscriber ? $source_subscriber->{contract_id} : '0'),
source_user => ($source_subscriber ? $source_subscriber->{username} : $source_peering_subscriber_info->{username}),
source_domain => ($source_subscriber ? $source_subscriber->{domain} : $source_peering_subscriber_info->{domain}),
source_cli => ($source_subscriber ? ($source_subscriber->{primary_alias}->{username} // $source_subscriber->{username}) : $source_peering_subscriber_info->{username}),
#source_clir => '0',
source_ip => $source_ip,
#source_gpp0 => ,
#source_gpp1 => ,
#source_gpp2 => ,
#source_gpp3 => ,
#source_gpp4 => ,
#source_gpp5 => ,
#source_gpp6 => ,
#source_gpp7 => ,
#source_gpp8 => ,
#source_gpp9 => ,
destination_user_id => ($dest_subscriber ? $dest_subscriber->{uuid} : '0'),
destination_provider_id => ($dest_reseller ? $dest_reseller->{contract_id} : '0'),
#destination_external_subscriber_id => ,
#destination_external_contract_id => ,
destination_account_id => ($dest_subscriber ? $dest_subscriber->{contract_id} : '0'),
destination_user => ($dest_subscriber ? $dest_subscriber->{username} : $dest_peering_subscriber_info->{username}),
destination_domain => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}),
destination_user_dialed => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}),
destination_user_in => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}),
destination_domain_in => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}),
#destination_gpp0 => ,
#destination_gpp1 => ,
#destination_gpp2 => ,
#destination_gpp3 => ,
#destination_gpp4 => ,
#destination_gpp5 => ,
#destination_gpp6 => ,
#destination_gpp7 => ,
#destination_gpp8 => ,
#destination_gpp9 => ,
#peer_auth_user => ,
#peer_auth_realm => ,
call_type => 'call',
call_status => 'ok',
call_code => '200',
init_time => $time,
start_time => $time,
duration => $duration,
call_id => _generate_call_id(),
#source_carrier_cost => ,
#source_reseller_cost => ,
#source_customer_cost => ,
#source_carrier_free_time => ,
#source_reseller_free_time => ,
#source_customer_free_time => ,
#source_carrier_billing_fee_id => ,
#source_reseller_billing_fee_id => ,
#source_customer_billing_fee_id => ,
#source_carrier_billing_zone_id => ,
#source_reseller_billing_zone_id => ,
#source_customer_billing_zone_id => ,
#destination_carrier_cost => ,
#destination_reseller_cost => ,
#destination_customer_cost => ,
#destination_carrier_free_time => ,
#destination_reseller_free_time => ,
#destination_customer_free_time => ,
#destination_carrier_billing_fee_id => ,
#destination_reseller_billing_fee_id => ,
#destination_customer_billing_fee_id => ,
#destination_carrier_billing_zone_id => ,
#destination_reseller_billing_zone_id => ,
#destination_customer_billing_zone_id => ,
#frag_carrier_onpeak => ,
#frag_reseller_onpeak => ,
#frag_customer_onpeak => ,
#is_fragmented => ,
#split => ,
#rated_at => ,
#rating_status => 'unrated',
#exported_at => ,
#export_status => ,
};
return $result;
}
sub _prepare_offnet_subscriber_info {
my ($username_primary_number,$domain) = @_;
lock %offnet_domain_subscriber_map;
my $n = 1 + scalar keys %offnet_domain_subscriber_map;
Data::Rmap::rmap { $_ =~ s/<n>/$n/; $_ =~ s/<i>/$n/; $_ =~ s/<t>/$t/; } ($domain);
$n = 1 + (exists $offnet_domain_subscriber_map{$domain} ? scalar keys %{$offnet_domain_subscriber_map{$domain}} : 0);
Data::Rmap::rmap { $_ =~ s/<n>/$n/; $_ =~ s/<i>/$n/; $_ =~ s/<t>/$t/; } ($username_primary_number);
my $username;
if ('HASH' eq ref $username_primary_number) {
$username = ($username_primary_number->{cc} // '') . ($username_primary_number->{ac} // '') . ($username_primary_number->{sn} // '');
} else {
$username = $username_primary_number;
}
$offnet_domain_subscriber_map{$domain} = {} if not exists $offnet_domain_subscriber_map{$domain};
$offnet_domain_subscriber_map{$domain}->{$username} = 1;
return { username => $username, domain => $domain };
}
sub _get_random_subscriber {
my ($context,$excluding_id) = @_;
return NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::find_random(
$context->{db},
$excluding_id,
{ 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE },
((scalar @{$context->{reseller_ids}}) > 0 ? $context->{reseller_ids} : undef),
$context->{min_id},$context->{max_id},
);
}
sub _generate_call_id {
return '*TEST*'._random_string(26,'a'..'z','A'..'Z',0..9,'-','.');
}
sub _random_string {
my ($length,@chars) = @_;
return join('',@chars[ map{ rand @chars } 1 .. $length ]);
}
sub _get_threads_state {
my ($errorstates,$tid) = @_;
my $result = 0;
if (defined $errorstates and ref $errorstates eq 'HASH') {
lock $errorstates;
foreach my $threadid (keys %$errorstates) {
if (not defined $tid or $threadid != $tid) {
$result |= $errorstates->{$threadid};
}
}
}
return $result;
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -12,7 +12,6 @@ use Tie::IxHash;
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
$cpucount
);
use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw(
@ -28,6 +27,7 @@ use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw(
$sipusername_length
$sippassword_length
@providers
);
@ -105,7 +105,7 @@ my $ERROR = 4;
my $STOP = 8;
my $total_count :shared = 0;
my $db_lock :shared = undef;
#my $db_lock :shared = undef;
sub provision_subscribers {
@ -114,7 +114,7 @@ sub provision_subscribers {
destroy_dbs();
if ($result) {
if ($enablemultithreading and $provision_subscriber_multithreading and $cpucount > 1) {
if ($enablemultithreading and $provision_subscriber_multithreading and $provision_subscriber_count > 1) {
$context->{subscriber_count} = int($provision_subscriber_count / $provision_subscriber_numofthreads);
$context->{sn_increment} = $provision_subscriber_numofthreads;
my %processors = ();
@ -185,60 +185,67 @@ sub _provision_subscriber {
last if $subscriber_count >= $context->{subscriber_count};
$subscriber_count += 1;
next unless _provision_susbcriber_init_context($context);
next unless _provision_subscriber_init_context($context);
my $retry = 1;
while ($retry > 0) {
eval {
$context->{db}->db_begin();
#_info($context,"test" . $subscriber_count);
#die() if (($tid == 1 or $tid == 0) and $subscriber_count == 500);
_create_contact($context);
_create_contract($context);
{
#lock $db_lock; #concurrent writes to voip_numbers causes deadlocks
lock $total_count;
_create_subscriber($context);
_create_aliases($context);
$total_count += 1;
_info($context,"$total_count subscribers created",($total_count % 10) > 0);
}
# _update_preferences($context);
# _set_registrations($context);
# _set_callforwards($context);
# #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it
# #in this main provisioning loop, or align it in separate run-modes, according to the files given.
#
# } else {
# _warn($context,(scalar @$existing_billing_voip_subscribers) . ' existing billing subscribers found, skipping');
# }
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
$context->{db}->db_begin();
#_info($context,"test" . $subscriber_count);
#die() if (($tid == 1 or $tid == 0) and $subscriber_count == 500);
if (NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::countby_ccacsn($context->{db},
$context->{numbers}->{primary}->{cc},
$context->{numbers}->{primary}->{ac},
$context->{numbers}->{primary}->{sn},
) == 0) {
_create_contact($context);
_create_contract($context);
{
#lock $db_lock; #concurrent writes to voip_numbers causes deadlocks
lock $total_count;
_create_subscriber($context);
_create_aliases($context);
$total_count += 1;
_info($context,"$total_count subscribers created",($total_count % 10) > 0);
}
# _update_preferences($context);
# _set_registrations($context);
# _set_callforwards($context);
# #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it
# #in this main provisioning loop, or align it in separate run-modes, according to the files given.
#
} else {
_info($context,'subscriber with primary number $context->{numbers}->{primary}->{number} already exists, skipping',1);
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
if ($err =~ /deadlock/gi and $retry < $deadlock_retries) {
my $sleep = 0.01 * 2**$retry;
_info($context,"retrying in $sleep secs");
sleep($sleep);
$retry += 1;
} elsif (not $skip_errors) {
undef $context->{db};
destroy_dbs();
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $ERROR;
return 0;
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
if ($err =~ /deadlock/gi and $retry < $deadlock_retries) {
my $sleep = 0.01 * 2**$retry;
_info($context,"retrying in $sleep secs");
sleep($sleep);
$retry += 1;
} elsif (not $skip_errors) {
undef $context->{db};
destroy_dbs();
lock $context->{errorstates};
$context->{errorstates}->{$tid} = $ERROR;
return 0;
}
} else {
$retry = 0;
}
} else {
$retry = 0;
}
}
}
undef $context->{db};
@ -280,29 +287,50 @@ sub _provision_subscribers_create_context {
my $result = 1;
if ((scalar @providers) == 0) {
_error($context,"load/create providers first");
$result = 0; #even in skip-error mode..
}
#$context->{providers}
foreach my $provider (@providers) {
unless ($provider->{provider_fee}) {
_error($context,"no provider fee for reseller '$provider->{reseller}->{name}' found");
$result = 0; #even in skip-error mode..
}
if ((scalar @{$provider->{subscriber_fees}}) == 0) {
_error($context,"no subscriber fees for reseller '$provider->{reseller}->{name}' found");
$result = 0; #even in skip-error mode..
}
eval {
$provider->{domain}->{prov_domain} =
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_domain($provider->{domain}->{domain});
};
if ($@ or not $provider->{domain}->{prov_domain}) {
rowprocessingerror(threadid(),"cannot find provisioning domain '$provider->{domain}->{domain}'",getlogger(__PACKAGE__));
_error($context,"cannot find provisioning domain '$provider->{domain}->{domain}'");
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"provisioning domain '$provider->{domain}->{domain}' found",getlogger(__PACKAGE__));
_info($context,"provisioning domain '$provider->{domain}->{domain}' found");
}
$provider->{numbers_per_subscriber} //= 1;
$provider->{numbers_per_subscriber} = 1 if $provider->{numbers_per_subscriber} <= 0;
my ($sn_min,$sn_max) = split(/[: -]+/,$provider->{sn},2);
my $sn_length = length($sn_min);
$sn_length = length($sn_max) if length($sn_max) > $sn_length;
if ($sn_length > 0 and $sn_max > $sn_min and $sn_min >= 0) {
my @sn_block = map { zerofill($_,$sn_length); } ($sn_min..$sn_max);
$provider->{sn_block} = \@sn_block;
if (($provision_subscriber_count * $provider->{numbers_per_subscriber}) > scalar @sn_block) {
_error($context,"sn range $provider->{sn} less than numbers needed ($provider->{numbers_per_subscriber} * $provision_subscriber_count)");
$result = 0; #even in skip-error mode..
} else {
$provider->{sn_block} = \@sn_block;
}
#$provider->{sn_block_size} = scalar @sn_block;
} else {
rowprocessingerror(threadid(),"invalid sn block definition for provider '$provider->{sn}'",getlogger(__PACKAGE__));
_error($context,"invalid sn block definition for provider '$provider->{sn}'");
$result = 0; #even in skip-error mode..
}
}
@ -867,7 +895,7 @@ sub _create_aliases {
#
#}
sub _provision_susbcriber_init_context {
sub _provision_subscriber_init_context {
my ($context) = @_;
@ -923,7 +951,7 @@ sub _provision_susbcriber_init_context {
$context->{ncos_level} = undef;
my @numbers = ();
foreach (1..($provider->{numbers_per_subscriber} // 1)) {
foreach (1..$provider->{numbers_per_subscriber}) {
my $number = {};
my @cc = @{$provider->{cc}};
$number->{cc} = $cc[rand @cc];
@ -957,6 +985,11 @@ sub _provision_susbcriber_init_context {
$context->{numbers}->{primary} = shift(@{$context->{numbers}->{other}});
#return 0 unless scalar @{$context->{numbers}->{other}};
#if ($number_for_sipusername) {
# $context->{prov_subscriber}->{username} = $context->{numbers}->{primary}->{number};
# $context->{bill_subscriber}->{username} = $context->{numbers}->{primary}->{number};
#}
$context->{voip_numbers} = {};
$context->{voip_numbers}->{primary} = undef;
$context->{voip_numbers}->{other} = [];

@ -60,6 +60,10 @@ our @EXPORT_OK = qw(
@providers
$providers_yml
$generate_cdr_multithreading
$generate_cdr_numofthreads
$generate_cdr_count
);
our $defaultconfig = 'config.cfg';
@ -86,6 +90,10 @@ our @provider_config = ();
our @providers = ();
our $providers_yml = undef;
our $generate_cdr_multithreading = $enablemultithreading;
our $generate_cdr_numofthreads = $cpucount;
our $generate_cdr_count = 0;
sub update_settings {
my ($data,$configfile) = @_;
@ -121,6 +129,10 @@ sub update_settings {
$providers_yml = $data->{providers_yml} if exists $data->{providers_yml};
$generate_cdr_multithreading = $data->{generate_cdr_multithreading} if exists $data->{generate_cdr_multithreading};
$generate_cdr_numofthreads = _get_numofthreads($cpucount,$data,'generate_cdr_numofthreads');
$generate_cdr_count = $data->{generate_cdr_count} if exists $data->{generate_cdr_count};
return $result;
}

@ -62,7 +62,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(destroy_dbs);
#use NGCP::BulkProcessor::Projects::Massive::Generator::Dao::Blah qw();
#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
@ -70,6 +70,9 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
use NGCP::BulkProcessor::Projects::Massive::Generator::Provisioning qw(
provision_subscribers
);
use NGCP::BulkProcessor::Projects::Massive::Generator::CDR qw(
generate_cdrs
);
use NGCP::BulkProcessor::Projects::Massive::Generator::Api qw(
setup_provider
);
@ -91,6 +94,9 @@ push(@TASK_OPTS,$setup_provider_task_opt);
my $provision_subscriber_task_opt = 'provision_subscriber';
push(@TASK_OPTS,$provision_subscriber_task_opt);
my $generate_cdr_task_opt = 'generate_cdr';
push(@TASK_OPTS,$generate_cdr_task_opt);
if (init()) {
main();
exit(0);
@ -155,6 +161,13 @@ sub main() {
$completion |= 1;
}
} elsif (lc($generate_cdr_task_opt) eq lc($task)) {
if (taskinfo($generate_cdr_task_opt,$result,1)) {
next unless check_dry();
$result &= generate_cdr_task(\@messages);
$completion |= 1;
}
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
@ -273,6 +286,30 @@ sub provision_subscriber_task {
}
sub generate_cdr_task {
my ($messages) = @_;
my ($result) = (0);
eval {
($result) = generate_cdrs();
};
my $err = $@;
my $stats = ":";
eval {
$stats .= "\n total CDRs: " .
NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows';
};
if ($err or !$result) {
push(@$messages,"generate cdrs INCOMPLETE$stats");
} else {
push(@$messages,"generate cdrs completed$stats");
}
destroy_dbs();
return $result;
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..

@ -0,0 +1,128 @@
-
domain: domain1.com
reseller: reseller1
cc:
- 111
ac:
- 456
sn: '000001-999999'
numbers_per_subscriber: 1
provider_rate:
prepaid: 0
fees:
-
source: .
destination: .
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 1
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 1
onpeak_init_rate: 0
-
source: .
destination: ^[^1].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 1
offpeak_init_rate: 1
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 1
onpeak_init_rate: 1
subscriber_rates:
-
prepaid: 0
fees:
-
source: .
destination: .
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 60
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 60
onpeak_init_rate: 0
-
source: .
destination: ^[^1].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 60
offpeak_init_rate: 1
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 60
onpeak_init_rate: 1
-
domain: domain2.com
reseller: reseller2
cc:
- 222
ac:
- 456
sn: '000001-999999'
numbers_per_subscriber: 1
provider_rate:
prepaid: 0
fees:
-
source: .
destination: .
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 1
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 1
onpeak_init_rate: 0
-
source: .
destination: ^[^2].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 1
offpeak_init_rate: 1
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 1
onpeak_init_rate: 1
subscriber_rates:
-
prepaid: 0
fees:
-
source: .
destination: .
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 60
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 60
onpeak_init_rate: 0
-
source: .
destination: ^[^2].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 60
offpeak_init_rate: 1
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 60
onpeak_init_rate: 1

@ -1,110 +1,128 @@
-
domain: narf1.com
reseller: narf1
numbers_per_subscriber: 3
domain: domain1.com
reseller: reseller1
cc:
- 888
- 999
- 111
ac:
- 123
- 456
sn: '000001-100000'
-
domain: narf2.com
reseller: narf2
cc:
- 888
- 999
ac:
- 123
- 456
sn: '000001-100000'
-
domain: narf3.com
reseller: narf3
cc:
- 888
- 999
ac:
- 123
- 456
sn: '000001-100000'
sn: '000001-999999'
numbers_per_subscriber: 1
provider_rate:
prepaid: 0
fees:
-
destination: ^888.+
source: .
destination: .
direction: out
offpeak_follow_interval: 5
offpeak_follow_rate: 1
offpeak_init_interval: 5
offpeak_init_rate: 2
onpeak_follow_interval: 5
onpeak_follow_rate: 1
onpeak_init_interval: 5
onpeak_init_rate: 2
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 1
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 1
onpeak_init_rate: 0
-
destination: .
direction: in
offpeak_follow_interval: 5
source: .
destination: ^[^1].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 5
offpeak_init_interval: 1
offpeak_init_rate: 1
onpeak_follow_interval: 5
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 5
onpeak_init_rate: 1
source: ^888.+
onpeak_init_interval: 1
onpeak_init_rate: 1
subscriber_rates:
-
prepaid: 0
fees:
-
destination: ^8882.+
source: .
destination: .
direction: out
offpeak_follow_interval: 5
offpeak_follow_rate: 1
offpeak_init_interval: 5
offpeak_init_rate: 6
onpeak_follow_interval: 5
onpeak_follow_rate: 1
onpeak_init_interval: 5
onpeak_init_rate: 6
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 60
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 60
onpeak_init_rate: 0
-
destination: .
direction: in
offpeak_follow_interval: 5
source: .
destination: ^[^1].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 5
offpeak_init_rate: 5
onpeak_follow_interval: 5
offpeak_init_interval: 60
offpeak_init_rate: 1
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 5
onpeak_init_rate: 5
source: ^8881.+
onpeak_init_interval: 60
onpeak_init_rate: 1
-
domain: domain2.com
reseller: reseller2
cc:
- 222
ac:
- 456
sn: '000001-999999'
numbers_per_subscriber: 1
provider_rate:
prepaid: 0
fees:
-
source: .
destination: .
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 1
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 1
onpeak_init_rate: 0
-
source: .
destination: ^[^2].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 1
offpeak_init_rate: 1
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 1
onpeak_init_rate: 1
subscriber_rates:
-
prepaid: 1
prepaid: 0
fees:
-
destination: ^8882.+
source: .
destination: .
direction: out
offpeak_follow_interval: 5
offpeak_follow_rate: 1
offpeak_init_interval: 5
offpeak_init_rate: 4
onpeak_follow_interval: 5
onpeak_follow_rate: 1
onpeak_init_interval: 5
onpeak_init_rate: 4
offpeak_follow_interval: 1
offpeak_follow_rate: 0
offpeak_init_interval: 60
offpeak_init_rate: 0
onpeak_follow_interval: 1
onpeak_follow_rate: 0
onpeak_init_interval: 60
onpeak_init_rate: 0
-
destination: .
direction: in
offpeak_follow_interval: 5
source: .
destination: ^[^2].+
direction: out
offpeak_follow_interval: 1
offpeak_follow_rate: 1
offpeak_init_interval: 5
offpeak_init_rate: 3
onpeak_follow_interval: 5
offpeak_init_interval: 60
offpeak_init_rate: 1
onpeak_follow_interval: 1
onpeak_follow_rate: 1
onpeak_init_interval: 5
onpeak_init_rate: 3
source: ^8881.+
onpeak_init_interval: 60
onpeak_init_rate: 1

@ -8,7 +8,11 @@ webpassword_length = 8
webusername_length = 8
sippassword_length = 16
sipusername_length = 8
provision_subscriber_count = 2000
provision_subscriber_count = 20000
providers_yml = providers.yml
generate_cdr_multithreading = 1
#generate_cdr_numofthreads = 2
generate_cdr_count = 50000

@ -0,0 +1,17 @@
#dry=0
#skip_errors=0
provision_subscriber_multithreading = 1
#provision_subscriber_numofthreads = 2
webpassword_length = 8
webusername_length = 8
sippassword_length = 16
sipusername_length = 8
provision_subscriber_count = 100
providers_yml = providers.yml
generate_cdr_multithreading = 1
#generate_cdr_numofthreads = 2
generate_cdr_count = 100
Loading…
Cancel
Save