diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index 1655bb9..899209e 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -14,7 +14,8 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::SqlProcessor qw( checktableinfo - + insert_record + update_record copy_row ); use NGCP::BulkProcessor::SqlRecord qw(); @@ -25,7 +26,11 @@ our @EXPORT_OK = qw( gettablename check_table + update_row + insert_row + delete_callids + countby_ratingstatus ); #process_records @@ -164,6 +169,90 @@ sub delete_callids { } +sub countby_ratingstatus { + + my ($rating_status) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + my @terms = (); + if (defined $rating_status) { + push(@terms,$db->columnidentifier('rating_status') . ' = ?'); + push(@params,$rating_status); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub update_row { + + my ($xa_db,$data) = @_; + + check_table(); + return update_record($get_db,$xa_db,__PACKAGE__,$data); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + #my %params = @_; + #my ($contract_id, + # $domain_id, + # $username, + # $uuid) = @params{qw/ + # contract_id + # domain_id + # username + # uuid + # /}; + # + #if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + # $db->columnidentifier('contact_id') . ', ' . + # $db->columnidentifier('contract_id') . ', ' . + # $db->columnidentifier('domain_id') . ', ' . + # $db->columnidentifier('external_id') . ', ' . + # $db->columnidentifier('primary_number_id') . ', ' . + # $db->columnidentifier('status') . ', ' . + # $db->columnidentifier('username') . ', ' . + # $db->columnidentifier('uuid') . ') VALUES (' . + # 'NULL, ' . + # '?, ' . + # '?, ' . + # 'NULL, ' . + # 'NULL, ' . + # '\'' . $ACTIVE_STATE . '\', ' . + # '?, ' . + # '?)', + # $contract_id, + # $domain_id, + # $username, + # $uuid, + # )) { + # rowinserted($db,$tablename,getlogger(__PACKAGE__)); + # return $xa_db->db_last_insert_id(); + #} + } + return undef; + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm index 69d519e..416292f 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contacts.pm @@ -29,6 +29,7 @@ our @EXPORT_OK = qw( update_row findby_reselleridfields + findby_id ); my $tablename = 'contacts'; @@ -106,6 +107,23 @@ sub findby_reselleridfields { } +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub update_row { my ($xa_db,$data) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm index f3fc8c5..4ce8932 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm @@ -31,6 +31,7 @@ our @EXPORT_OK = qw( countby_status_resellerid findby_contactid + findby_id process_records @@ -99,6 +100,23 @@ sub findby_contactid { } +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub countby_status_resellerid { my ($status,$reseller_id) = @_; @@ -116,8 +134,13 @@ sub countby_status_resellerid { push(@params,$status); } if ($reseller_id) { - push(@terms,'contact.reseller_id = ?'); - push(@params,$reseller_id); + if ('ARRAY' eq ref $reseller_id) { + push(@terms,'contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')'); + push(@params,@$reseller_id); + } else { + push(@terms,'contact.reseller_id = ?'); + push(@params,$reseller_id); + } } if ((scalar @terms) > 0) { $stmt .= ' WHERE ' . join(' AND ',@terms); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm index 6e3d73f..9dd3c0c 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/domains.pm @@ -22,6 +22,7 @@ our @EXPORT_OK = qw( findby_domain findby_id + findall ); my $tablename = 'domains'; @@ -46,6 +47,21 @@ sub new { } +sub findall { + + my ($load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my $rows = $db->db_get_all_arrayref($stmt); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub findby_domain { my ($domain,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/resellers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/resellers.pm index 81e071a..9066461 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/resellers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/resellers.pm @@ -22,6 +22,7 @@ our @EXPORT_OK = qw( findby_name findby_id + findall ); my $tablename = 'resellers'; @@ -48,6 +49,21 @@ sub new { } +sub findall { + + my ($load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my $rows = $db->db_get_all_arrayref($stmt); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub findby_name { my ($name,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm index 8ff906c..015e6eb 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_numbers.pm @@ -34,6 +34,8 @@ our @EXPORT_OK = qw( forupdate_cc_ac_sn_subscriberid release_subscriber_numbers + countby_ccacsn + $ACTIVE_STATE ); @@ -89,6 +91,25 @@ sub findby_subscriberid { } + +sub countby_ccacsn { + + my ($xa_db,$cc,$ac,$sn) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE ' . + $db->columnidentifier('cc') . ' = ?' . + ' AND ' . $db->columnidentifier('ac') . ' = ?' . + ' AND ' . $db->columnidentifier('sn') . ' = ?'; + my @params = ($cc // '',$ac // '',$sn // ''); + return $db->db_get_value($stmt,@params); + +} + sub forupdate_cc_ac_sn_subscriberid { my ($xa_db,$cc,$ac,$sn,$subscriber_id,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm index 0cf25c2..24d156c 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm @@ -16,6 +16,7 @@ use NGCP::BulkProcessor::SqlProcessor qw( checktableinfo insert_record update_record + process_table copy_row ); use NGCP::BulkProcessor::SqlRecord qw(); @@ -33,6 +34,9 @@ our @EXPORT_OK = qw( findby_domainid_username_states countby_status_resellerid + process_records + find_minmaxid + find_random $TERMINATED_STATE $ACTIVE_STATE @@ -101,6 +105,103 @@ sub findby_domainid_username_states { } +sub find_minmaxid { + + my ($xa_db,$states,$reseller_id) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my @ids = (); + foreach my $func ('MIN','MAX') { + my @params = (); + my $stmt = 'SELECT ' . $func . '(r1.id) FROM ' . $table . ' AS r1'; + if ($reseller_id) { + $stmt .= ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contracts::gettablename()) . ' AS contract ON r1.contract_id = contract.id' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id'; + } + $stmt .= ' WHERE 1=1'; + if ($reseller_id) { + if ('ARRAY' eq ref $reseller_id) { + $stmt .= ' AND contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')'; + push(@params,@$reseller_id); + } else { + $stmt .= ' AND contact.reseller_id = ?'; + push(@params,$reseller_id); + } + } + if (defined $states and 'HASH' eq ref $states) { + foreach my $in (keys %$states) { + my @values = (defined $states->{$in} and 'ARRAY' eq ref $states->{$in} ? @{$states->{$in}} : ($states->{$in})); + $stmt .= ' AND r1.status ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $states and length($states) > 0) { + $stmt .= ' AND r1.status = ?'; + push(@params,$states); + } + push(@ids,$db->db_get_value($stmt,@params)); + } + return @ids; + +} + +sub find_random { + + my ($xa_db,$excluding_id,$states,$reseller_id,$min_id,$max_id,$load_recursive) = @_; + + if (not defined $min_id or not defined $max_id) { + ($min_id,$max_id) = find_minmaxid($xa_db,$states,$reseller_id); + } + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT r1.* FROM ' . $table . ' AS r1' . + ' JOIN (SELECT ? + RAND() * ? AS id) AS r2'; + my @params = (); + push(@params,$min_id,$max_id - $min_id); + if ($reseller_id) { + $stmt .= ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contracts::gettablename()) . ' AS contract ON r1.contract_id = contract.id' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id'; + } + $stmt .= ' WHERE r1.id >= r2.id'; + + if (defined $states and 'HASH' eq ref $states) { + foreach my $in (keys %$states) { + my @values = (defined $states->{$in} and 'ARRAY' eq ref $states->{$in} ? @{$states->{$in}} : ($states->{$in})); + $stmt .= ' AND r1.status ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $states and length($states) > 0) { + $stmt .= ' AND r1.status = ?'; + push(@params,$states); + } + if (defined $excluding_id) { + $stmt .= ' AND r1.id != ?'; + push(@params,$excluding_id); + } + if ($reseller_id) { + if ('ARRAY' eq ref $reseller_id) { + $stmt .= ' AND contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')'; + push(@params,@$reseller_id); + } else { + $stmt .= ' AND contact.reseller_id = ?'; + push(@params,$reseller_id); + } + } + $stmt .= ' ORDER BY r1.id ASC LIMIT 1'; + + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub countby_status_resellerid { my ($status,$reseller_id) = @_; @@ -119,8 +220,13 @@ sub countby_status_resellerid { push(@params,$status); } if ($reseller_id) { - push(@terms,'contact.reseller_id = ?'); - push(@params,$reseller_id); + if ('ARRAY' eq ref $reseller_id) { + push(@terms,'contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')'); + push(@params,@$reseller_id); + } else { + push(@terms,'contact.reseller_id = ?'); + push(@params,$reseller_id); + } } if ((scalar @terms) > 0) { $stmt .= ' WHERE ' . join(' AND ',@terms); @@ -191,6 +297,50 @@ sub insert_row { } +sub process_records { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $blocksize, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + blocksize + numofthreads + load_recursive + /}; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + blocksize => $blocksize, + tableprocessing_threads => $numofthreads, + 'select' => 'SELECT * FROM ' . $table . ' WHERE ' . $db->columnidentifier('status') . ' != "' . $TERMINATED_STATE . '"', + 'selectcount' => 'SELECT COUNT(*) FROM ' . $table . ' WHERE ' . $db->columnidentifier('status') . ' != "' . $TERMINATED_STATE . '"', + ); +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm index 0c14edf..cc6b7d1 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm @@ -32,6 +32,7 @@ our @EXPORT_OK = qw( findby_subscriberid_username findby_domainid_username countby_subscriberidisprimary + findby_subscriberidisprimary ); my $tablename = 'voip_dbaliases'; @@ -103,6 +104,24 @@ sub findby_domainid_username { } +sub findby_subscriberidisprimary { + + my ($subscriber_id,$is_primary,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('subscriber_id') . ' = ?' . + ' AND ' . $db->columnidentifier('is_primary') . ' = ?'; + my @params = ($subscriber_id,$is_primary); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub countby_subscriberidisprimary { my ($subscriber_id,$is_primary) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Api.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Api.pm index 2570af5..25ebd77 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Api.pm +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Api.pm @@ -57,7 +57,29 @@ sub setup_provider { type /}; my $provider = {}; - if (not _load_provider($provider,$reseller_name,$domain_name) and not $dry) { + + $provider->{reseller} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Resellers', + name => $reseller_name, + ); + my $new_reseller = 0; + if (defined $provider->{reseller}) { + _info("reseller '$reseller_name' found"); + $provider->{contract} = NGCP::BulkProcessor::RestRequests::Trunk::Contracts::get_item($provider->{reseller}->{contract_id}); + if (defined $provider->{contract}) { + _info("contract ID $provider->{reseller}->{contract_id} found"); + } else { + _info("contract ID $provider->{reseller}->{contract_id} not found"); + return undef; + } + + $provider->{contact} = NGCP::BulkProcessor::RestRequests::Trunk::SystemContacts::get_item($provider->{contract}->{contact_id}); + if (defined $provider->{contact}) { + _info("contact ID $provider->{contract}->{contact_id} found"); + } else { + _info("contact ID $provider->{contract}->{contact_id} not found"); + return undef; + } + } elsif (not $dry) { $provider->{contact} = _create_systemcontact(); _info("contact ID $provider->{contact}->{id} created"); $provider->{contract} = _create_contract( @@ -71,6 +93,40 @@ sub setup_provider { name => $reseller_name, #"test ", ); _info("reseller '$reseller_name' created"); + $new_reseller = 1; + } else { + _info("reseller '$reseller_name' not found"); + return undef; + } + + $provider->{domain} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Domains', + domain => $domain_name, + ); + if (defined $provider->{domain}) { + _info("domain '$domain_name' found"); + } elsif (not $dry) { + $provider->{domain} = _create_domain( + reseller_id => $provider->{reseller}->{id}, + #domain => $domain_name.'.', + domain => $domain_name, + ); + _info("domain '$domain_name' created"); + } else { + _info("domain '$domain_name' not found"); + return undef; + } + + my $provider_profile = NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::get_item($provider->{contract}->{billing_profile_id}); + if (not $new_reseller and defined $provider_profile) { + _info("provider billing profile ID $provider_profile->{id} found"); + my $profile_fee = {}; + ($profile_fee->{profile}, + $profile_fee->{zone}, + $profile_fee->{fee}, + $profile_fee->{fees}) = _load_fees($provider_profile); + $provider->{profile} = $profile_fee->{profile}; + $provider->{provider_fee} = $profile_fee; + } elsif (not $dry) { if (defined $provider_rate) { my $profile_fee = {}; ($profile_fee->{profile}, @@ -87,91 +143,41 @@ sub setup_provider { ); _info("contract ID $provider->{contract}->{id} updated"); } - - $provider->{domain} = _create_domain( - reseller_id => $provider->{reseller}->{id}, - #domain => $domain_name.'.', - domain => $domain_name, - ); - _info("domain '$domain_name' created"); - $provider->{subscriber_fees} = []; - foreach my $rate (@$subscriber_rates) { - my $profile_fee = {}; - ($profile_fee->{profile}, - $profile_fee->{zone}, - $profile_fee->{fee}, - $profile_fee->{fees}) = _setup_fees($provider->{reseller}, - %$rate - ); - push(@{$provider->{subscriber_fees}},$profile_fee); - } - } - return $provider; -} - -sub _load_provider { - my ($provider,$reseller_name,$domain_name) = @_; - - $provider->{reseller} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Resellers', - name => $reseller_name, - ); - if (defined $provider->{reseller}) { - _info("reseller '$reseller_name' found"); - } else { - return 0; - } - - $provider->{contract} = NGCP::BulkProcessor::RestRequests::Trunk::Contracts::get_item($provider->{reseller}->{contract_id}); - if (defined $provider->{contract}) { - _info("contract ID $provider->{reseller}->{contract_id} found"); } else { - return 0; + _info("provider billing profile ID $provider->{contract}->{billing_profile_id} not found"); + return undef; } - $provider->{contact} = NGCP::BulkProcessor::RestRequests::Trunk::SystemContacts::get_item($provider->{contract}->{contact_id}); - if (defined $provider->{contact}) { - _info("contact ID $provider->{contract}->{contact_id} found"); - } else { - return 0; - } - - $provider->{domain} = _find_entity('NGCP::BulkProcessor::RestRequests::Trunk::Domains', - domain => $domain_name, - ); - if (defined $provider->{domain}) { - _info("domain '$domain_name' found"); - } else { - return 0; + $provider->{subscriber_fees} = []; + foreach my $subscriber_profile (@{NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::findby_resellerid($provider->{reseller}->{id})}) { + next if (defined $provider_profile and $provider_profile->{id} == $subscriber_profile->{id}); + _info("subscriber billing profile ID $subscriber_profile->{id} found"); + my $profile_fee = {}; + ($profile_fee->{profile}, + $profile_fee->{zone}, + $profile_fee->{fee}, + $profile_fee->{fees}) = _load_fees($subscriber_profile); + push(@{$provider->{subscriber_fees}},$profile_fee); } - - my $provider_profile = NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::get_item($provider->{contract}->{billing_profile_id}); - if (defined $provider_profile) { - _info("provider billing profile ID $provider_profile->{id} found"); - } - if (defined $provider_profile) { - my $profile_fee = {}; + if ((scalar @{$provider->{subscriber_fees}}) == 0 and defined $subscriber_rates and (scalar @$subscriber_rates) > 0) { + if (not $dry) { + foreach my $rate (@$subscriber_rates) { + my $profile_fee = {}; ($profile_fee->{profile}, $profile_fee->{zone}, $profile_fee->{fee}, - $profile_fee->{fees}) = _load_fees($provider_profile); - $provider->{profile} = $profile_fee->{profile}; - $provider->{provider_fee} = $profile_fee; + $profile_fee->{fees}) = _setup_fees($provider->{reseller}, + %$rate + ); + push(@{$provider->{subscriber_fees}},$profile_fee); + } + } else { + _info("no subscriber billing profile(s) found"); + return undef; } + } - $provider->{subscriber_fees} = []; - foreach my $subscriber_profile (@{NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles::findby_resellerid($provider->{reseller}->{id})}) { - next if (defined $provider_profile and $provider_profile->{id} == $subscriber_profile->{id}); - _info("subscriber billing profile ID $subscriber_profile->{id} found"); - my $profile_fee = {}; - ($profile_fee->{profile}, - $profile_fee->{zone}, - $profile_fee->{fee}, - $profile_fee->{fees}) = _load_fees($subscriber_profile); - push(@{$provider->{subscriber_fees}},$profile_fee); - } - - return 1; - + return $provider; } sub _load_fees { diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/CDR.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/CDR.pm new file mode 100644 index 0000000..723130f --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/CDR.pm @@ -0,0 +1,528 @@ +package NGCP::BulkProcessor::Projects::Massive::Generator::CDR; +use strict; + +## no critic + +use threads::shared qw(); +use Time::HiRes qw(sleep); +use String::MkPasswd qw(); +#use List::Util qw(); +use Data::Rmap qw(); + +use Tie::IxHash; + +use NGCP::BulkProcessor::Globals qw( + $enablemultithreading +); + +use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw( + $dry + $skip_errors + $deadlock_retries + + $generate_cdr_multithreading + $generate_cdr_numofthreads + $generate_cdr_count + + @providers +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn + fileerror +); + +use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); + +use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); + +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_xa_db + ping_dbs + destroy_dbs +); + +use NGCP::BulkProcessor::Utils qw(threadid timestamp); # stringtobool check_ipnet trim); +#use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); +#use NGCP::BulkProcessor::RandomString qw(createtmpstring); +use NGCP::BulkProcessor::Array qw(array_to_map); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + generate_cdrs + +); + +my $thread_sleep_secs = 0.1; + +my $RUNNING = 1; +my $COMPLETED = 2; +my $ERROR = 4; +my $STOP = 8; + +my $total_count :shared = 0; + +my $t = time; +my %offnet_domain_subscriber_map : shared = (); + +sub generate_cdrs { + + my $context = {}; + my $result = _generate_cdrs_create_context($context); + + destroy_dbs(); + if ($result) { + if ($enablemultithreading and $generate_cdr_multithreading and $generate_cdr_count > 1) { + $context->{cdr_count} = int($generate_cdr_count / $generate_cdr_numofthreads); + #$context->{sn_increment} = $generate_cdr_numofthreads; + my %processors = (); + for (my $i = 0; $i < $generate_cdr_numofthreads; $i++) { + $context->{cdr_count} += ($generate_cdr_count - $context->{cdr_count} * $generate_cdr_numofthreads) if $i == 0; + _info($context,'starting generator thread ' . ($i + 1) . ' of ' . $generate_cdr_numofthreads); + $context->{sn_offset} = $i; + my $processor = threads->create(\&_generate_cdr,$context); + if (!defined $processor) { + _info($context,'generator thread ' . ($i + 1) . ' of ' . $generate_cdr_numofthreads . ' NOT started'); + } + $processors{$processor->tid()} = $processor; + } + local $SIG{'INT'} = sub { + _info($context,"interrupt signal received"); + $result = 0; + lock $context->{errorstates}; + $context->{errorstates}->{$context->{tid}} = $STOP; + }; + while ((scalar keys %processors) > 0) { + foreach my $processor (values %processors) { + if (defined $processor and $processor->is_joinable()) { + $processor->join(); + delete $processors{$processor->tid()}; + _info($context,'generator thread tid ' . $processor->tid() . ' joined'); + } + } + sleep($thread_sleep_secs); + } + + $result &= (_get_threads_state($context->{errorstates},$context->{tid}) & $COMPLETED) == $COMPLETED; + + } else { + + $context->{cdr_count} = $generate_cdr_count; + #$context->{sn_increment} = 1; + #$context->{sn_offset} = 0; + local $SIG{'INT'} = sub { + _info($context,"interrupt signal received"); + $context->{errorstates}->{$context->{tid}} = $STOP; + }; + $result = _generate_cdr($context); + + } + } + + return $result; +} + +sub _generate_cdr { + + my $context = shift; + my $tid = threadid(); + { + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $RUNNING; + } + $context->{tid} = $tid; + $context->{db} = &get_xa_db(); + + my $cdr_count = 0; + my $broadcast_state; + while (($broadcast_state = _get_threads_state($context->{errorstates})) == 0 + or + (($broadcast_state & $ERROR) == 0 + and ($broadcast_state & $STOP) == 0)) { + + last if $cdr_count >= $context->{cdr_count}; + $cdr_count += 1; + + eval { + next unless _generate_cdr_init_context($context); + }; + if ($@ and not $skip_errors) { + undef $context->{db}; + destroy_dbs(); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $ERROR; + return 0; + } + + my $retry = 1; + while ($retry > 0) { + eval { + $context->{db}->db_begin(); + + _create_cdr($context); + + { + #lock $db_lock; #concurrent writes to voip_numbers causes deadlocks + lock $total_count; + $total_count += 1; + _info($context,"$total_count CDRs created",($total_count % 10) > 0); + } + + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($err =~ /deadlock/gi and $retry < $deadlock_retries) { + my $sleep = 0.01 * 2**$retry; + _info($context,"retrying in $sleep secs"); + sleep($sleep); + $retry += 1; + } elsif (not $skip_errors) { + undef $context->{db}; + destroy_dbs(); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $ERROR; + return 0; + } + } else { + $retry = 0; + } + } + } + undef $context->{db}; + destroy_dbs(); + if (($broadcast_state & $ERROR) == $ERROR) { + _info($context,"shutting down (error broadcast)"); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $STOP; + return 0; + } elsif (($broadcast_state & $STOP) == $STOP) { + _info($context,"shutting down (stop broadcast)"); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $STOP; + return 0; + } else { + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $COMPLETED; + return 1; + } + +} + + +sub _generate_cdrs_create_context { + my ($context) = @_; + + my $result = 1; + + my %errorstates :shared = (); + my $tid = threadid(); + $context->{tid} = $tid; + $context->{now} = timestamp(); + $context->{errorstates} = \%errorstates; + { + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $RUNNING; + } + $context->{error_count} = 0; + $context->{warning_count} = 0; + + my $result = 1; + + my @reseller_ids = map { $_->{reseller}->{id}; } @providers; + $context->{reseller_ids} = \@reseller_ids; + + my $domain_count = 0; + eval { + ($context->{domain_map},my $ids,my $domains) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::domains::findall(), + sub { return shift->{id}; }, sub { return shift; }, 'first' ); + $domain_count = (scalar keys %{$context->{domain_map}}); + }; + if ($@ or $domain_count == 0) { + _error($context,"cannot find any domains"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$domain_count domains cached"); + } + + my $reseller_count = 0; + eval { + ($context->{reseller_map},my $ids,my $resellers) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::resellers::findall(), + sub { return shift->{id}; }, sub { return shift; }, 'first' ); + $reseller_count = (scalar keys %{$context->{reseller_map}}); + }; + if ($@ or $reseller_count == 0) { + _error($context,"cannot find any resellers"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$reseller_count resellers cached"); + } + + my $active_count = 0; + eval { + $active_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid( + $NGCP::BulkProcessor::Dao::Trunk::billing::contracts::ACTIVE_STATE, + ((scalar @{$context->{reseller_ids}}) > 0 ? $context->{reseller_ids} : undef) + ); + ($context->{min_id},$context->{max_id}) = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::find_minmaxid(undef, + { 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE }, + ((scalar @{$context->{reseller_ids}}) > 0 ? $context->{reseller_ids} : undef) + ); + }; + if ($@ or $active_count == 0) { + _error($context,"cannot find active subscribers"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$active_count active subscribers found"); + } + + return $result; + +} + +sub _create_cdr { + my ($context) = @_; + + NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::insert_row($context->{db},$context->{cdr}); + + return 1; + +} + +sub _generate_cdr_init_context { + + my ($context) = @_; + + my $result = 1; + + #my $provider = $providers[rand @providers]; + + my $offnet_in; + my $offnet_out; + + my $source_subscriber; + $source_subscriber = _get_random_subscriber($context) unless $offnet_in; + my $dest_subscriber; + $dest_subscriber = _get_random_subscriber($context,(defined $source_subscriber ? $source_subscriber->{id} : undef)) unless $offnet_out; + + my $source_peering_subscriber_info; + my $source_reseller; + if ($source_subscriber) { + $source_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($source_subscriber->{contract_id}); + $source_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($source_subscriber->{contract}->{contact_id}); + $source_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$source_subscriber->{uuid}); + $source_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($source_subscriber->{contract}->{prov_subscriber}->{id},1); + $source_subscriber->{domain} = $context->{domain_map}->{$source_subscriber->{domain_id}}->{domain}; + $source_reseller = $context->{reseller_map}->{$source_subscriber->{contract}->{contact}->{reseller_id}}; + } else { + $source_peering_subscriber_info = _prepare_offnet_subscriber_info(); + } + + my $dest_peering_subscriber_info; + my $dest_reseller; + if ($dest_subscriber) { + $dest_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($dest_subscriber->{contract_id}); + $dest_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($dest_subscriber->{contract}->{contact_id}); + $dest_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$dest_subscriber->{uuid}); + $dest_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($dest_subscriber->{contract}->{prov_subscriber}->{id},1); + $dest_subscriber->{domain} = $context->{domain_map}->{$dest_subscriber->{domain_id}}->{domain}; + $dest_reseller = $context->{reseller_map}->{$dest_subscriber->{contract}->{contact}->{reseller_id}}; + } else { + $dest_peering_subscriber_info = _prepare_offnet_subscriber_info(); + } + + my $source_ip = '192.168.0.1'; + my $time = time(); + my $duration = 120; + + $context->{cdr} = { + #id => , + #update_time => , + source_user_id => ($source_subscriber ? $source_subscriber->{uuid} : '0'), + source_provider_id => ($source_reseller ? $source_reseller->{contract_id} : '0'), + #source_external_subscriber_id => , + #source_external_contract_id => , + source_account_id => ($source_subscriber ? $source_subscriber->{contract_id} : '0'), + source_user => ($source_subscriber ? $source_subscriber->{username} : $source_peering_subscriber_info->{username}), + source_domain => ($source_subscriber ? $source_subscriber->{domain} : $source_peering_subscriber_info->{domain}), + source_cli => ($source_subscriber ? ($source_subscriber->{primary_alias}->{username} // $source_subscriber->{username}) : $source_peering_subscriber_info->{username}), + #source_clir => '0', + source_ip => $source_ip, + #source_gpp0 => , + #source_gpp1 => , + #source_gpp2 => , + #source_gpp3 => , + #source_gpp4 => , + #source_gpp5 => , + #source_gpp6 => , + #source_gpp7 => , + #source_gpp8 => , + #source_gpp9 => , + destination_user_id => ($dest_subscriber ? $dest_subscriber->{uuid} : '0'), + destination_provider_id => ($dest_reseller ? $dest_reseller->{contract_id} : '0'), + #destination_external_subscriber_id => , + #destination_external_contract_id => , + destination_account_id => ($dest_subscriber ? $dest_subscriber->{contract_id} : '0'), + destination_user => ($dest_subscriber ? $dest_subscriber->{username} : $dest_peering_subscriber_info->{username}), + destination_domain => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}), + destination_user_dialed => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}), + destination_user_in => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}), + destination_domain_in => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}), + #destination_gpp0 => , + #destination_gpp1 => , + #destination_gpp2 => , + #destination_gpp3 => , + #destination_gpp4 => , + #destination_gpp5 => , + #destination_gpp6 => , + #destination_gpp7 => , + #destination_gpp8 => , + #destination_gpp9 => , + #peer_auth_user => , + #peer_auth_realm => , + call_type => 'call', + call_status => 'ok', + call_code => '200', + init_time => $time, + start_time => $time, + duration => $duration, + call_id => _generate_call_id(), + #source_carrier_cost => , + #source_reseller_cost => , + #source_customer_cost => , + #source_carrier_free_time => , + #source_reseller_free_time => , + #source_customer_free_time => , + #source_carrier_billing_fee_id => , + #source_reseller_billing_fee_id => , + #source_customer_billing_fee_id => , + #source_carrier_billing_zone_id => , + #source_reseller_billing_zone_id => , + #source_customer_billing_zone_id => , + #destination_carrier_cost => , + #destination_reseller_cost => , + #destination_customer_cost => , + #destination_carrier_free_time => , + #destination_reseller_free_time => , + #destination_customer_free_time => , + #destination_carrier_billing_fee_id => , + #destination_reseller_billing_fee_id => , + #destination_customer_billing_fee_id => , + #destination_carrier_billing_zone_id => , + #destination_reseller_billing_zone_id => , + #destination_customer_billing_zone_id => , + #frag_carrier_onpeak => , + #frag_reseller_onpeak => , + #frag_customer_onpeak => , + #is_fragmented => , + #split => , + #rated_at => , + #rating_status => 'unrated', + #exported_at => , + #export_status => , + }; + + return $result; + +} + +sub _prepare_offnet_subscriber_info { + my ($username_primary_number,$domain) = @_; + lock %offnet_domain_subscriber_map; + my $n = 1 + scalar keys %offnet_domain_subscriber_map; + Data::Rmap::rmap { $_ =~ s//$n/; $_ =~ s//$n/; $_ =~ s//$t/; } ($domain); + $n = 1 + (exists $offnet_domain_subscriber_map{$domain} ? scalar keys %{$offnet_domain_subscriber_map{$domain}} : 0); + Data::Rmap::rmap { $_ =~ s//$n/; $_ =~ s//$n/; $_ =~ s//$t/; } ($username_primary_number); + my $username; + if ('HASH' eq ref $username_primary_number) { + $username = ($username_primary_number->{cc} // '') . ($username_primary_number->{ac} // '') . ($username_primary_number->{sn} // ''); + } else { + $username = $username_primary_number; + } + $offnet_domain_subscriber_map{$domain} = {} if not exists $offnet_domain_subscriber_map{$domain}; + $offnet_domain_subscriber_map{$domain}->{$username} = 1; + return { username => $username, domain => $domain }; +} + +sub _get_random_subscriber { + my ($context,$excluding_id) = @_; + + return NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::find_random( + $context->{db}, + $excluding_id, + { 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE }, + ((scalar @{$context->{reseller_ids}}) > 0 ? $context->{reseller_ids} : undef), + $context->{min_id},$context->{max_id}, + ); +} + +sub _generate_call_id { + return '*TEST*'._random_string(26,'a'..'z','A'..'Z',0..9,'-','.'); +} + +sub _random_string { + my ($length,@chars) = @_; + return join('',@chars[ map{ rand @chars } 1 .. $length ]); +} + +sub _get_threads_state { + my ($errorstates,$tid) = @_; + my $result = 0; + if (defined $errorstates and ref $errorstates eq 'HASH') { + lock $errorstates; + foreach my $threadid (keys %$errorstates) { + if (not defined $tid or $threadid != $tid) { + $result |= $errorstates->{$threadid}; + } + } + } + return $result; +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } +} + +1; \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Provisioning.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Provisioning.pm index 93fa4db..fba29a6 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Provisioning.pm +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Provisioning.pm @@ -12,7 +12,6 @@ use Tie::IxHash; use NGCP::BulkProcessor::Globals qw( $enablemultithreading - $cpucount ); use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw( @@ -28,6 +27,7 @@ use NGCP::BulkProcessor::Projects::Massive::Generator::Settings qw( $sipusername_length $sippassword_length + @providers ); @@ -105,7 +105,7 @@ my $ERROR = 4; my $STOP = 8; my $total_count :shared = 0; -my $db_lock :shared = undef; +#my $db_lock :shared = undef; sub provision_subscribers { @@ -114,7 +114,7 @@ sub provision_subscribers { destroy_dbs(); if ($result) { - if ($enablemultithreading and $provision_subscriber_multithreading and $cpucount > 1) { + if ($enablemultithreading and $provision_subscriber_multithreading and $provision_subscriber_count > 1) { $context->{subscriber_count} = int($provision_subscriber_count / $provision_subscriber_numofthreads); $context->{sn_increment} = $provision_subscriber_numofthreads; my %processors = (); @@ -185,60 +185,67 @@ sub _provision_subscriber { last if $subscriber_count >= $context->{subscriber_count}; $subscriber_count += 1; - next unless _provision_susbcriber_init_context($context); + next unless _provision_subscriber_init_context($context); my $retry = 1; while ($retry > 0) { - eval { - $context->{db}->db_begin(); - #_info($context,"test" . $subscriber_count); - #die() if (($tid == 1 or $tid == 0) and $subscriber_count == 500); - _create_contact($context); - _create_contract($context); - { - #lock $db_lock; #concurrent writes to voip_numbers causes deadlocks - lock $total_count; - _create_subscriber($context); - _create_aliases($context); - $total_count += 1; - _info($context,"$total_count subscribers created",($total_count % 10) > 0); - } - # _update_preferences($context); - # _set_registrations($context); - # _set_callforwards($context); - # #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it - # #in this main provisioning loop, or align it in separate run-modes, according to the files given. - # - # } else { - # _warn($context,(scalar @$existing_billing_voip_subscribers) . ' existing billing subscribers found, skipping'); - # } - - if ($dry) { - $context->{db}->db_rollback(0); - } else { - $context->{db}->db_commit(); - } - }; - my $err = $@; - if ($err) { eval { - $context->{db}->db_rollback(1); + $context->{db}->db_begin(); + #_info($context,"test" . $subscriber_count); + #die() if (($tid == 1 or $tid == 0) and $subscriber_count == 500); + + if (NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::countby_ccacsn($context->{db}, + $context->{numbers}->{primary}->{cc}, + $context->{numbers}->{primary}->{ac}, + $context->{numbers}->{primary}->{sn}, + ) == 0) { + + _create_contact($context); + _create_contract($context); + { + #lock $db_lock; #concurrent writes to voip_numbers causes deadlocks + lock $total_count; + _create_subscriber($context); + _create_aliases($context); + $total_count += 1; + _info($context,"$total_count subscribers created",($total_count % 10) > 0); + } + # _update_preferences($context); + # _set_registrations($context); + # _set_callforwards($context); + # #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it + # #in this main provisioning loop, or align it in separate run-modes, according to the files given. + # + } else { + _info($context,'subscriber with primary number $context->{numbers}->{primary}->{number} already exists, skipping',1); + } + + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } }; - if ($err =~ /deadlock/gi and $retry < $deadlock_retries) { - my $sleep = 0.01 * 2**$retry; - _info($context,"retrying in $sleep secs"); - sleep($sleep); - $retry += 1; - } elsif (not $skip_errors) { - undef $context->{db}; - destroy_dbs(); - lock $context->{errorstates}; - $context->{errorstates}->{$tid} = $ERROR; - return 0; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($err =~ /deadlock/gi and $retry < $deadlock_retries) { + my $sleep = 0.01 * 2**$retry; + _info($context,"retrying in $sleep secs"); + sleep($sleep); + $retry += 1; + } elsif (not $skip_errors) { + undef $context->{db}; + destroy_dbs(); + lock $context->{errorstates}; + $context->{errorstates}->{$tid} = $ERROR; + return 0; + } + } else { + $retry = 0; } - } else { - $retry = 0; - } } } undef $context->{db}; @@ -280,29 +287,50 @@ sub _provision_subscribers_create_context { my $result = 1; + if ((scalar @providers) == 0) { + _error($context,"load/create providers first"); + $result = 0; #even in skip-error mode.. + } + #$context->{providers} foreach my $provider (@providers) { + unless ($provider->{provider_fee}) { + _error($context,"no provider fee for reseller '$provider->{reseller}->{name}' found"); + $result = 0; #even in skip-error mode.. + } + if ((scalar @{$provider->{subscriber_fees}}) == 0) { + _error($context,"no subscriber fees for reseller '$provider->{reseller}->{name}' found"); + $result = 0; #even in skip-error mode.. + } + eval { $provider->{domain}->{prov_domain} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_domain($provider->{domain}->{domain}); }; if ($@ or not $provider->{domain}->{prov_domain}) { - rowprocessingerror(threadid(),"cannot find provisioning domain '$provider->{domain}->{domain}'",getlogger(__PACKAGE__)); + _error($context,"cannot find provisioning domain '$provider->{domain}->{domain}'"); $result = 0; #even in skip-error mode.. } else { - processing_info(threadid(),"provisioning domain '$provider->{domain}->{domain}' found",getlogger(__PACKAGE__)); + _info($context,"provisioning domain '$provider->{domain}->{domain}' found"); } + $provider->{numbers_per_subscriber} //= 1; + $provider->{numbers_per_subscriber} = 1 if $provider->{numbers_per_subscriber} <= 0; my ($sn_min,$sn_max) = split(/[: -]+/,$provider->{sn},2); my $sn_length = length($sn_min); $sn_length = length($sn_max) if length($sn_max) > $sn_length; if ($sn_length > 0 and $sn_max > $sn_min and $sn_min >= 0) { my @sn_block = map { zerofill($_,$sn_length); } ($sn_min..$sn_max); - - $provider->{sn_block} = \@sn_block; + if (($provision_subscriber_count * $provider->{numbers_per_subscriber}) > scalar @sn_block) { + _error($context,"sn range $provider->{sn} less than numbers needed ($provider->{numbers_per_subscriber} * $provision_subscriber_count)"); + $result = 0; #even in skip-error mode.. + } else { + $provider->{sn_block} = \@sn_block; + } #$provider->{sn_block_size} = scalar @sn_block; } else { - rowprocessingerror(threadid(),"invalid sn block definition for provider '$provider->{sn}'",getlogger(__PACKAGE__)); + _error($context,"invalid sn block definition for provider '$provider->{sn}'"); + $result = 0; #even in skip-error mode.. } } @@ -867,7 +895,7 @@ sub _create_aliases { # #} -sub _provision_susbcriber_init_context { +sub _provision_subscriber_init_context { my ($context) = @_; @@ -923,7 +951,7 @@ sub _provision_susbcriber_init_context { $context->{ncos_level} = undef; my @numbers = (); - foreach (1..($provider->{numbers_per_subscriber} // 1)) { + foreach (1..$provider->{numbers_per_subscriber}) { my $number = {}; my @cc = @{$provider->{cc}}; $number->{cc} = $cc[rand @cc]; @@ -957,6 +985,11 @@ sub _provision_susbcriber_init_context { $context->{numbers}->{primary} = shift(@{$context->{numbers}->{other}}); #return 0 unless scalar @{$context->{numbers}->{other}}; + #if ($number_for_sipusername) { + # $context->{prov_subscriber}->{username} = $context->{numbers}->{primary}->{number}; + # $context->{bill_subscriber}->{username} = $context->{numbers}->{primary}->{number}; + #} + $context->{voip_numbers} = {}; $context->{voip_numbers}->{primary} = undef; $context->{voip_numbers}->{other} = []; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Settings.pm index 0cbd961..0003f78 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/Settings.pm @@ -60,6 +60,10 @@ our @EXPORT_OK = qw( @providers $providers_yml + $generate_cdr_multithreading + $generate_cdr_numofthreads + $generate_cdr_count + ); our $defaultconfig = 'config.cfg'; @@ -86,6 +90,10 @@ our @provider_config = (); our @providers = (); our $providers_yml = undef; +our $generate_cdr_multithreading = $enablemultithreading; +our $generate_cdr_numofthreads = $cpucount; +our $generate_cdr_count = 0; + sub update_settings { my ($data,$configfile) = @_; @@ -121,6 +129,10 @@ sub update_settings { $providers_yml = $data->{providers_yml} if exists $data->{providers_yml}; + $generate_cdr_multithreading = $data->{generate_cdr_multithreading} if exists $data->{generate_cdr_multithreading}; + $generate_cdr_numofthreads = _get_numofthreads($cpucount,$data,'generate_cdr_numofthreads'); + $generate_cdr_count = $data->{generate_cdr_count} if exists $data->{generate_cdr_count}; + return $result; } diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/process.pl b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/process.pl index 38aebb5..6996aea 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/process.pl @@ -62,7 +62,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(destroy_dbs); #use NGCP::BulkProcessor::Projects::Massive::Generator::Dao::Blah qw(); -#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); +use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); @@ -70,6 +70,9 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); use NGCP::BulkProcessor::Projects::Massive::Generator::Provisioning qw( provision_subscribers ); +use NGCP::BulkProcessor::Projects::Massive::Generator::CDR qw( + generate_cdrs +); use NGCP::BulkProcessor::Projects::Massive::Generator::Api qw( setup_provider ); @@ -91,6 +94,9 @@ push(@TASK_OPTS,$setup_provider_task_opt); my $provision_subscriber_task_opt = 'provision_subscriber'; push(@TASK_OPTS,$provision_subscriber_task_opt); +my $generate_cdr_task_opt = 'generate_cdr'; +push(@TASK_OPTS,$generate_cdr_task_opt); + if (init()) { main(); exit(0); @@ -155,6 +161,13 @@ sub main() { $completion |= 1; } + } elsif (lc($generate_cdr_task_opt) eq lc($task)) { + if (taskinfo($generate_cdr_task_opt,$result,1)) { + next unless check_dry(); + $result &= generate_cdr_task(\@messages); + $completion |= 1; + } + } else { $result = 0; scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); @@ -273,6 +286,30 @@ sub provision_subscriber_task { } + +sub generate_cdr_task { + + my ($messages) = @_; + my ($result) = (0); + eval { + ($result) = generate_cdrs(); + }; + my $err = $@; + my $stats = ":"; + eval { + $stats .= "\n total CDRs: " . + NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"generate cdrs INCOMPLETE$stats"); + } else { + push(@$messages,"generate cdrs completed$stats"); + } + destroy_dbs(); + return $result; + +} + #END { # # this should not be required explicitly, but prevents Log4Perl's # # "rootlogger not initialized error upon exit.. diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.debug.yml b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.debug.yml new file mode 100644 index 0000000..a0620ed --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.debug.yml @@ -0,0 +1,128 @@ +- + domain: domain1.com + reseller: reseller1 + cc: + - 111 + ac: + - 456 + sn: '000001-999999' + numbers_per_subscriber: 1 + provider_rate: + prepaid: 0 + fees: + - + source: . + destination: . + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 1 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 1 + onpeak_init_rate: 0 + - + source: . + destination: ^[^1].+ + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 1 + offpeak_init_interval: 1 + offpeak_init_rate: 1 + onpeak_follow_interval: 1 + onpeak_follow_rate: 1 + onpeak_init_interval: 1 + onpeak_init_rate: 1 + subscriber_rates: + - + prepaid: 0 + fees: + - + source: . + destination: . + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 60 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 60 + onpeak_init_rate: 0 + - + source: . + destination: ^[^1].+ + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 1 + offpeak_init_interval: 60 + offpeak_init_rate: 1 + onpeak_follow_interval: 1 + onpeak_follow_rate: 1 + onpeak_init_interval: 60 + onpeak_init_rate: 1 +- + domain: domain2.com + reseller: reseller2 + cc: + - 222 + ac: + - 456 + sn: '000001-999999' + numbers_per_subscriber: 1 + provider_rate: + prepaid: 0 + fees: + - + source: . + destination: . + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 1 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 1 + onpeak_init_rate: 0 + - + source: . + destination: ^[^2].+ + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 1 + offpeak_init_interval: 1 + offpeak_init_rate: 1 + onpeak_follow_interval: 1 + onpeak_follow_rate: 1 + onpeak_init_interval: 1 + onpeak_init_rate: 1 + subscriber_rates: + - + prepaid: 0 + fees: + - + source: . + destination: . + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 60 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 60 + onpeak_init_rate: 0 + - + source: . + destination: ^[^2].+ + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 1 + offpeak_init_interval: 60 + offpeak_init_rate: 1 + onpeak_follow_interval: 1 + onpeak_follow_rate: 1 + onpeak_init_interval: 60 + onpeak_init_rate: 1 \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.yml b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.yml index e4e7b69..a0620ed 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.yml +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/providers.yml @@ -1,110 +1,128 @@ - - domain: narf1.com - reseller: narf1 - numbers_per_subscriber: 3 + domain: domain1.com + reseller: reseller1 cc: - - 888 - - 999 + - 111 ac: - - 123 - 456 - sn: '000001-100000' -- - domain: narf2.com - reseller: narf2 - cc: - - 888 - - 999 - ac: - - 123 - - 456 - sn: '000001-100000' -- - domain: narf3.com - reseller: narf3 - cc: - - 888 - - 999 - ac: - - 123 - - 456 - sn: '000001-100000' + sn: '000001-999999' + numbers_per_subscriber: 1 provider_rate: prepaid: 0 fees: - - destination: ^888.+ + source: . + destination: . direction: out - offpeak_follow_interval: 5 - offpeak_follow_rate: 1 - offpeak_init_interval: 5 - offpeak_init_rate: 2 - onpeak_follow_interval: 5 - onpeak_follow_rate: 1 - onpeak_init_interval: 5 - onpeak_init_rate: 2 + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 1 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 1 + onpeak_init_rate: 0 - - destination: . - direction: in - offpeak_follow_interval: 5 + source: . + destination: ^[^1].+ + direction: out + offpeak_follow_interval: 1 offpeak_follow_rate: 1 - offpeak_init_interval: 5 + offpeak_init_interval: 1 offpeak_init_rate: 1 - onpeak_follow_interval: 5 + onpeak_follow_interval: 1 onpeak_follow_rate: 1 - onpeak_init_interval: 5 - onpeak_init_rate: 1 - source: ^888.+ + onpeak_init_interval: 1 + onpeak_init_rate: 1 subscriber_rates: - prepaid: 0 fees: - - destination: ^8882.+ + source: . + destination: . direction: out - offpeak_follow_interval: 5 - offpeak_follow_rate: 1 - offpeak_init_interval: 5 - offpeak_init_rate: 6 - onpeak_follow_interval: 5 - onpeak_follow_rate: 1 - onpeak_init_interval: 5 - onpeak_init_rate: 6 + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 60 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 60 + onpeak_init_rate: 0 - - destination: . - direction: in - offpeak_follow_interval: 5 + source: . + destination: ^[^1].+ + direction: out + offpeak_follow_interval: 1 offpeak_follow_rate: 1 - offpeak_init_interval: 5 - offpeak_init_rate: 5 - onpeak_follow_interval: 5 + offpeak_init_interval: 60 + offpeak_init_rate: 1 + onpeak_follow_interval: 1 onpeak_follow_rate: 1 - onpeak_init_interval: 5 - onpeak_init_rate: 5 - source: ^8881.+ + onpeak_init_interval: 60 + onpeak_init_rate: 1 +- + domain: domain2.com + reseller: reseller2 + cc: + - 222 + ac: + - 456 + sn: '000001-999999' + numbers_per_subscriber: 1 + provider_rate: + prepaid: 0 + fees: + - + source: . + destination: . + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 1 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 1 + onpeak_init_rate: 0 + - + source: . + destination: ^[^2].+ + direction: out + offpeak_follow_interval: 1 + offpeak_follow_rate: 1 + offpeak_init_interval: 1 + offpeak_init_rate: 1 + onpeak_follow_interval: 1 + onpeak_follow_rate: 1 + onpeak_init_interval: 1 + onpeak_init_rate: 1 + subscriber_rates: - - prepaid: 1 + prepaid: 0 fees: - - destination: ^8882.+ + source: . + destination: . direction: out - offpeak_follow_interval: 5 - offpeak_follow_rate: 1 - offpeak_init_interval: 5 - offpeak_init_rate: 4 - onpeak_follow_interval: 5 - onpeak_follow_rate: 1 - onpeak_init_interval: 5 - onpeak_init_rate: 4 + offpeak_follow_interval: 1 + offpeak_follow_rate: 0 + offpeak_init_interval: 60 + offpeak_init_rate: 0 + onpeak_follow_interval: 1 + onpeak_follow_rate: 0 + onpeak_init_interval: 60 + onpeak_init_rate: 0 - - destination: . - direction: in - offpeak_follow_interval: 5 + source: . + destination: ^[^2].+ + direction: out + offpeak_follow_interval: 1 offpeak_follow_rate: 1 - offpeak_init_interval: 5 - offpeak_init_rate: 3 - onpeak_follow_interval: 5 + offpeak_init_interval: 60 + offpeak_init_rate: 1 + onpeak_follow_interval: 1 onpeak_follow_rate: 1 - onpeak_init_interval: 5 - onpeak_init_rate: 3 - source: ^8881.+ \ No newline at end of file + onpeak_init_interval: 60 + onpeak_init_rate: 1 \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg index 91a1030..e06d5d4 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg @@ -8,7 +8,11 @@ webpassword_length = 8 webusername_length = 8 sippassword_length = 16 sipusername_length = 8 -provision_subscriber_count = 2000 +provision_subscriber_count = 20000 providers_yml = providers.yml +generate_cdr_multithreading = 1 +#generate_cdr_numofthreads = 2 +generate_cdr_count = 50000 + diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.debug.cfg new file mode 100644 index 0000000..7703b51 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.debug.cfg @@ -0,0 +1,17 @@ + +#dry=0 +#skip_errors=0 + +provision_subscriber_multithreading = 1 +#provision_subscriber_numofthreads = 2 +webpassword_length = 8 +webusername_length = 8 +sippassword_length = 16 +sipusername_length = 8 +provision_subscriber_count = 100 + +providers_yml = providers.yml + +generate_cdr_multithreading = 1 +#generate_cdr_numofthreads = 2 +generate_cdr_count = 100