diff --git a/lib/NGCP/BulkProcessor/Calendar.pm b/lib/NGCP/BulkProcessor/Calendar.pm index 4ebd56f4..aa953bc0 100644 --- a/lib/NGCP/BulkProcessor/Calendar.pm +++ b/lib/NGCP/BulkProcessor/Calendar.pm @@ -33,6 +33,7 @@ our @EXPORT_OK = qw( datetime_to_string datetime_from_string set_timezone + current_local ); my $is_fake_time = 0; @@ -78,6 +79,10 @@ sub _current_local { } } +sub current_local { + return DateTime->now(time_zone => $LOCAL); +} + sub infinite_future { #... to '9999-12-31 23:59:59' return DateTime->new(year => 9999, month => 12, day => 31, hour => 23, minute => 59, second => 59, diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm index ac7ceedd..0b2e1f91 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_profiles.pm @@ -21,7 +21,11 @@ our @EXPORT_OK = qw( check_table findby_id + findall findby_resellerid_name_handle + + $DEFAULT_PROFILE_FREE_CASH + $DEFAULT_PROFILE_FREE_TIME ); my $tablename = 'billing_profiles'; @@ -54,6 +58,9 @@ my $expected_fieldnames = [ my $indexes = {}; +our $DEFAULT_PROFILE_FREE_CASH = 0.0; +our $DEFAULT_PROFILE_FREE_TIME = 0; + sub new { my $class = shift; @@ -83,6 +90,21 @@ sub findby_id { } +sub findall { + + my ($load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my $rows = $db->db_get_all_arrayref($stmt); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub findby_resellerid_name_handle { my ($reseller_id,$name,$handle,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm index 8353cb58..2fa57fb2 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm @@ -17,19 +17,27 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::SqlProcessor qw( checktableinfo insert_record + update_record copy_row ); use NGCP::BulkProcessor::SqlRecord qw(); use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone); +use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages qw(); + require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); our @EXPORT_OK = qw( gettablename check_table insert_row + update_row findby_contractid - sort_by_end + sort_by_end_desc + sort_by_end_asc + get_new_balance_values + get_free_ratio ); my $tablename = 'contract_balances'; @@ -49,6 +57,8 @@ my $expected_fieldnames = [ 'invoice_id', 'underrun_profiles', 'underrun_lock', + 'initial_cash_balance', + 'initial_free_time_balance', ]; my $indexes = {}; @@ -131,6 +141,15 @@ sub insert_row { } +sub update_row { + + my ($xa_db,$data) = @_; + + check_table(); + return update_record($get_db,$xa_db,__PACKAGE__,$data); + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; @@ -161,10 +180,14 @@ sub buildrecords_fromrows { } -sub sort_by_end ($$) { +sub sort_by_end_asc ($$) { return _sort_by_date('_end',0,@_); } +sub sort_by_end_desc ($$) { + return _sort_by_date('_end',1,@_); +} + sub _sort_by_date { my ($ts_field,$desc,$a,$b) = @_; if ($desc) { @@ -189,6 +212,100 @@ sub _sort_by_date { } +sub get_new_balance_values { + my %params = @_; + my ($contract_create, + $last_balance, + $balance, + $initial_balance, + $carry_over_mode, + $notopup_expiration, + $last_cash_balance, + $last_cash_balance_interval) = @params{qw/ + contract_create + last_balance + balance + initial_balance + carry_over_mode + notopup_expiration + last_cash_balance + last_cash_balance_interval + /}; + my ($cash_balance, $cash_balance_interval, $free_time_balance, $free_time_balance_interval) = (0.0,0.0,0,0); + + $carry_over_mode //= $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_CARRY_OVER_MODE; + my $ratio; + if ($last_balance) { + $last_cash_balance //= $last_balance->{cash_balance}; + $last_cash_balance_interval //= $last_balance->{cash_balance_interval}; + if (($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_MODE eq $carry_over_mode + || ($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_TIMELY_MODE eq $carry_over_mode && $last_balance->{timely_topup_count} > 0) + ) && (!defined $notopup_expiration || $balance->{_start} < $notopup_expiration)) { + #if (!defined $last_profile) { + # my $bm_last = get_actual_billing_mapping(schema => $schema, contract => $contract, now => $last_balance->start); #end); !? + # $last_profile = $bm_last->billing_mappings->first->billing_profile; + #} + #my $contract_create = NGCP::Panel::Utils::DateTime::set_local_tz($contract->create_timestamp // $contract->modify_timestamp); + $ratio = 1.0; + if ($last_balance->{_start} <= $contract_create && $last_balance->{_end} >= $contract_create) { #$last_balance->end is never +inf here + $ratio = get_free_ratio($contract_create,$last_balance->{_start},$last_balance->{_end}); + } + my $old_free_cash = $ratio * ($last_balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH); + $cash_balance = $last_cash_balance; + if ($last_cash_balance_interval < $old_free_cash) { + $cash_balance += $last_cash_balance_interval - $old_free_cash; + } + #$ratio * $last_profile->interval_free_time // _DEFAULT_PROFILE_FREE_TIME + #} else { + # $c->log->debug('discarding contract ' . $contract->id . " cash balance (mode '$carry_over_mode'" . (defined $notopup_expiration ? ', notopup expiration ' . NGCP::Panel::Utils::DateTime::to_string($notopup_expiration) : '') . ')') if $c; + } + $ratio = 1.0; + } else { + $cash_balance = (defined $initial_balance ? $initial_balance : $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_INITIAL_BALANCE); + $ratio = get_free_ratio($contract_create,$balance->{_start},$balance->{_end}); + } + + my $free_cash = $ratio * ($balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH); + $cash_balance += $free_cash; + $cash_balance_interval = 0.0; + + my $free_time = $ratio * ($balance->{_profile}->{interval_free_time} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_TIME); + $free_time_balance = $free_time; + $free_time_balance_interval = 0; + + #$c->log->debug("ratio: $ratio, free cash: $free_cash, cash balance: $cash_balance, free time: $free_time, free time balance: $free_time_balance"); + + return {cash_balance => sprintf("%.4f",$cash_balance), + initial_cash_balance => sprintf("%.4f",$cash_balance), + cash_balance_interval => sprintf("%.4f",$cash_balance_interval), + free_time_balance => sprintf("%.0f",$free_time_balance), + initial_free_time_balance => sprintf("%.0f",$free_time_balance), + free_time_balance_interval => sprintf("%.0f",$free_time_balance_interval)}; + +} + +sub get_free_ratio { + my ($contract_create,$stime,$etime) = @_; + if (!is_infinite_future($etime)) { + my $ctime = ($contract_create->clone->truncate(to => 'day') > $stime ? $contract_create->clone->truncate(to => 'day') : $contract_create); + my $start_of_next_interval = _add_second($etime->clone,1); + #$c->log->debug("ratio = " . ($start_of_next_interval->epoch - $ctime->epoch) . ' / ' . ($start_of_next_interval->epoch - $stime->epoch)) if $c; + return ($start_of_next_interval->epoch - $ctime->epoch) / ($start_of_next_interval->epoch - $stime->epoch); + } + return 1.0; +} + +sub _add_second { + + my ($dt,$skip_leap_seconds) = @_; + $dt->add(seconds => 1); + while ($skip_leap_seconds and $dt->second() >= 60) { + $dt->add(seconds => 1); + } + return $dt; + +} + sub gettablename { return $tablename; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm index 4ce8932f..d6d12c1b 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm @@ -22,6 +22,9 @@ use NGCP::BulkProcessor::SqlProcessor qw( ); use NGCP::BulkProcessor::SqlRecord qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); + require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); our @EXPORT_OK = qw( @@ -32,8 +35,12 @@ our @EXPORT_OK = qw( countby_status_resellerid findby_contactid findby_id + forupdate_id process_records + process_free_cash_contracts + + countby_free_cash $ACTIVE_STATE $TERMINATED_STATE @@ -117,6 +124,24 @@ sub findby_id { } +sub forupdate_id { + + my ($xa_db,$id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ? FOR UPDATE'; + my @params = ($id); + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + sub countby_status_resellerid { my ($status,$reseller_id) = @_; @@ -150,6 +175,21 @@ sub countby_status_resellerid { } +sub countby_free_cash { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(DISTINCT c.id) FROM ' . $table . ' AS c' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' . + ' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0'; + + return $db->db_get_value($stmt); + +} + sub insert_row { my $db = &$get_db(); @@ -222,6 +262,49 @@ sub process_records { ); } +sub process_free_cash_contracts { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + /}; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'FROM ' . $table . ' AS c' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' . + ' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0'; + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,$rowblock,$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + select => $db->paginate_sort_query("SELECT DISTINCT c.id " . $stmt,undef,undef,[{ column => 'c.id', numeric => 1, dir => 1 }]), + selectcount => "SELECT COUNT(DISTINCT c.id) " . $stmt, + ); +} sub buildrecords_fromrows { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/profile_packages.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/profile_packages.pm new file mode 100644 index 00000000..6619d3d1 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/profile_packages.pm @@ -0,0 +1,144 @@ +package NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + findby_id + findall + + $CARRY_OVER_MODE + $CARRY_OVER_TIMELY_MODE + $DISCARD_MODE + $DEFAULT_CARRY_OVER_MODE + $DEFAULT_INITIAL_BALANCE +); + +my $tablename = 'profile_packages'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'reseller_id', + 'name', + 'description', + 'initial_balance', + 'service_charge', + 'balance_interval_unit', + 'balance_interval_value', + 'balance_interval_start_mode', + 'carry_over_mode', + 'timely_duration_unit', + 'timely_duration_value', + 'notopup_discard_intervals', + 'underrun_lock_threshold', + 'underrun_lock_level', + 'underrun_profile_threshold', + 'topup_lock_level', +]; + +my $indexes = {}; + +our $CARRY_OVER_MODE = 'carry_over'; +our $CARRY_OVER_TIMELY_MODE = 'carry_over_timely'; +our $DISCARD_MODE = 'discard'; +our $DEFAULT_CARRY_OVER_MODE = $CARRY_OVER_MODE; +our $DEFAULT_INITIAL_BALANCE = 0.0; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub findall { + + my ($load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my $rows = $db->db_get_all_arrayref($stmt); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/topup_log.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/topup_log.pm new file mode 100644 index 00000000..efbb7a9f --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/topup_log.pm @@ -0,0 +1,232 @@ +package NGCP::BulkProcessor::Dao::Trunk::billing::topup_log; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row + + insert_record +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + insert_row + + findby_contractidfromto + findby_contractbalanceid + findby_id + + $OK_OUTCOME + $FAILED_OUTCOME + + $VOUCHER_TYPE + $CASH_TYPE + $SET_BALANCE_TYPE +); + +my $tablename = 'topup_log'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'username', + 'timestamp', + 'type', + 'outcome', + 'message', + 'subscriber_id', + 'contract_id', + 'amount', + 'voucher_id', + 'cash_balance_before', + 'cash_balance_after', + 'package_before_id', + 'package_after_id', + 'profile_before_id', + 'profile_after_id', + 'lock_level_before', + 'lock_level_after', + 'contract_balance_before_id', + 'contract_balance_after_id', + 'request_token', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +our $OK_OUTCOME = 'ok'; +our $FAILED_OUTCOME = 'failed'; + +our $VOUCHER_TYPE = 'voucher'; +our $CASH_TYPE = 'cash'; +our $SET_BALANCE_TYPE = 'set_balance'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_contractidfromto { + + my ($contract_id,$from,$to,$outcome,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + if ($from) { + $stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' >= ?'; + push(@params,$from->epoch()); + } + if ($to) { + $stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' <= ?'; + push(@params,$to->epoch()); + } + if ($outcome) { + $stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?'; + push(@params,$outcome); + } + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_contractbalanceid { + + my ($id,$outcome,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_balance_before_id') . ' = ?'; + my @params = ($id); + if ($outcome) { + $stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?'; + push(@params,$outcome); + } + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($contract_id) = @params{qw/ + contract_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('timestamp') . ', ' . + $db->columnidentifier('type') . ', ' . + $db->columnidentifier('outcome') .', ' . + $db->columnidentifier('contract_id') . ') VALUES (' . + 'UNIX_TIMESTAMP(NOW()), ' . + '\'' . $SET_BALANCE_TYPE . '\', ' . + '\'' . $FAILED_OUTCOME . '\', ' . + '?)', + $contract_id + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm index 24d156c1..a4f478d8 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/voip_subscribers.pm @@ -37,6 +37,7 @@ our @EXPORT_OK = qw( process_records find_minmaxid find_random + findby_contractid_states $TERMINATED_STATE $ACTIVE_STATE @@ -105,6 +106,34 @@ sub findby_domainid_username_states { } +sub findby_contractid_states { + + my ($xa_db,$contract_id,$states,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + if (defined $states and 'HASH' eq ref $states) { + foreach my $in (keys %$states) { + my @values = (defined $states->{$in} and 'ARRAY' eq ref $states->{$in} ? @{$states->{$in}} : ($states->{$in})); + $stmt .= ' AND ' . $db->columnidentifier('status') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $states and length($states) > 0) { + $stmt .= ' AND ' . $db->columnidentifier('status') . ' = ?'; + push(@params,$states); + } + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub find_minmaxid { my ($xa_db,$states,$reseller_id) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm b/lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm index ca217e56..72ca6adf 100644 --- a/lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm +++ b/lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm @@ -29,7 +29,8 @@ our @EXPORT_OK = qw( check_table insert_row findby_contractid - sort_by_end + sort_by_end_desc + sort_by_end_asc ); my $tablename = 'contract_balances'; @@ -163,10 +164,14 @@ sub buildrecords_fromrows { } -sub sort_by_end ($$) { +sub sort_by_end_asc ($$) { return _sort_by_date('_end',0,@_); } +sub sort_by_end_desc ($$) { + return _sort_by_date('_end',1,@_); +} + sub _sort_by_date { my ($ts_field,$desc,$a,$b) = @_; if ($desc) { diff --git a/lib/NGCP/BulkProcessor/Dao/mr457/billing/billing_mappings.pm b/lib/NGCP/BulkProcessor/Dao/mr457/billing/billing_mappings.pm new file mode 100644 index 00000000..41bd80c9 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr457/billing/billing_mappings.pm @@ -0,0 +1,168 @@ +package NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + insert_row + + findby_contractid_ts +); + +my $tablename = 'billing_mappings'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'start_date', + 'end_date', + 'billing_profile_id', + 'contract_id', + 'product_id', + 'network_id', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_contractid_ts { + + my ($xa_db,$contract_id,$dt,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + if (defined $dt) { + $stmt .= ' AND (' . $db->columnidentifier('start_date') . ' IS NULL OR ' . $db->columnidentifier('start_date') . ' <= ? ) ' . + 'AND (' . $db->columnidentifier('end_date') . ' IS NULL OR ' . $db->columnidentifier('end_date') . ' >= ? ) ' . + 'ORDER BY ' . $db->columnidentifier('start_date') . ' DESC, ' . $db->columnidentifier('id') . ' DESC LIMIT 1'; + push(@params, $db->datetime_to_string($dt) ); + push(@params, $db->datetime_to_string($dt) ); + } + + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($billing_profile_id, + $contract_id, + $product_id) = @params{qw/ + billing_profile_id + contract_id + product_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('billing_profile_id') . ', ' . + $db->columnidentifier('contract_id') . ', ' . + $db->columnidentifier('end_date') . ', ' . + $db->columnidentifier('network_id') . ', ' . + $db->columnidentifier('product_id') . ', ' . + $db->columnidentifier('start_date') . ') VALUES (' . + '?, ' . + '?, ' . + 'NULL, ' . + 'NULL, ' . + '?, ' . + 'NULL)', + $billing_profile_id, + $contract_id, + $product_id, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr457/billing/billing_profiles.pm b/lib/NGCP/BulkProcessor/Dao/mr457/billing/billing_profiles.pm new file mode 100644 index 00000000..9368ccdf --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr457/billing/billing_profiles.pm @@ -0,0 +1,176 @@ +package NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + findby_id + findall + findby_resellerid_name_handle + + $DEFAULT_PROFILE_FREE_CASH + $DEFAULT_PROFILE_FREE_TIME +); + +my $tablename = 'billing_profiles'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'reseller_id', + 'handle', + 'name', + 'prepaid', + 'interval_charge', + 'interval_free_time', + 'interval_free_cash', + 'interval_unit', + 'interval_count', + 'fraud_interval_limit', + 'fraud_interval_lock', + 'fraud_interval_notify', + 'fraud_daily_limit', + 'fraud_daily_lock', + 'fraud_daily_notify', + 'fraud_use_reseller_rates', + 'currency', + 'status', + 'modify_timestamp', + 'create_timestamp', + 'terminate_timestamp', +]; + +my $indexes = {}; + +our $DEFAULT_PROFILE_FREE_CASH = 0.0; +our $DEFAULT_PROFILE_FREE_TIME = 0; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub findall { + + my ($load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my $rows = $db->db_get_all_arrayref($stmt); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_resellerid_name_handle { + + my ($reseller_id,$name,$handle,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my @params = (); + my @terms = (); + if ($reseller_id) { + push(@terms,$db->columnidentifier('reseller_id') . ' = ?'); + push(@params,$reseller_id); + } + if ($name) { + push(@terms,$db->columnidentifier('name') . ' = ?'); + push(@params,$name); + } + if ($handle) { + push(@terms,$db->columnidentifier('handle') . ' = ?'); + push(@params,$handle); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr457/billing/contract_balances.pm b/lib/NGCP/BulkProcessor/Dao/mr457/billing/contract_balances.pm new file mode 100644 index 00000000..61684b56 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr457/billing/contract_balances.pm @@ -0,0 +1,324 @@ +package NGCP::BulkProcessor::Dao::mr457::billing::contract_balances; +use strict; + +## no critic + +use DateTime qw(); + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + update_record + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); +use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone); + +use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::profile_packages qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + insert_row + update_row + findby_contractid + sort_by_end_desc + sort_by_end_asc + get_new_balance_values + get_free_ratio +); + +my $tablename = 'contract_balances'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'contract_id', + 'cash_balance', + 'cash_balance_interval', + 'free_time_balance', + 'free_time_balance_interval', + 'topup_count', + 'timely_topup_count', + 'start', + 'end', + 'invoice_id', + 'underrun_profiles', + 'underrun_lock', + #'initial_cash_balance', + #'initial_free_time_balance', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_contractid { + + my ($xa_db,$contract_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($contract_id) = @params{qw/ + contract_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('cash_balance') . ', ' . + $db->columnidentifier('cash_balance_interval') . ', ' . + $db->columnidentifier('contract_id') . ', ' . + $db->columnidentifier('end') . ', ' . + $db->columnidentifier('free_time_balance') . ', ' . + $db->columnidentifier('free_time_balance_interval') . ', ' . + $db->columnidentifier('start') . ', ' . + $db->columnidentifier('underrun_lock') . ', ' . + $db->columnidentifier('underrun_profiles') . ') VALUES (' . + '0.0, ' . + '0.0, ' . + '?, ' . + 'CONCAT(LAST_DAY(NOW()),\' 23:59:59\'), ' . + '0, ' . + '0, ' . + 'CONCAT(SUBDATE(CURDATE(),(DAY(CURDATE())-1)),\' 00:00:00\'), ' . + 'NULL, ' . + 'NULL)', + $contract_id, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub update_row { + + my ($xa_db,$data) = @_; + + check_table(); + return update_record($get_db,$xa_db,__PACKAGE__,$data); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + my $db = &$get_db(); + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + my $end = $db->datetime_from_string($record->{end},undef); + if (is_infinite_future($end)) { + $record->{_end} = infinite_future(); + } else { + $record->{_end} = set_timezone($end); + } + + $record->{_start} = $db->datetime_from_string($record->{start},'local'); + + push @records,$record; + } + } + + return \@records; + +} + +sub sort_by_end_asc ($$) { + return _sort_by_date('_end',0,@_); +} + +sub sort_by_end_desc ($$) { + return _sort_by_date('_end',1,@_); +} + +sub _sort_by_date { + my ($ts_field,$desc,$a,$b) = @_; + if ($desc) { + $desc = -1; + } else { + $desc = 1; + } + #use Data::Dumper; + #print Dumper($a); + #print Dumper($b); + my $a_inf = is_infinite_future($a->{$ts_field}); + my $b_inf = is_infinite_future($b->{$ts_field}); + if ($a_inf and $b_inf) { + return 0; + } elsif ($a_inf) { + return 1 * $desc; + } elsif ($b_inf) { + return -1 * $desc; + } else { + return DateTime->compare($a->{$ts_field}, $b->{$ts_field}) * $desc; + } + +} + +sub get_new_balance_values { + my %params = @_; + my ($contract_create, + $last_balance, + $balance, + $initial_balance, + $carry_over_mode, + $notopup_expiration, + $last_cash_balance, + $last_cash_balance_interval) = @params{qw/ + contract_create + last_balance + balance + initial_balance + carry_over_mode + notopup_expiration + last_cash_balance + last_cash_balance_interval + /}; + my ($cash_balance, $cash_balance_interval, $free_time_balance, $free_time_balance_interval) = (0.0,0.0,0,0); + + $carry_over_mode //= $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_CARRY_OVER_MODE; + my $ratio; + if ($last_balance) { + $last_cash_balance //= $last_balance->{cash_balance}; + $last_cash_balance_interval //= $last_balance->{cash_balance_interval}; + if (($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_MODE eq $carry_over_mode + || ($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_TIMELY_MODE eq $carry_over_mode && $last_balance->{timely_topup_count} > 0) + ) && (!defined $notopup_expiration || $balance->{_start} < $notopup_expiration)) { + #if (!defined $last_profile) { + # my $bm_last = get_actual_billing_mapping(schema => $schema, contract => $contract, now => $last_balance->start); #end); !? + # $last_profile = $bm_last->billing_mappings->first->billing_profile; + #} + #my $contract_create = NGCP::Panel::Utils::DateTime::set_local_tz($contract->create_timestamp // $contract->modify_timestamp); + $ratio = 1.0; + if ($last_balance->{_start} <= $contract_create && $last_balance->{_end} >= $contract_create) { #$last_balance->end is never +inf here + $ratio = get_free_ratio($contract_create,$last_balance->{_start},$last_balance->{_end}); + } + my $old_free_cash = $ratio * ($last_balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH); + $cash_balance = $last_cash_balance; + if ($last_cash_balance_interval < $old_free_cash) { + $cash_balance += $last_cash_balance_interval - $old_free_cash; + } + #$ratio * $last_profile->interval_free_time // _DEFAULT_PROFILE_FREE_TIME + #} else { + # $c->log->debug('discarding contract ' . $contract->id . " cash balance (mode '$carry_over_mode'" . (defined $notopup_expiration ? ', notopup expiration ' . NGCP::Panel::Utils::DateTime::to_string($notopup_expiration) : '') . ')') if $c; + } + $ratio = 1.0; + } else { + $cash_balance = (defined $initial_balance ? $initial_balance : $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_INITIAL_BALANCE); + $ratio = get_free_ratio($contract_create,$balance->{_start},$balance->{_end}); + } + + my $free_cash = $ratio * ($balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH); + $cash_balance += $free_cash; + $cash_balance_interval = 0.0; + + my $free_time = $ratio * ($balance->{_profile}->{interval_free_time} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_TIME); + $free_time_balance = $free_time; + $free_time_balance_interval = 0; + + #$c->log->debug("ratio: $ratio, free cash: $free_cash, cash balance: $cash_balance, free time: $free_time, free time balance: $free_time_balance"); + + return {cash_balance => sprintf("%.4f",$cash_balance), + initial_cash_balance => sprintf("%.4f",$cash_balance), + cash_balance_interval => sprintf("%.4f",$cash_balance_interval), + free_time_balance => sprintf("%.0f",$free_time_balance), + initial_free_time_balance => sprintf("%.0f",$free_time_balance), + free_time_balance_interval => sprintf("%.0f",$free_time_balance_interval)}; + +} + +sub get_free_ratio { + my ($contract_create,$stime,$etime) = @_; + if (!is_infinite_future($etime)) { + my $ctime = ($contract_create->clone->truncate(to => 'day') > $stime ? $contract_create->clone->truncate(to => 'day') : $contract_create); + my $start_of_next_interval = _add_second($etime->clone,1); + #$c->log->debug("ratio = " . ($start_of_next_interval->epoch - $ctime->epoch) . ' / ' . ($start_of_next_interval->epoch - $stime->epoch)) if $c; + return ($start_of_next_interval->epoch - $ctime->epoch) / ($start_of_next_interval->epoch - $stime->epoch); + } + return 1.0; +} + +sub _add_second { + + my ($dt,$skip_leap_seconds) = @_; + $dt->add(seconds => 1); + while ($skip_leap_seconds and $dt->second() >= 60) { + $dt->add(seconds => 1); + } + return $dt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr457/billing/contracts.pm b/lib/NGCP/BulkProcessor/Dao/mr457/billing/contracts.pm new file mode 100644 index 00000000..7cc7cb99 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr457/billing/contracts.pm @@ -0,0 +1,345 @@ +package NGCP::BulkProcessor::Dao::mr457::billing::contracts; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + destroy_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + copy_row + + process_table +); +use NGCP::BulkProcessor::SqlRecord qw(); + +use NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + insert_row + + countby_status_resellerid + findby_contactid + findby_id + forupdate_id + + process_records + process_free_cash_contracts + + countby_free_cash + + $ACTIVE_STATE + $TERMINATED_STATE +); + +my $tablename = 'contracts'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'customer_id', + 'contact_id', + 'order_id', + 'profile_package_id', + 'status', + 'external_id', + 'modify_timestamp', + 'create_timestamp', + 'activate_timestamp', + 'terminate_timestamp', + 'max_subscribers', + 'send_invoice', + 'subscriber_email_template_id', + 'passreset_email_template_id', + 'invoice_email_template_id', + 'invoice_template_id', + 'vat_rate', + 'add_vat', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +our $ACTIVE_STATE = 'active'; +our $TERMINATED_STATE = 'terminated'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_contactid { + + my ($contact_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contact_id') . ' = ?'; + my @params = ($contact_id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub forupdate_id { + + my ($xa_db,$id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ? FOR UPDATE'; + my @params = ($id); + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub countby_status_resellerid { + + my ($status,$reseller_id) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' AS contract' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id'; + my @params = (); + my @terms = (); + if ($status) { + push(@terms,'contract.status = ?'); + push(@params,$status); + } + if ($reseller_id) { + if ('ARRAY' eq ref $reseller_id) { + push(@terms,'contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')'); + push(@params,@$reseller_id); + } else { + push(@terms,'contact.reseller_id = ?'); + push(@params,$reseller_id); + } + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub countby_free_cash { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(DISTINCT c.id) FROM ' . $table . ' AS c' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' . + ' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0'; + + return $db->db_get_value($stmt); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($contact_id) = @params{qw/ + contact_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('contact_id') . ', ' . + $db->columnidentifier('create_timestamp') . ', ' . + $db->columnidentifier('modify_timestamp') . ', ' . + $db->columnidentifier('status') . ') VALUES (' . + '?, ' . + 'NOW(), ' . + 'NOW(), ' . + '\'' . $ACTIVE_STATE . '\')', + $contact_id, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub process_records { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + check_table(); + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + ); +} + +sub process_free_cash_contracts { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + /}; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'FROM ' . $table . ' AS c' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' . + ' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0'; + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,$rowblock,$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + select => $db->paginate_sort_query("SELECT DISTINCT c.id " . $stmt,undef,undef,[{ column => 'c.id', numeric => 1, dir => 1 }]), + selectcount => "SELECT COUNT(DISTINCT c.id) " . $stmt, + ); +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr457/billing/profile_packages.pm b/lib/NGCP/BulkProcessor/Dao/mr457/billing/profile_packages.pm new file mode 100644 index 00000000..a583d0dd --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr457/billing/profile_packages.pm @@ -0,0 +1,144 @@ +package NGCP::BulkProcessor::Dao::mr457::billing::profile_packages; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + findby_id + findall + + $CARRY_OVER_MODE + $CARRY_OVER_TIMELY_MODE + $DISCARD_MODE + $DEFAULT_CARRY_OVER_MODE + $DEFAULT_INITIAL_BALANCE +); + +my $tablename = 'profile_packages'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'reseller_id', + 'name', + 'description', + 'initial_balance', + 'service_charge', + 'balance_interval_unit', + 'balance_interval_value', + 'balance_interval_start_mode', + 'carry_over_mode', + 'timely_duration_unit', + 'timely_duration_value', + 'notopup_discard_intervals', + 'underrun_lock_threshold', + 'underrun_lock_level', + 'underrun_profile_threshold', + 'topup_lock_level', +]; + +my $indexes = {}; + +our $CARRY_OVER_MODE = 'carry_over'; +our $CARRY_OVER_TIMELY_MODE = 'carry_over_timely'; +our $DISCARD_MODE = 'discard'; +our $DEFAULT_CARRY_OVER_MODE = $CARRY_OVER_MODE; +our $DEFAULT_INITIAL_BALANCE = 0.0; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub findall { + + my ($load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my $rows = $db->db_get_all_arrayref($stmt); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr457/billing/topup_log.pm b/lib/NGCP/BulkProcessor/Dao/mr457/billing/topup_log.pm new file mode 100644 index 00000000..2c95b7e0 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr457/billing/topup_log.pm @@ -0,0 +1,232 @@ +package NGCP::BulkProcessor::Dao::mr457::billing::topup_log; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row + + insert_record +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + insert_row + + findby_contractidfromto + findby_contractbalanceid + findby_id + + $OK_OUTCOME + $FAILED_OUTCOME + + $VOUCHER_TYPE + $CASH_TYPE + +); + +my $tablename = 'topup_log'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'username', + 'timestamp', + 'type', + 'outcome', + 'message', + 'subscriber_id', + 'contract_id', + 'amount', + 'voucher_id', + 'cash_balance_before', + 'cash_balance_after', + 'package_before_id', + 'package_after_id', + 'profile_before_id', + 'profile_after_id', + 'lock_level_before', + 'lock_level_after', + 'contract_balance_before_id', + 'contract_balance_after_id', + 'request_token', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +our $OK_OUTCOME = 'ok'; +our $FAILED_OUTCOME = 'failed'; + +our $VOUCHER_TYPE = 'voucher'; +our $CASH_TYPE = 'cash'; +#our $SET_BALANCE_TYPE = 'set_balance'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_contractidfromto { + + my ($contract_id,$from,$to,$outcome,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + if ($from) { + $stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' >= ?'; + push(@params,$from->epoch()); + } + if ($to) { + $stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' <= ?'; + push(@params,$to->epoch()); + } + if ($outcome) { + $stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?'; + push(@params,$outcome); + } + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_contractbalanceid { + + my ($id,$outcome,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_balance_before_id') . ' = ?'; + my @params = ($id); + if ($outcome) { + $stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?'; + push(@params,$outcome); + } + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($contract_id) = @params{qw/ + contract_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('timestamp') . ', ' . + $db->columnidentifier('type') . ', ' . + $db->columnidentifier('outcome') .', ' . + $db->columnidentifier('contract_id') . ') VALUES (' . + 'UNIX_TIMESTAMP(NOW()), ' . + '\'' . $CASH_TYPE . '\', ' . + '\'' . $FAILED_OUTCOME . '\', ' . + '?)', + $contract_id + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm index 2eafb915..77d6fb4f 100644 --- a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm @@ -8,21 +8,34 @@ no strict 'refs'; use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw(); use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw(); -#use NGCP::BulkProcessor::RestRequests::mr38::Contracts qw(); -#use NGCP::BulkProcessor::RestRequests::mr38::Customers qw(); -#use NGCP::BulkProcessor::RestRequests::mr38::BillingProfiles qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::contracts qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::contract_balances qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::topup_log qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::profile_packages qw(); + +#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); +#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( - check_billing_db_tables + check_fix_contract_balance_gaps_tables + check_fix_free_cash_tables ); #check_rest_get_items my $NOK = 'NOK'; my $OK = 'ok'; -sub check_billing_db_tables { +sub check_fix_contract_balance_gaps_tables { my ($messages) = @_; @@ -38,10 +51,53 @@ sub check_billing_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::contract_balances'); $result &= $check_result; push(@$messages,$message); - #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::lnp_providers'); - #if (not $check_result) { - # ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers'); - #} + return $result; + +} + +sub check_fix_free_cash_tables { + + my ($messages) = @_; + + my $result = 1; + my $check_result; + my $message; + + my $message_prefix = 'NGCP billing db tables - '; + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::contracts'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::contract_balances'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::topup_log'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::profile_packages'); + $result &= $check_result; push(@$messages,$message); + + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::domains'); + #$result &= $check_result; push(@$messages,$message); + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::resellers'); + #$result &= $check_result; push(@$messages,$message); + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers'); + #$result &= $check_result; push(@$messages,$message); + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contracts'); + #$result &= $check_result; push(@$messages,$message); + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contacts'); + #$result &= $check_result; push(@$messages,$message); + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers'); + #$result &= $check_result; push(@$messages,$message); + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases'); + #$result &= $check_result; push(@$messages,$message); + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::accounting::cdr'); #$result &= $check_result; push(@$messages,$message); return $result; diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm index e45d4fdb..1ad613d9 100644 --- a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm @@ -7,12 +7,22 @@ use threads::shared qw(); #use List::Util qw(); use DateTime qw(); +use NGCP::BulkProcessor::Globals qw( + $system_abbreviation +); + use NGCP::BulkProcessor::Projects::Disaster::Balances::Settings qw( $dry $skip_errors $fix_contract_balance_gaps_multithreading $fix_contract_balance_gaps_numofthreads + + $fix_free_cash_multithreading + $fix_free_cash_numofthreads + + $write_topup_log + $apply_negative_delta ); #$set_preference_bulk_numofthreads @@ -30,7 +40,21 @@ use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw(); use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw(); use NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings qw(); - +use NGCP::BulkProcessor::Dao::mr457::billing::contracts qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::contract_balances qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::topup_log qw(); +use NGCP::BulkProcessor::Dao::mr457::billing::profile_packages qw(); + +#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); +#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); +#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); use NGCP::BulkProcessor::ConnectorPool qw( get_xa_db @@ -41,13 +65,19 @@ use NGCP::BulkProcessor::ConnectorPool qw( ); use NGCP::BulkProcessor::Utils qw(threadid); +use NGCP::BulkProcessor::Array qw(array_to_map); +use NGCP::BulkProcessor::Calendar qw(current_local is_infinite_future); +use NGCP::BulkProcessor::SqlConnectors::MySQLDB qw(); require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( fix_contract_balance_gaps + fix_free_cash ); +my $create_sample_cdr = 0; + sub fix_contract_balance_gaps { my $static_context = {}; @@ -92,20 +122,13 @@ sub fix_contract_balance_gaps { ),$warning_count); } - -sub _check_insert_tables { - - #NGCP::BulkProcessor::Dao::mr38::provisioning::voip_usr_preferences::check_table(); - -} - sub _fix_contract_balance_gaps { my ($context) = @_; eval { $context->{db}->db_begin(); my $last_balance = undef; - foreach my $contract_balance (sort NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::sort_by_end @{$context->{contract_balances}}) { + foreach my $contract_balance (sort NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::sort_by_end_asc @{$context->{contract_balances}}) { #print " " . $contract_balance->{id} . " " . $contract_balance->{_start} . ' ' . $contract_balance->{_end} . "\n"; if (defined $last_balance) { my $gap_start = $last_balance->{_end}->clone->add(seconds => 1); @@ -193,7 +216,7 @@ sub _insert_contract_balances { sub _fix_contract_balance_gaps_checks { my ($context) = @_; - my $result = _checks($context); + my $result = 1; return $result; } @@ -202,114 +225,464 @@ sub _reset_fix_contract_balance_gaps_context { my ($context,$contract,$rownum) = @_; - my $result = _reset_context($context,$contract,$rownum); + my $result = 1; - $context->{contract_balances} = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::findby_contractid($context->{db},$context->{contract}->{id}); + $context->{rownum} = $rownum; - #$context->{barring_profile} = $imported_subscriber->{barring_profile}; - #$context->{ncos_level} = $context->{ncos_level_map}->{$context->{barring_profile}}; + $context->{contract} = $contract; - #delete $context->{adm_ncos_id_preference_id}; + $context->{contract_balances} = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::findby_contractid($context->{db},$context->{contract}->{id}); return $result; } -sub _checks { + + + + + + + + +sub fix_free_cash { + + my $static_context = {}; + my $result = _fix_free_cash_checks($static_context); + + destroy_dbs(); + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::Dao::mr457::billing::contracts::process_free_cash_contracts( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + my $rownum = $row_offset; + foreach my $row (@$records) { + $rownum++; + next unless _reset_fix_free_cash_context($context,$row->[0],$rownum); + _fix_free_cash($context); + } + + #return 0; + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + # below is not mandatory.. + _check_insert_tables(); + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + load_recursive => 0, + multithreading => $fix_free_cash_multithreading, + numofthreads => $fix_free_cash_numofthreads, + ),$warning_count); +} + +sub _fix_free_cash { my ($context) = @_; - my $result = 1; - #my $optioncount = 0; - #eval { - # $optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::FeatureOption::countby_subscribernumber_option(); - #}; - #if ($@ or $optioncount == 0) { - # rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - #my $userpasswordcount = 0; - #eval { - # $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::countby_fqdn(); - #}; - #if ($@ or $userpasswordcount == 0) { - # rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - #my $subscribercount = 0; - #my $subscriber_barring_profiles = []; - #eval { - # $subscribercount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::countby_subscribernumber(); - # $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles(); - #}; - #if ($@ or $subscribercount == 0) { - # rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} + eval { - return $result; + $context->{db}->set_transaction_isolation($NGCP::BulkProcessor::SqlConnectors::MySQLDB::READ_COMMITTED); + $context->{db}->db_begin(); + my $contract_id = $context->{contract}->{id}; + my $contract = NGCP::BulkProcessor::Dao::mr457::billing::contracts::forupdate_id($context->{db},$contract_id); + $context->{contract} = $contract; + my $actual_profile = $context->{actual_profile}; + + my @balances = sort NGCP::BulkProcessor::Dao::mr457::billing::contract_balances::sort_by_end_desc + @{NGCP::BulkProcessor::Dao::mr457::billing::contract_balances::findby_contractid($context->{db},$contract_id)}; + my $balance = $balances[0]; + my $mapping = NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::findby_contractid_ts($context->{db}, + $contract_id,$balance->{_start})->[0]; + if ($mapping) { + my $billing_profile = $context->{billing_profile_map}->{$mapping->{billing_profile_id}}; + my $free_cash = $billing_profile->{interval_free_cash} // 0.0; + my $next_free_cash = $actual_profile->{interval_free_cash} // 0.0; + my $free_cash_carry_over = $next_free_cash; + if ($balance->{cash_balance_interval} < $free_cash) { + $free_cash_carry_over += $balance->{cash_balance_interval} - $free_cash; + } + my $delta = $next_free_cash - ($balance->{cash_balance} + $free_cash_carry_over); + + if ($delta > 0.0 or ($delta < 0.0 and $apply_negative_delta)) { + NGCP::BulkProcessor::Dao::mr457::billing::contract_balances::update_row($context->{db},{ + id => $balance->{id}, + cash_balance => ($balance->{cash_balance} + $delta), + }); + + NGCP::BulkProcessor::Dao::mr457::billing::topup_log::insert_row($context->{db},{ + timestamp => $context->{now}->epoch(), + type => $NGCP::BulkProcessor::Dao::mr457::billing::topup_log::CASH_TYPE, #SET_BALANCE_TYPE, + outcome => $NGCP::BulkProcessor::Dao::mr457::billing::topup_log::OK_OUTCOME, + contract_id => $contract_id, + amount => $delta, + cash_balance_before => $balance->{cash_balance}, + cash_balance_after => ($balance->{cash_balance} + $delta), + package_before_id => $contract->{profile_package_id}, + package_after_id => $contract->{profile_package_id}, + profile_before_id => $context->{actual_profile}->{id}, + profile_after_id => $context->{actual_profile}->{id}, + contract_balance_before_id => $balance->{id}, + contract_balance_after_id => $balance->{id}, + request_token => $system_abbreviation, + }) if $write_topup_log; + + _info($context,"($context->{rownum}) " . "contract id $contract_id cash balance is $balance->{cash_balance} cents, adding $delta cents to match free cash of $context->{actual_profile}->{interval_free_cash} cents ($context->{actual_profile}->{name}) for next interval" + . (is_infinite_future($balance->{_end}) ? '' : ' ' . $balance->{_end}->clone->add(seconds => 1))); + + if ($create_sample_cdr and not is_infinite_future($balance->{_end}) and _generate_cdr_init_context($context,$balance->{_end}->clone->add(seconds => 1))) { + NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::insert_row($context->{db},$context->{cdr}); + } + } else { + _info($context,"($context->{rownum}) " . "contract id $contract_id cash balance is $balance->{cash_balance} cents, SKIP adding $delta cents to match free cash of $context->{actual_profile}->{interval_free_cash} cents ($context->{actual_profile}->{name}) for next interval" + . (is_infinite_future($balance->{_end}) ? '' : ' ' . $balance->{_end}->clone->add(seconds => 1))); + } + + } else { + if ($skip_errors) { + _warn($context,"($context->{rownum}) " . "no billing mapping at $balance->{_start} for contract id $contract_id"); + } else { + _error($context,"($context->{rownum}) " . "no billing mapping at $balance->{_start} for contract id $contract_id"); + } + } + + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($skip_errors) { + _warn($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err); + } else { + _error($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err); + } + } } -sub _reset_context { - my ($context,$contract,$rownum) = @_; +sub _fix_free_cash_checks { + my ($context) = @_; my $result = 1; + $context->{now} = current_local(); + + my $profile_count = 0; + eval { + ($context->{billing_profile_map},my $ids,my $profiles) = array_to_map(NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles::findall(), + sub { return shift->{id}; }, sub { return shift; }, 'first' ); + $profile_count = (scalar keys %{$context->{billing_profile_map}}); + }; + if ($@ or $profile_count == 0) { + _error($context,"cannot find any billing profiles"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$profile_count billing profiles cached"); + } + + my $package_count = 0; + eval { + ($context->{profile_package_map},my $ids,my $packages) = array_to_map(NGCP::BulkProcessor::Dao::mr457::billing::profile_packages::findall(), + sub { return shift->{id}; }, sub { return shift; }, 'first' ); + $package_count = (scalar keys %{$context->{profile_package_map}}); + }; + if ($@) { + _error($context,"cannot find profile packages"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$package_count profile packages cached"); + } + + if ($create_sample_cdr) { + my $domain_count = 0; + eval { + ($context->{domain_map},my $ids,my $domains) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::domains::findall(), + sub { return shift->{id}; }, sub { return shift; }, 'first' ); + $domain_count = (scalar keys %{$context->{domain_map}}); + }; + if ($@ or $domain_count == 0) { + _error($context,"cannot find any domains"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$domain_count domains cached"); + } + + my $reseller_count = 0; + eval { + ($context->{reseller_map},my $ids,my $resellers) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::resellers::findall(), + sub { return shift->{id}; }, sub { return shift; }, 'first' ); + $reseller_count = (scalar keys %{$context->{reseller_map}}); + }; + if ($@ or $reseller_count == 0) { + _error($context,"cannot find any resellers"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$reseller_count resellers cached"); + } + + my $active_count = 0; + eval { + $active_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid( + $NGCP::BulkProcessor::Dao::Trunk::billing::contracts::ACTIVE_STATE, + undef + ); + ($context->{min_id},$context->{max_id}) = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::find_minmaxid(undef, + { 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE }, + undef + ); + }; + if ($@ or $active_count == 0) { + _error($context,"cannot find active subscribers"); + $result = 0; #even in skip-error mode.. + } else { + _info($context,"$active_count active subscribers found"); + } + } + + my $contract_free_cash_count = 0; + eval { + $contract_free_cash_count = NGCP::BulkProcessor::Dao::mr457::billing::contracts::countby_free_cash(); + }; + if ($@ or $contract_free_cash_count == 0) { + rowprocessingerror(threadid(),'no contracts with free cash billing profiles',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } + + return $result; +} + +sub _reset_fix_free_cash_context { + + my ($context,$contract_id,$rownum) = @_; + + my $result = 0; + $context->{rownum} = $rownum; + my $contract = NGCP::BulkProcessor::Dao::mr457::billing::contracts::findby_id($contract_id); + $context->{contract} = $contract; - #$context->{cli} = $imported_subscriber->subscribernumber(); - #$context->{e164} = {}; - #$context->{e164}->{cc} = substr($context->{cli},0,3); - #$context->{e164}->{ac} = ''; - #$context->{e164}->{sn} = substr($context->{cli},3); - - #$context->{subscriberdelta} = $imported_subscriber->{delta}; - - #my $userpassword = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::findby_fqdn($context->{cli}); - #if (defined $userpassword) { - # $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username}; - # $context->{password} = $userpassword->{password}; - # $context->{userpassworddelta} = $userpassword->{delta}; - #} else { - # # once full username+passwords is available: - # delete $context->{username}; - # delete $context->{password}; - # delete $context->{userpassworddelta}; - # if ($context->{subscriberdelta} eq - # $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) { - # - # } else { - # $result &= 0; - # - # # for now, as username+passwords are incomplete: - # #$context->{username} = $context->{e164}->{sn}; - # #$context->{password} = $context->{username}; - # #$context->{userpassworddelta} = $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::updated_delta; - # - # if ($skip_errors) { - # # for now, as username+passwords are incomplete: - # _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); - # } else { - # _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); - # } - # } - #} - # - #delete $context->{billing_voip_subscriber}; - #delete $context->{provisioning_voip_subscriber}; + # the goal is to adjust the latest contract_balances.cash_balance, so the + # next contract_balance will start with the exact free cash expected: + + my $actual_profile; + my $mapping = NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::findby_contractid_ts($context->{db}, + $contract_id,$context->{now})->[0]; # the actual profile is considered for the next balance interval + if ($mapping) { + $actual_profile = $context->{billing_profile_map}->{$mapping->{billing_profile_id}}; + if (($actual_profile->{interval_free_cash} // 0.0) != 0.0) { + my $carry_over_mode = $NGCP::BulkProcessor::Dao::mr457::billing::profile_packages::DEFAULT_CARRY_OVER_MODE; + $carry_over_mode = $context->{profile_package_map}->{$contract->{profile_package_id}}->{carry_over_mode} if $contract->{profile_package_id}; + if ($carry_over_mode ne $NGCP::BulkProcessor::Dao::mr457::billing::profile_packages::DISCARD_MODE) { + $result = 1; + } else { + _info($context,"($context->{rownum}) " . 'contract id ' . $contract_id . ' skipped, is in ' . $carry_over_mode . ' carry over mode'); + } + } else { + _info($context,"($context->{rownum}) " . 'contract id ' . $contract_id . ' skipped, used a billing profile with free cash in the past'); + } + } else { + if ($skip_errors) { + _warn($context,"($context->{rownum}) " . "no billing mapping at $context->{now} for contract id $contract_id"); + } else { + _error($context,"($context->{rownum}) " . "no billing mapping at $context->{now} for contract id $contract_id"); + } + } + + $context->{actual_profile} = $actual_profile; return $result; } +sub _generate_cdr_init_context { + my ($context,$time) = @_; + #my $result = 1; + #my $provider = $providers[rand @providers]; + + my $source_subscriber; + $source_subscriber = _get_subscriber($context); + my $dest_subscriber; + $dest_subscriber = undef; + + return 0 unless $source_subscriber; + + my $source_peering_subscriber_info; + my $source_reseller; + if ($source_subscriber) { + $source_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($source_subscriber->{contract_id}); + $source_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($source_subscriber->{contract}->{contact_id}); + $source_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$source_subscriber->{uuid}); + $source_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($source_subscriber->{contract}->{prov_subscriber}->{id},1); + $source_subscriber->{domain} = $context->{domain_map}->{$source_subscriber->{domain_id}}->{domain}; + $source_reseller = $context->{reseller_map}->{$source_subscriber->{contract}->{contact}->{reseller_id}}; + } else { + $source_peering_subscriber_info = _prepare_offnet_subscriber_info("source","offnet.com"); + } + + my $dest_peering_subscriber_info; + my $dest_reseller; + if ($dest_subscriber) { + $dest_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($dest_subscriber->{contract_id}); + $dest_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($dest_subscriber->{contract}->{contact_id}); + $dest_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$dest_subscriber->{uuid}); + $dest_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($dest_subscriber->{contract}->{prov_subscriber}->{id},1); + $dest_subscriber->{domain} = $context->{domain_map}->{$dest_subscriber->{domain_id}}->{domain}; + $dest_reseller = $context->{reseller_map}->{$dest_subscriber->{contract}->{contact}->{reseller_id}}; + } else { + $dest_peering_subscriber_info = _prepare_offnet_subscriber_info("destination","offnet.com"); + } + + my $source_ip = '192.168.0.1'; + #my $time = time(); + my $duration = 0; + + $context->{cdr} = { + #id => , + #update_time => , + source_user_id => ($source_subscriber ? $source_subscriber->{uuid} : '0'), + source_provider_id => ($source_reseller ? $source_reseller->{contract_id} : '0'), + #source_external_subscriber_id => , + #source_external_contract_id => , + source_account_id => ($source_subscriber ? $source_subscriber->{contract_id} : '0'), + source_user => ($source_subscriber ? $source_subscriber->{username} : $source_peering_subscriber_info->{username}), + source_domain => ($source_subscriber ? $source_subscriber->{domain} : $source_peering_subscriber_info->{domain}), + source_cli => ($source_subscriber ? ($source_subscriber->{primary_alias}->{username} // $source_subscriber->{username}) : $source_peering_subscriber_info->{username}), + #source_clir => '0', + source_ip => $source_ip, + #source_gpp0 => , + #source_gpp1 => , + #source_gpp2 => , + #source_gpp3 => , + #source_gpp4 => , + #source_gpp5 => , + #source_gpp6 => , + #source_gpp7 => , + #source_gpp8 => , + #source_gpp9 => , + destination_user_id => ($dest_subscriber ? $dest_subscriber->{uuid} : '0'), + destination_provider_id => ($dest_reseller ? $dest_reseller->{contract_id} : '0'), + #destination_external_subscriber_id => , + #destination_external_contract_id => , + destination_account_id => ($dest_subscriber ? $dest_subscriber->{contract_id} : '0'), + destination_user => ($dest_subscriber ? $dest_subscriber->{username} : $dest_peering_subscriber_info->{username}), + destination_domain => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}), + destination_user_dialed => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}), + destination_user_in => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}), + destination_domain_in => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}), + #destination_gpp0 => , + #destination_gpp1 => , + #destination_gpp2 => , + #destination_gpp3 => , + #destination_gpp4 => , + #destination_gpp5 => , + #destination_gpp6 => , + #destination_gpp7 => , + #destination_gpp8 => , + #destination_gpp9 => , + #peer_auth_user => , + #peer_auth_realm => , + call_type => 'call', + call_status => 'ok', + call_code => '200', + init_time => $time->epoch, + start_time => $time->epoch, + duration => $duration, + call_id => _generate_call_id(), + #source_carrier_cost => , + #source_reseller_cost => , + #source_customer_cost => , + #source_carrier_free_time => , + #source_reseller_free_time => , + #source_customer_free_time => , + #source_carrier_billing_fee_id => , + #source_reseller_billing_fee_id => , + #source_customer_billing_fee_id => , + #source_carrier_billing_zone_id => , + #source_reseller_billing_zone_id => , + #source_customer_billing_zone_id => , + #destination_carrier_cost => , + #destination_reseller_cost => , + #destination_customer_cost => , + #destination_carrier_free_time => , + #destination_reseller_free_time => , + #destination_customer_free_time => , + #destination_carrier_billing_fee_id => , + #destination_reseller_billing_fee_id => , + #destination_customer_billing_fee_id => , + #destination_carrier_billing_zone_id => , + #destination_reseller_billing_zone_id => , + #destination_customer_billing_zone_id => , + #frag_carrier_onpeak => , + #frag_reseller_onpeak => , + #frag_customer_onpeak => , + #is_fragmented => , + #split => , + #rated_at => , + #rating_status => 'unrated', + #exported_at => , + #export_status => , + }; + + return 1; + +} + +sub _get_subscriber { + my ($context,$excluding_id) = @_; + + return NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_contractid_states( + $context->{db}, + $context->{contract}->{id}, + { 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE }, + )->[0]; +} + +sub _prepare_offnet_subscriber_info { + my ($username_primary_number,$domain) = @_; + return { username => $username_primary_number, domain => $domain }; +} + +sub _generate_call_id { + return '*TEST*'._random_string(26,'a'..'z','A'..'Z',0..9,'-','.'); +} + +sub _random_string { + my ($length,@chars) = @_; + return join('',@chars[ map{ rand @chars } 1 .. $length ]); +} + +sub _check_insert_tables { + + #NGCP::BulkProcessor::Dao::mr38::provisioning::voip_usr_preferences::check_table(); + +} sub _error { diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm index da7ea7d6..c51dca26 100644 --- a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm @@ -26,7 +26,7 @@ use NGCP::BulkProcessor::LoadConfig qw( split_tuple parse_regexp ); -use NGCP::BulkProcessor::Utils qw(prompt); +use NGCP::BulkProcessor::Utils qw(prompt stringtobool); #format_number check_ipnet require Exporter; @@ -44,6 +44,12 @@ our @EXPORT_OK = qw( $fix_contract_balance_gaps_multithreading $fix_contract_balance_gaps_numofthreads + + $fix_free_cash_multithreading + $fix_free_cash_numofthreads + + $write_topup_log + $apply_negative_delta ); our $defaultconfig = 'config.cfg'; @@ -56,6 +62,12 @@ our $skip_errors = 0; our $fix_contract_balance_gaps_multithreading = $enablemultithreading; our $fix_contract_balance_gaps_numofthreads = $cpucount; +our $fix_free_cash_multithreading = $enablemultithreading; +our $fix_free_cash_numofthreads = $cpucount; + +our $write_topup_log = 0; +our $apply_negative_delta = 0; + sub update_settings { my ($data,$configfile) = @_; @@ -75,6 +87,12 @@ sub update_settings { $fix_contract_balance_gaps_multithreading = $data->{fix_contract_balance_gaps_multithreading} if exists $data->{fix_contract_balance_gaps_multithreading}; $fix_contract_balance_gaps_numofthreads = _get_numofthreads($cpucount,$data,'fix_contract_balance_gaps_numofthreads'); + $fix_free_cash_multithreading = $data->{fix_free_cash_multithreading} if exists $data->{fix_free_cash_multithreading}; + $fix_free_cash_numofthreads = _get_numofthreads($cpucount,$data,'fix_free_cash_numofthreads'); + + $write_topup_log = stringtobool($data->{write_topup_log}) if exists $data->{write_topup_log}; + $apply_negative_delta = stringtobool($data->{apply_negative_delta}) if exists $data->{apply_negative_delta}; + return $result; } diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg index 0b874d5b..8988e0b1 100644 --- a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg @@ -9,28 +9,28 @@ jobservers = 127.0.0.1:4730 #provisioning_conf = /etc/ngcp-panel/provisioning.conf ##NGCP MySQL connectivity - "accounting" db: -accounting_host = 127.0.0.1 +accounting_host = somehost accounting_port = 3306 accounting_databasename = accounting accounting_username = root accounting_password = ##NGCP MySQL connectivity - "billing" db: -billing_host = 192.168.0.74 +billing_host = somehost billing_port = 3306 billing_databasename = billing billing_username = root billing_password = ##NGCP MySQL connectivity - "provisioning" db: -provisioning_host = 192.168.0.74 +provisioning_host = somehost provisioning_port = 3306 provisioning_databasename = provisioning provisioning_username = root provisioning_password = ##NGCP MySQL connectivity - "kamailio" db: -kamailio_host = 192.168.0.74 +kamailio_host = somehost kamailio_port = 3306 kamailio_databasename = kamailio kamailio_username = root @@ -44,7 +44,7 @@ xa_username = root xa_password = ##NGCP REST-API connectivity: -ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_uri = https://somehost:1443 ngcprestapi_username = administrator ngcprestapi_password = administrator ngcprestapi_realm = api_admin_http diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.debug.cfg new file mode 100644 index 00000000..d45ffe9b --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.debug.cfg @@ -0,0 +1,62 @@ +##general settings: +working_path = /home/rkrenn/temp/soco +cpucount = 4 +enablemultithreading = 0 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +#provisioning_conf = /etc/ngcp-panel/provisioning.conf + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.84 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.84 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.84 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.84 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.84 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl index 425984ce..5093192c 100644 --- a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl @@ -50,16 +50,21 @@ use NGCP::BulkProcessor::Mail qw( cleanupmsgfiles ); -use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw(); -use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw(); +#use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw(); +#use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw(); + +#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); +#use NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances qw(); use NGCP::BulkProcessor::Projects::Disaster::Balances::Check qw( - check_billing_db_tables + check_fix_contract_balance_gaps_tables + check_fix_free_cash_tables ); #check_rest_get_items use NGCP::BulkProcessor::Projects::Disaster::Balances::Contracts qw( fix_contract_balance_gaps + fix_free_cash ); #use NGCP::BulkProcessor::Projects::Disaster::Balances::Api qw( @@ -77,15 +82,21 @@ my @TASK_OPTS = (); my $tasks = []; -my $check_task_opt = 'check'; -push(@TASK_OPTS,$check_task_opt); +my $check_fix_contract_balance_gaps_task_opt = 'check_fix_gaps'; +push(@TASK_OPTS,$check_fix_contract_balance_gaps_task_opt); + +my $check_fix_free_cash_task_opt = 'check_fix_free_cash'; +push(@TASK_OPTS,$check_fix_free_cash_task_opt); my $cleanup_task_opt = 'cleanup'; push(@TASK_OPTS,$cleanup_task_opt); -my $fix_contract_balance_gaps_task_opt = 'fix_contract_balance_gaps'; +my $fix_contract_balance_gaps_task_opt = 'fix_gaps'; push(@TASK_OPTS,$fix_contract_balance_gaps_task_opt); +my $fix_free_cash_task_opt = 'fix_free_cash'; +push(@TASK_OPTS,$fix_free_cash_task_opt); + if (init()) { main(); exit(0); @@ -127,8 +138,10 @@ sub main() { scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; foreach my $task (@$tasks) { - if (lc($check_task_opt) eq lc($task)) { - $result &= check_task(\@messages) if taskinfo($check_task_opt,$result); + if (lc($check_fix_contract_balance_gaps_task_opt) eq lc($task)) { + $result &= check_fix_contract_balance_gaps_task(\@messages) if taskinfo($check_fix_contract_balance_gaps_task_opt,$result); + } elsif (lc($check_fix_free_cash_task_opt) eq lc($task)) { + $result &= check_fix_free_cash_task(\@messages) if taskinfo($check_fix_free_cash_task_opt,$result); } elsif (lc($cleanup_task_opt) eq lc($task)) { $result &= cleanup_task(\@messages) if taskinfo($cleanup_task_opt,$result); @@ -139,6 +152,13 @@ sub main() { $completion |= 1; } + } elsif (lc($fix_free_cash_task_opt) eq lc($task)) { + if (taskinfo($fix_free_cash_task_opt,$result)) { + next unless check_dry(); + $result &= fix_free_cash_task(\@messages); + $completion |= 1; + } + } else { $result = 0; scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); @@ -166,10 +186,27 @@ sub taskinfo { return $result; } -sub check_task { +sub check_fix_contract_balance_gaps_task { my ($messages) = @_; my @check_messages = (); - my $result = check_billing_db_tables(\@check_messages); + my $result = check_fix_contract_balance_gaps_tables(\@check_messages); + #$result &= .. + push(@$messages,join("\n",@check_messages)); + + #@check_messages = (); + #$result = check_provisioning_db_tables(\@check_messages); + ##$result &= .. + #push(@$messages,join("\n",@check_messages)); + + + destroy_dbs(); + return $result; +} + +sub check_fix_free_cash_task { + my ($messages) = @_; + my @check_messages = (); + my $result = check_fix_free_cash_tables(\@check_messages); #$result &= .. push(@$messages,join("\n",@check_messages)); @@ -234,6 +271,34 @@ sub fix_contract_balance_gaps_task { } + +sub fix_free_cash_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = fix_free_cash(); + #if ($batch) { + # ($result,$warning_count) = set_barring_profiles_batch(); + #} else { + # ($result,$warning_count) = set_barring_profiles(); + #} + }; + my $err = $@; + my $stats = ($skip_errors ? ": $warning_count warnings" : ''); + eval { + + }; + if ($err or !$result) { + push(@$messages,"fix free cash INCOMPLETE$stats"); + } else { + push(@$messages,"fix free cash completed$stats"); + } + destroy_dbs(); #every task should leave with closed connections. + return $result; + +} + #END { # # this should not be required explicitly, but prevents Log4Perl's # # "rootlogger not initialized error upon exit.. diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg index 7c776e0f..7e3e3391 100644 --- a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg @@ -2,3 +2,11 @@ #dry=0 #skip_errors=0 +fix_contract_balance_gaps_numofthreads=2 +fix_contract_balance_gaps_multithreading=1 + +fix_free_cash_numofthreads=2 +fix_free_cash_multithreading=1 + +write_topup_log=0 +apply_negative_delta=0 \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.debug.cfg new file mode 100644 index 00000000..816958e0 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.debug.cfg @@ -0,0 +1,9 @@ + +#dry=0 +#skip_errors=0 + +fix_contract_balance_gaps_numofthreads=2 +fix_contract_balance_gaps_multithreading=1 + +fix_free_cash_numofthreads=2 +fix_free_cash_multithreading=1 diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg index e06d5d40..8b18e86e 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg @@ -8,11 +8,10 @@ webpassword_length = 8 webusername_length = 8 sippassword_length = 16 sipusername_length = 8 -provision_subscriber_count = 20000 +provision_subscriber_count = 100000 providers_yml = providers.yml generate_cdr_multithreading = 1 #generate_cdr_numofthreads = 2 -generate_cdr_count = 50000 - +generate_cdr_count = 500000 diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.debug.cfg index 7703b511..dc5157ad 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.debug.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.debug.cfg @@ -14,4 +14,4 @@ providers_yml = providers.yml generate_cdr_multithreading = 1 #generate_cdr_numofthreads = 2 -generate_cdr_count = 100 +generate_cdr_count = 500 diff --git a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm index 7c4c1576..1429599e 100644 --- a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm +++ b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm @@ -66,6 +66,8 @@ our $ITEM_REL_PARAM = 'item_rel'; my $request_charset = 'utf-8'; my $response_charset = 'utf-8'; +my $timeout = 5*60; + sub _get_api_cert_dir { return $working_path . $API_CERT_DIR; } @@ -129,6 +131,7 @@ sub _setup_ua { verify_hostname => 0, SSL_verify_mode => 0, ); + $ua->timeout($timeout) if $timeout; if ($self->{username}) { $ua->credentials($netloc, $self->{realm}, $self->{username}, $self->{password}); } diff --git a/lib/NGCP/BulkProcessor/SqlConnector.pm b/lib/NGCP/BulkProcessor/SqlConnector.pm index cfe75469..264c026f 100644 --- a/lib/NGCP/BulkProcessor/SqlConnector.pm +++ b/lib/NGCP/BulkProcessor/SqlConnector.pm @@ -63,6 +63,15 @@ sub new { } +sub set_transaction_isolation { + + my ($self,$level) = @_; + + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); + return undef; + +} + sub _gettemptablename { my $self = shift; my $temp_tablename = 'TMP_TBL_' . $self->{tid} . '_'; @@ -122,6 +131,16 @@ sub columnidentifier { my $columnname = shift; my (@params) = @_; + return join('.',map { $self->_columnidentifier($_,@params); } split(/\./,$columnname,-1)); + +} + +sub _columnidentifier { + + my $self = shift; + my $columnname = shift; + my (@params) = @_; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); return undef; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm index 84d841bf..ae76fcd9 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm @@ -149,7 +149,7 @@ sub tableidentifier { } -sub columnidentifier { +sub _columnidentifier { my $self = shift; my $columnname = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm index 9e9e2dc2..7b255ec2 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/MySQLDB.pm @@ -25,7 +25,7 @@ use NGCP::BulkProcessor::SqlConnector; require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlConnector); -our @EXPORT_OK = qw(get_tableidentifier); +our @EXPORT_OK = qw(get_tableidentifier $READ_COMMITTED); my $defaulthost = '127.0.0.1'; my $defaultport = '3306'; @@ -55,6 +55,8 @@ my $rowblock_transactional = 1; my $serialization_level = ''; #'SERIALIZABLE' +our $READ_COMMITTED = 'READ COMMITTED'; + sub new { my $class = shift; @@ -101,7 +103,7 @@ sub tableidentifier { } -sub columnidentifier { +sub _columnidentifier { my $self = shift; my $columnname = shift; @@ -248,6 +250,13 @@ sub db_connect { } +sub set_transaction_isolation { + + my ($self,$level) = @_; + return $self->db_do("SET TRANSACTION ISOLATION LEVEL $level"); + +} + sub vacuum { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm index c5bab61a..17e0c2e4 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/OracleDB.pm @@ -103,7 +103,7 @@ sub tableidentifier { } -sub columnidentifier { +sub _columnidentifier { my $self = shift; my $columnname = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm index 4e84497e..af5a027f 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/PostgreSQLDB.pm @@ -98,7 +98,7 @@ sub tableidentifier { } -sub columnidentifier { +sub _columnidentifier { my $self = shift; my $columnname = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm index d03cc50d..7e545af6 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/SQLServerDB.pm @@ -98,7 +98,7 @@ sub tableidentifier { return $tablename; } -sub columnidentifier { +sub _columnidentifier { my $self = shift; my $columnname = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm index a38bf5c8..f1c4efa3 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm @@ -115,7 +115,7 @@ sub tableidentifier { } -sub columnidentifier { +sub _columnidentifier { my $self = shift; my $columnname = shift;