TT#30671 TT#33351 balance free cash restauration tool

Change-Id: I738868ae7e8e0e9b665464596aac86ec30856238
changes/52/19152/14
Rene Krenn 8 years ago
parent b8111d9d91
commit f8f1e28349

@ -33,6 +33,7 @@ our @EXPORT_OK = qw(
datetime_to_string
datetime_from_string
set_timezone
current_local
);
my $is_fake_time = 0;
@ -78,6 +79,10 @@ sub _current_local {
}
}
sub current_local {
return DateTime->now(time_zone => $LOCAL);
}
sub infinite_future {
#... to '9999-12-31 23:59:59'
return DateTime->new(year => 9999, month => 12, day => 31, hour => 23, minute => 59, second => 59,

@ -21,7 +21,11 @@ our @EXPORT_OK = qw(
check_table
findby_id
findall
findby_resellerid_name_handle
$DEFAULT_PROFILE_FREE_CASH
$DEFAULT_PROFILE_FREE_TIME
);
my $tablename = 'billing_profiles';
@ -54,6 +58,9 @@ my $expected_fieldnames = [
my $indexes = {};
our $DEFAULT_PROFILE_FREE_CASH = 0.0;
our $DEFAULT_PROFILE_FREE_TIME = 0;
sub new {
my $class = shift;
@ -83,6 +90,21 @@ sub findby_id {
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_resellerid_name_handle {
my ($reseller_id,$name,$handle,$load_recursive) = @_;

@ -17,19 +17,27 @@ use NGCP::BulkProcessor::ConnectorPool qw(
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
update_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone);
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
update_row
findby_contractid
sort_by_end
sort_by_end_desc
sort_by_end_asc
get_new_balance_values
get_free_ratio
);
my $tablename = 'contract_balances';
@ -49,6 +57,8 @@ my $expected_fieldnames = [
'invoice_id',
'underrun_profiles',
'underrun_lock',
'initial_cash_balance',
'initial_free_time_balance',
];
my $indexes = {};
@ -131,6 +141,15 @@ sub insert_row {
}
sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
@ -161,10 +180,14 @@ sub buildrecords_fromrows {
}
sub sort_by_end ($$) {
sub sort_by_end_asc ($$) {
return _sort_by_date('_end',0,@_);
}
sub sort_by_end_desc ($$) {
return _sort_by_date('_end',1,@_);
}
sub _sort_by_date {
my ($ts_field,$desc,$a,$b) = @_;
if ($desc) {
@ -189,6 +212,100 @@ sub _sort_by_date {
}
sub get_new_balance_values {
my %params = @_;
my ($contract_create,
$last_balance,
$balance,
$initial_balance,
$carry_over_mode,
$notopup_expiration,
$last_cash_balance,
$last_cash_balance_interval) = @params{qw/
contract_create
last_balance
balance
initial_balance
carry_over_mode
notopup_expiration
last_cash_balance
last_cash_balance_interval
/};
my ($cash_balance, $cash_balance_interval, $free_time_balance, $free_time_balance_interval) = (0.0,0.0,0,0);
$carry_over_mode //= $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_CARRY_OVER_MODE;
my $ratio;
if ($last_balance) {
$last_cash_balance //= $last_balance->{cash_balance};
$last_cash_balance_interval //= $last_balance->{cash_balance_interval};
if (($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_MODE eq $carry_over_mode
|| ($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_TIMELY_MODE eq $carry_over_mode && $last_balance->{timely_topup_count} > 0)
) && (!defined $notopup_expiration || $balance->{_start} < $notopup_expiration)) {
#if (!defined $last_profile) {
# my $bm_last = get_actual_billing_mapping(schema => $schema, contract => $contract, now => $last_balance->start); #end); !?
# $last_profile = $bm_last->billing_mappings->first->billing_profile;
#}
#my $contract_create = NGCP::Panel::Utils::DateTime::set_local_tz($contract->create_timestamp // $contract->modify_timestamp);
$ratio = 1.0;
if ($last_balance->{_start} <= $contract_create && $last_balance->{_end} >= $contract_create) { #$last_balance->end is never +inf here
$ratio = get_free_ratio($contract_create,$last_balance->{_start},$last_balance->{_end});
}
my $old_free_cash = $ratio * ($last_balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH);
$cash_balance = $last_cash_balance;
if ($last_cash_balance_interval < $old_free_cash) {
$cash_balance += $last_cash_balance_interval - $old_free_cash;
}
#$ratio * $last_profile->interval_free_time // _DEFAULT_PROFILE_FREE_TIME
#} else {
# $c->log->debug('discarding contract ' . $contract->id . " cash balance (mode '$carry_over_mode'" . (defined $notopup_expiration ? ', notopup expiration ' . NGCP::Panel::Utils::DateTime::to_string($notopup_expiration) : '') . ')') if $c;
}
$ratio = 1.0;
} else {
$cash_balance = (defined $initial_balance ? $initial_balance : $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_INITIAL_BALANCE);
$ratio = get_free_ratio($contract_create,$balance->{_start},$balance->{_end});
}
my $free_cash = $ratio * ($balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH);
$cash_balance += $free_cash;
$cash_balance_interval = 0.0;
my $free_time = $ratio * ($balance->{_profile}->{interval_free_time} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_TIME);
$free_time_balance = $free_time;
$free_time_balance_interval = 0;
#$c->log->debug("ratio: $ratio, free cash: $free_cash, cash balance: $cash_balance, free time: $free_time, free time balance: $free_time_balance");
return {cash_balance => sprintf("%.4f",$cash_balance),
initial_cash_balance => sprintf("%.4f",$cash_balance),
cash_balance_interval => sprintf("%.4f",$cash_balance_interval),
free_time_balance => sprintf("%.0f",$free_time_balance),
initial_free_time_balance => sprintf("%.0f",$free_time_balance),
free_time_balance_interval => sprintf("%.0f",$free_time_balance_interval)};
}
sub get_free_ratio {
my ($contract_create,$stime,$etime) = @_;
if (!is_infinite_future($etime)) {
my $ctime = ($contract_create->clone->truncate(to => 'day') > $stime ? $contract_create->clone->truncate(to => 'day') : $contract_create);
my $start_of_next_interval = _add_second($etime->clone,1);
#$c->log->debug("ratio = " . ($start_of_next_interval->epoch - $ctime->epoch) . ' / ' . ($start_of_next_interval->epoch - $stime->epoch)) if $c;
return ($start_of_next_interval->epoch - $ctime->epoch) / ($start_of_next_interval->epoch - $stime->epoch);
}
return 1.0;
}
sub _add_second {
my ($dt,$skip_leap_seconds) = @_;
$dt->add(seconds => 1);
while ($skip_leap_seconds and $dt->second() >= 60) {
$dt->add(seconds => 1);
}
return $dt;
}
sub gettablename {
return $tablename;

@ -22,6 +22,9 @@ use NGCP::BulkProcessor::SqlProcessor qw(
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
@ -32,8 +35,12 @@ our @EXPORT_OK = qw(
countby_status_resellerid
findby_contactid
findby_id
forupdate_id
process_records
process_free_cash_contracts
countby_free_cash
$ACTIVE_STATE
$TERMINATED_STATE
@ -117,6 +124,24 @@ sub findby_id {
}
sub forupdate_id {
my ($xa_db,$id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ? FOR UPDATE';
my @params = ($id);
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_status_resellerid {
my ($status,$reseller_id) = @_;
@ -150,6 +175,21 @@ sub countby_status_resellerid {
}
sub countby_free_cash {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(DISTINCT c.id) FROM ' . $table . ' AS c' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' .
' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0';
return $db->db_get_value($stmt);
}
sub insert_row {
my $db = &$get_db();
@ -222,6 +262,49 @@ sub process_records {
);
}
sub process_free_cash_contracts {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
/};
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'FROM ' . $table . ' AS c' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' .
' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0';
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,$rowblock,$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
select => $db->paginate_sort_query("SELECT DISTINCT c.id " . $stmt,undef,undef,[{ column => 'c.id', numeric => 1, dir => 1 }]),
selectcount => "SELECT COUNT(DISTINCT c.id) " . $stmt,
);
}
sub buildrecords_fromrows {

@ -0,0 +1,144 @@
package NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
findby_id
findall
$CARRY_OVER_MODE
$CARRY_OVER_TIMELY_MODE
$DISCARD_MODE
$DEFAULT_CARRY_OVER_MODE
$DEFAULT_INITIAL_BALANCE
);
my $tablename = 'profile_packages';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'reseller_id',
'name',
'description',
'initial_balance',
'service_charge',
'balance_interval_unit',
'balance_interval_value',
'balance_interval_start_mode',
'carry_over_mode',
'timely_duration_unit',
'timely_duration_value',
'notopup_discard_intervals',
'underrun_lock_threshold',
'underrun_lock_level',
'underrun_profile_threshold',
'topup_lock_level',
];
my $indexes = {};
our $CARRY_OVER_MODE = 'carry_over';
our $CARRY_OVER_TIMELY_MODE = 'carry_over_timely';
our $DISCARD_MODE = 'discard';
our $DEFAULT_CARRY_OVER_MODE = $CARRY_OVER_MODE;
our $DEFAULT_INITIAL_BALANCE = 0.0;
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,232 @@
package NGCP::BulkProcessor::Dao::Trunk::billing::topup_log;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
findby_contractidfromto
findby_contractbalanceid
findby_id
$OK_OUTCOME
$FAILED_OUTCOME
$VOUCHER_TYPE
$CASH_TYPE
$SET_BALANCE_TYPE
);
my $tablename = 'topup_log';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'username',
'timestamp',
'type',
'outcome',
'message',
'subscriber_id',
'contract_id',
'amount',
'voucher_id',
'cash_balance_before',
'cash_balance_after',
'package_before_id',
'package_after_id',
'profile_before_id',
'profile_after_id',
'lock_level_before',
'lock_level_after',
'contract_balance_before_id',
'contract_balance_after_id',
'request_token',
];
my $indexes = {};
my $insert_unique_fields = [];
our $OK_OUTCOME = 'ok';
our $FAILED_OUTCOME = 'failed';
our $VOUCHER_TYPE = 'voucher';
our $CASH_TYPE = 'cash';
our $SET_BALANCE_TYPE = 'set_balance';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_contractidfromto {
my ($contract_id,$from,$to,$outcome,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
if ($from) {
$stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' >= ?';
push(@params,$from->epoch());
}
if ($to) {
$stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' <= ?';
push(@params,$to->epoch());
}
if ($outcome) {
$stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?';
push(@params,$outcome);
}
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_contractbalanceid {
my ($id,$outcome,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_balance_before_id') . ' = ?';
my @params = ($id);
if ($outcome) {
$stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?';
push(@params,$outcome);
}
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($contract_id) = @params{qw/
contract_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('timestamp') . ', ' .
$db->columnidentifier('type') . ', ' .
$db->columnidentifier('outcome') .', ' .
$db->columnidentifier('contract_id') . ') VALUES (' .
'UNIX_TIMESTAMP(NOW()), ' .
'\'' . $SET_BALANCE_TYPE . '\', ' .
'\'' . $FAILED_OUTCOME . '\', ' .
'?)',
$contract_id
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -37,6 +37,7 @@ our @EXPORT_OK = qw(
process_records
find_minmaxid
find_random
findby_contractid_states
$TERMINATED_STATE
$ACTIVE_STATE
@ -105,6 +106,34 @@ sub findby_domainid_username_states {
}
sub findby_contractid_states {
my ($xa_db,$contract_id,$states,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
if (defined $states and 'HASH' eq ref $states) {
foreach my $in (keys %$states) {
my @values = (defined $states->{$in} and 'ARRAY' eq ref $states->{$in} ? @{$states->{$in}} : ($states->{$in}));
$stmt .= ' AND ' . $db->columnidentifier('status') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $states and length($states) > 0) {
$stmt .= ' AND ' . $db->columnidentifier('status') . ' = ?';
push(@params,$states);
}
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub find_minmaxid {
my ($xa_db,$states,$reseller_id) = @_;

@ -29,7 +29,8 @@ our @EXPORT_OK = qw(
check_table
insert_row
findby_contractid
sort_by_end
sort_by_end_desc
sort_by_end_asc
);
my $tablename = 'contract_balances';
@ -163,10 +164,14 @@ sub buildrecords_fromrows {
}
sub sort_by_end ($$) {
sub sort_by_end_asc ($$) {
return _sort_by_date('_end',0,@_);
}
sub sort_by_end_desc ($$) {
return _sort_by_date('_end',1,@_);
}
sub _sort_by_date {
my ($ts_field,$desc,$a,$b) = @_;
if ($desc) {

@ -0,0 +1,168 @@
package NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
findby_contractid_ts
);
my $tablename = 'billing_mappings';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'start_date',
'end_date',
'billing_profile_id',
'contract_id',
'product_id',
'network_id',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_contractid_ts {
my ($xa_db,$contract_id,$dt,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
if (defined $dt) {
$stmt .= ' AND (' . $db->columnidentifier('start_date') . ' IS NULL OR ' . $db->columnidentifier('start_date') . ' <= ? ) ' .
'AND (' . $db->columnidentifier('end_date') . ' IS NULL OR ' . $db->columnidentifier('end_date') . ' >= ? ) ' .
'ORDER BY ' . $db->columnidentifier('start_date') . ' DESC, ' . $db->columnidentifier('id') . ' DESC LIMIT 1';
push(@params, $db->datetime_to_string($dt) );
push(@params, $db->datetime_to_string($dt) );
}
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($billing_profile_id,
$contract_id,
$product_id) = @params{qw/
billing_profile_id
contract_id
product_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('billing_profile_id') . ', ' .
$db->columnidentifier('contract_id') . ', ' .
$db->columnidentifier('end_date') . ', ' .
$db->columnidentifier('network_id') . ', ' .
$db->columnidentifier('product_id') . ', ' .
$db->columnidentifier('start_date') . ') VALUES (' .
'?, ' .
'?, ' .
'NULL, ' .
'NULL, ' .
'?, ' .
'NULL)',
$billing_profile_id,
$contract_id,
$product_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,176 @@
package NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
findby_id
findall
findby_resellerid_name_handle
$DEFAULT_PROFILE_FREE_CASH
$DEFAULT_PROFILE_FREE_TIME
);
my $tablename = 'billing_profiles';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'reseller_id',
'handle',
'name',
'prepaid',
'interval_charge',
'interval_free_time',
'interval_free_cash',
'interval_unit',
'interval_count',
'fraud_interval_limit',
'fraud_interval_lock',
'fraud_interval_notify',
'fraud_daily_limit',
'fraud_daily_lock',
'fraud_daily_notify',
'fraud_use_reseller_rates',
'currency',
'status',
'modify_timestamp',
'create_timestamp',
'terminate_timestamp',
];
my $indexes = {};
our $DEFAULT_PROFILE_FREE_CASH = 0.0;
our $DEFAULT_PROFILE_FREE_TIME = 0;
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_resellerid_name_handle {
my ($reseller_id,$name,$handle,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my @params = ();
my @terms = ();
if ($reseller_id) {
push(@terms,$db->columnidentifier('reseller_id') . ' = ?');
push(@params,$reseller_id);
}
if ($name) {
push(@terms,$db->columnidentifier('name') . ' = ?');
push(@params,$name);
}
if ($handle) {
push(@terms,$db->columnidentifier('handle') . ' = ?');
push(@params,$handle);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,324 @@
package NGCP::BulkProcessor::Dao::mr457::billing::contract_balances;
use strict;
## no critic
use DateTime qw();
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
update_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone);
use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw();
use NGCP::BulkProcessor::Dao::mr457::billing::profile_packages qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
update_row
findby_contractid
sort_by_end_desc
sort_by_end_asc
get_new_balance_values
get_free_ratio
);
my $tablename = 'contract_balances';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'contract_id',
'cash_balance',
'cash_balance_interval',
'free_time_balance',
'free_time_balance_interval',
'topup_count',
'timely_topup_count',
'start',
'end',
'invoice_id',
'underrun_profiles',
'underrun_lock',
#'initial_cash_balance',
#'initial_free_time_balance',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_contractid {
my ($xa_db,$contract_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($contract_id) = @params{qw/
contract_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('cash_balance') . ', ' .
$db->columnidentifier('cash_balance_interval') . ', ' .
$db->columnidentifier('contract_id') . ', ' .
$db->columnidentifier('end') . ', ' .
$db->columnidentifier('free_time_balance') . ', ' .
$db->columnidentifier('free_time_balance_interval') . ', ' .
$db->columnidentifier('start') . ', ' .
$db->columnidentifier('underrun_lock') . ', ' .
$db->columnidentifier('underrun_profiles') . ') VALUES (' .
'0.0, ' .
'0.0, ' .
'?, ' .
'CONCAT(LAST_DAY(NOW()),\' 23:59:59\'), ' .
'0, ' .
'0, ' .
'CONCAT(SUBDATE(CURDATE(),(DAY(CURDATE())-1)),\' 00:00:00\'), ' .
'NULL, ' .
'NULL)',
$contract_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub update_row {
my ($xa_db,$data) = @_;
check_table();
return update_record($get_db,$xa_db,__PACKAGE__,$data);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
my $db = &$get_db();
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
my $end = $db->datetime_from_string($record->{end},undef);
if (is_infinite_future($end)) {
$record->{_end} = infinite_future();
} else {
$record->{_end} = set_timezone($end);
}
$record->{_start} = $db->datetime_from_string($record->{start},'local');
push @records,$record;
}
}
return \@records;
}
sub sort_by_end_asc ($$) {
return _sort_by_date('_end',0,@_);
}
sub sort_by_end_desc ($$) {
return _sort_by_date('_end',1,@_);
}
sub _sort_by_date {
my ($ts_field,$desc,$a,$b) = @_;
if ($desc) {
$desc = -1;
} else {
$desc = 1;
}
#use Data::Dumper;
#print Dumper($a);
#print Dumper($b);
my $a_inf = is_infinite_future($a->{$ts_field});
my $b_inf = is_infinite_future($b->{$ts_field});
if ($a_inf and $b_inf) {
return 0;
} elsif ($a_inf) {
return 1 * $desc;
} elsif ($b_inf) {
return -1 * $desc;
} else {
return DateTime->compare($a->{$ts_field}, $b->{$ts_field}) * $desc;
}
}
sub get_new_balance_values {
my %params = @_;
my ($contract_create,
$last_balance,
$balance,
$initial_balance,
$carry_over_mode,
$notopup_expiration,
$last_cash_balance,
$last_cash_balance_interval) = @params{qw/
contract_create
last_balance
balance
initial_balance
carry_over_mode
notopup_expiration
last_cash_balance
last_cash_balance_interval
/};
my ($cash_balance, $cash_balance_interval, $free_time_balance, $free_time_balance_interval) = (0.0,0.0,0,0);
$carry_over_mode //= $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_CARRY_OVER_MODE;
my $ratio;
if ($last_balance) {
$last_cash_balance //= $last_balance->{cash_balance};
$last_cash_balance_interval //= $last_balance->{cash_balance_interval};
if (($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_MODE eq $carry_over_mode
|| ($NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::CARRY_OVER_TIMELY_MODE eq $carry_over_mode && $last_balance->{timely_topup_count} > 0)
) && (!defined $notopup_expiration || $balance->{_start} < $notopup_expiration)) {
#if (!defined $last_profile) {
# my $bm_last = get_actual_billing_mapping(schema => $schema, contract => $contract, now => $last_balance->start); #end); !?
# $last_profile = $bm_last->billing_mappings->first->billing_profile;
#}
#my $contract_create = NGCP::Panel::Utils::DateTime::set_local_tz($contract->create_timestamp // $contract->modify_timestamp);
$ratio = 1.0;
if ($last_balance->{_start} <= $contract_create && $last_balance->{_end} >= $contract_create) { #$last_balance->end is never +inf here
$ratio = get_free_ratio($contract_create,$last_balance->{_start},$last_balance->{_end});
}
my $old_free_cash = $ratio * ($last_balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH);
$cash_balance = $last_cash_balance;
if ($last_cash_balance_interval < $old_free_cash) {
$cash_balance += $last_cash_balance_interval - $old_free_cash;
}
#$ratio * $last_profile->interval_free_time // _DEFAULT_PROFILE_FREE_TIME
#} else {
# $c->log->debug('discarding contract ' . $contract->id . " cash balance (mode '$carry_over_mode'" . (defined $notopup_expiration ? ', notopup expiration ' . NGCP::Panel::Utils::DateTime::to_string($notopup_expiration) : '') . ')') if $c;
}
$ratio = 1.0;
} else {
$cash_balance = (defined $initial_balance ? $initial_balance : $NGCP::BulkProcessor::Dao::Trunk::billing::profile_packages::DEFAULT_INITIAL_BALANCE);
$ratio = get_free_ratio($contract_create,$balance->{_start},$balance->{_end});
}
my $free_cash = $ratio * ($balance->{_profile}->{interval_free_cash} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_CASH);
$cash_balance += $free_cash;
$cash_balance_interval = 0.0;
my $free_time = $ratio * ($balance->{_profile}->{interval_free_time} // $NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::DEFAULT_PROFILE_FREE_TIME);
$free_time_balance = $free_time;
$free_time_balance_interval = 0;
#$c->log->debug("ratio: $ratio, free cash: $free_cash, cash balance: $cash_balance, free time: $free_time, free time balance: $free_time_balance");
return {cash_balance => sprintf("%.4f",$cash_balance),
initial_cash_balance => sprintf("%.4f",$cash_balance),
cash_balance_interval => sprintf("%.4f",$cash_balance_interval),
free_time_balance => sprintf("%.0f",$free_time_balance),
initial_free_time_balance => sprintf("%.0f",$free_time_balance),
free_time_balance_interval => sprintf("%.0f",$free_time_balance_interval)};
}
sub get_free_ratio {
my ($contract_create,$stime,$etime) = @_;
if (!is_infinite_future($etime)) {
my $ctime = ($contract_create->clone->truncate(to => 'day') > $stime ? $contract_create->clone->truncate(to => 'day') : $contract_create);
my $start_of_next_interval = _add_second($etime->clone,1);
#$c->log->debug("ratio = " . ($start_of_next_interval->epoch - $ctime->epoch) . ' / ' . ($start_of_next_interval->epoch - $stime->epoch)) if $c;
return ($start_of_next_interval->epoch - $ctime->epoch) / ($start_of_next_interval->epoch - $stime->epoch);
}
return 1.0;
}
sub _add_second {
my ($dt,$skip_leap_seconds) = @_;
$dt->add(seconds => 1);
while ($skip_leap_seconds and $dt->second() >= 60) {
$dt->add(seconds => 1);
}
return $dt;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,345 @@
package NGCP::BulkProcessor::Dao::mr457::billing::contracts;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
process_table
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
countby_status_resellerid
findby_contactid
findby_id
forupdate_id
process_records
process_free_cash_contracts
countby_free_cash
$ACTIVE_STATE
$TERMINATED_STATE
);
my $tablename = 'contracts';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'customer_id',
'contact_id',
'order_id',
'profile_package_id',
'status',
'external_id',
'modify_timestamp',
'create_timestamp',
'activate_timestamp',
'terminate_timestamp',
'max_subscribers',
'send_invoice',
'subscriber_email_template_id',
'passreset_email_template_id',
'invoice_email_template_id',
'invoice_template_id',
'vat_rate',
'add_vat',
];
my $indexes = {};
my $insert_unique_fields = [];
our $ACTIVE_STATE = 'active';
our $TERMINATED_STATE = 'terminated';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_contactid {
my ($contact_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contact_id') . ' = ?';
my @params = ($contact_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub forupdate_id {
my ($xa_db,$id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ? FOR UPDATE';
my @params = ($id);
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_status_resellerid {
my ($status,$reseller_id) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' AS contract' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id';
my @params = ();
my @terms = ();
if ($status) {
push(@terms,'contract.status = ?');
push(@params,$status);
}
if ($reseller_id) {
if ('ARRAY' eq ref $reseller_id) {
push(@terms,'contact.reseller_id IN (' . substr(',?' x scalar @$reseller_id,1) . ')');
push(@params,@$reseller_id);
} else {
push(@terms,'contact.reseller_id = ?');
push(@params,$reseller_id);
}
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub countby_free_cash {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(DISTINCT c.id) FROM ' . $table . ' AS c' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' .
' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0';
return $db->db_get_value($stmt);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($contact_id) = @params{qw/
contact_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('contact_id') . ', ' .
$db->columnidentifier('create_timestamp') . ', ' .
$db->columnidentifier('modify_timestamp') . ', ' .
$db->columnidentifier('status') . ') VALUES (' .
'?, ' .
'NOW(), ' .
'NOW(), ' .
'\'' . $ACTIVE_STATE . '\')',
$contact_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub process_records {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
load_recursive
/};
check_table();
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
);
}
sub process_free_cash_contracts {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
/};
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'FROM ' . $table . ' AS c' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::gettablename()) . ' AS bm ON bm.contract_id = c.id' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles::gettablename()) . ' AS bp ON bp.id = bm.billing_profile_id' .
' WHERE c.status != "terminated" AND bp.interval_free_cash <> 0.0';
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,$rowblock,$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
select => $db->paginate_sort_query("SELECT DISTINCT c.id " . $stmt,undef,undef,[{ column => 'c.id', numeric => 1, dir => 1 }]),
selectcount => "SELECT COUNT(DISTINCT c.id) " . $stmt,
);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,144 @@
package NGCP::BulkProcessor::Dao::mr457::billing::profile_packages;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
findby_id
findall
$CARRY_OVER_MODE
$CARRY_OVER_TIMELY_MODE
$DISCARD_MODE
$DEFAULT_CARRY_OVER_MODE
$DEFAULT_INITIAL_BALANCE
);
my $tablename = 'profile_packages';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'reseller_id',
'name',
'description',
'initial_balance',
'service_charge',
'balance_interval_unit',
'balance_interval_value',
'balance_interval_start_mode',
'carry_over_mode',
'timely_duration_unit',
'timely_duration_value',
'notopup_discard_intervals',
'underrun_lock_threshold',
'underrun_lock_level',
'underrun_profile_threshold',
'topup_lock_level',
];
my $indexes = {};
our $CARRY_OVER_MODE = 'carry_over';
our $CARRY_OVER_TIMELY_MODE = 'carry_over_timely';
our $DISCARD_MODE = 'discard';
our $DEFAULT_CARRY_OVER_MODE = $CARRY_OVER_MODE;
our $DEFAULT_INITIAL_BALANCE = 0.0;
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findall {
my ($load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table;
my $rows = $db->db_get_all_arrayref($stmt);
return buildrecords_fromrows($rows,$load_recursive);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,232 @@
package NGCP::BulkProcessor::Dao::mr457::billing::topup_log;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
findby_contractidfromto
findby_contractbalanceid
findby_id
$OK_OUTCOME
$FAILED_OUTCOME
$VOUCHER_TYPE
$CASH_TYPE
);
my $tablename = 'topup_log';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'username',
'timestamp',
'type',
'outcome',
'message',
'subscriber_id',
'contract_id',
'amount',
'voucher_id',
'cash_balance_before',
'cash_balance_after',
'package_before_id',
'package_after_id',
'profile_before_id',
'profile_after_id',
'lock_level_before',
'lock_level_after',
'contract_balance_before_id',
'contract_balance_after_id',
'request_token',
];
my $indexes = {};
my $insert_unique_fields = [];
our $OK_OUTCOME = 'ok';
our $FAILED_OUTCOME = 'failed';
our $VOUCHER_TYPE = 'voucher';
our $CASH_TYPE = 'cash';
#our $SET_BALANCE_TYPE = 'set_balance';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_contractidfromto {
my ($contract_id,$from,$to,$outcome,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
if ($from) {
$stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' >= ?';
push(@params,$from->epoch());
}
if ($to) {
$stmt .= ' AND ' . $db->columnidentifier('timestamp') . ' <= ?';
push(@params,$to->epoch());
}
if ($outcome) {
$stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?';
push(@params,$outcome);
}
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_contractbalanceid {
my ($id,$outcome,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_balance_before_id') . ' = ?';
my @params = ($id);
if ($outcome) {
$stmt .= ' AND ' . $db->columnidentifier('outcome') . ' = ?';
push(@params,$outcome);
}
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($contract_id) = @params{qw/
contract_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('timestamp') . ', ' .
$db->columnidentifier('type') . ', ' .
$db->columnidentifier('outcome') .', ' .
$db->columnidentifier('contract_id') . ') VALUES (' .
'UNIX_TIMESTAMP(NOW()), ' .
'\'' . $CASH_TYPE . '\', ' .
'\'' . $FAILED_OUTCOME . '\', ' .
'?)',
$contract_id
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -8,21 +8,34 @@ no strict 'refs';
use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw();
#use NGCP::BulkProcessor::RestRequests::mr38::Contracts qw();
#use NGCP::BulkProcessor::RestRequests::mr38::Customers qw();
#use NGCP::BulkProcessor::RestRequests::mr38::BillingProfiles qw();
use NGCP::BulkProcessor::Dao::mr457::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr457::billing::contract_balances qw();
use NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw();
use NGCP::BulkProcessor::Dao::mr457::billing::topup_log qw();
use NGCP::BulkProcessor::Dao::mr457::billing::profile_packages qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
check_billing_db_tables
check_fix_contract_balance_gaps_tables
check_fix_free_cash_tables
);
#check_rest_get_items
my $NOK = 'NOK';
my $OK = 'ok';
sub check_billing_db_tables {
sub check_fix_contract_balance_gaps_tables {
my ($messages) = @_;
@ -38,10 +51,53 @@ sub check_billing_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::contract_balances');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::lnp_providers');
#if (not $check_result) {
# ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers');
#}
return $result;
}
sub check_fix_free_cash_tables {
my ($messages) = @_;
my $result = 1;
my $check_result;
my $message;
my $message_prefix = 'NGCP billing db tables - ';
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::contracts');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::contract_balances');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::topup_log');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr457::billing::profile_packages');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::domains');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::resellers');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contracts');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contacts');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::accounting::cdr');
#$result &= $check_result; push(@$messages,$message);
return $result;

@ -7,12 +7,22 @@ use threads::shared qw();
#use List::Util qw();
use DateTime qw();
use NGCP::BulkProcessor::Globals qw(
$system_abbreviation
);
use NGCP::BulkProcessor::Projects::Disaster::Balances::Settings qw(
$dry
$skip_errors
$fix_contract_balance_gaps_multithreading
$fix_contract_balance_gaps_numofthreads
$fix_free_cash_multithreading
$fix_free_cash_numofthreads
$write_topup_log
$apply_negative_delta
);
#$set_preference_bulk_numofthreads
@ -30,7 +40,21 @@ use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw();
use NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::mr457::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr457::billing::contract_balances qw();
use NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles qw();
use NGCP::BulkProcessor::Dao::mr457::billing::topup_log qw();
use NGCP::BulkProcessor::Dao::mr457::billing::profile_packages qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
#use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
@ -41,13 +65,19 @@ use NGCP::BulkProcessor::ConnectorPool qw(
);
use NGCP::BulkProcessor::Utils qw(threadid);
use NGCP::BulkProcessor::Array qw(array_to_map);
use NGCP::BulkProcessor::Calendar qw(current_local is_infinite_future);
use NGCP::BulkProcessor::SqlConnectors::MySQLDB qw();
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
fix_contract_balance_gaps
fix_free_cash
);
my $create_sample_cdr = 0;
sub fix_contract_balance_gaps {
my $static_context = {};
@ -92,20 +122,13 @@ sub fix_contract_balance_gaps {
),$warning_count);
}
sub _check_insert_tables {
#NGCP::BulkProcessor::Dao::mr38::provisioning::voip_usr_preferences::check_table();
}
sub _fix_contract_balance_gaps {
my ($context) = @_;
eval {
$context->{db}->db_begin();
my $last_balance = undef;
foreach my $contract_balance (sort NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::sort_by_end @{$context->{contract_balances}}) {
foreach my $contract_balance (sort NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::sort_by_end_asc @{$context->{contract_balances}}) {
#print " " . $contract_balance->{id} . " " . $contract_balance->{_start} . ' ' . $contract_balance->{_end} . "\n";
if (defined $last_balance) {
my $gap_start = $last_balance->{_end}->clone->add(seconds => 1);
@ -193,7 +216,7 @@ sub _insert_contract_balances {
sub _fix_contract_balance_gaps_checks {
my ($context) = @_;
my $result = _checks($context);
my $result = 1;
return $result;
}
@ -202,114 +225,464 @@ sub _reset_fix_contract_balance_gaps_context {
my ($context,$contract,$rownum) = @_;
my $result = _reset_context($context,$contract,$rownum);
my $result = 1;
$context->{contract_balances} = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::findby_contractid($context->{db},$context->{contract}->{id});
$context->{rownum} = $rownum;
#$context->{barring_profile} = $imported_subscriber->{barring_profile};
#$context->{ncos_level} = $context->{ncos_level_map}->{$context->{barring_profile}};
$context->{contract} = $contract;
#delete $context->{adm_ncos_id_preference_id};
$context->{contract_balances} = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::findby_contractid($context->{db},$context->{contract}->{id});
return $result;
}
sub _checks {
sub fix_free_cash {
my $static_context = {};
my $result = _fix_free_cash_checks($static_context);
destroy_dbs();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Dao::mr457::billing::contracts::process_free_cash_contracts(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
my $rownum = $row_offset;
foreach my $row (@$records) {
$rownum++;
next unless _reset_fix_free_cash_context($context,$row->[0],$rownum);
_fix_free_cash($context);
}
#return 0;
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
# below is not mandatory..
_check_insert_tables();
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 0,
multithreading => $fix_free_cash_multithreading,
numofthreads => $fix_free_cash_numofthreads,
),$warning_count);
}
sub _fix_free_cash {
my ($context) = @_;
my $result = 1;
#my $optioncount = 0;
#eval {
# $optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::FeatureOption::countby_subscribernumber_option();
#};
#if ($@ or $optioncount == 0) {
# rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
#my $userpasswordcount = 0;
#eval {
# $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::countby_fqdn();
#};
#if ($@ or $userpasswordcount == 0) {
# rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
#my $subscribercount = 0;
#my $subscriber_barring_profiles = [];
#eval {
# $subscribercount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::countby_subscribernumber();
# $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles();
#};
#if ($@ or $subscribercount == 0) {
# rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
eval {
return $result;
$context->{db}->set_transaction_isolation($NGCP::BulkProcessor::SqlConnectors::MySQLDB::READ_COMMITTED);
$context->{db}->db_begin();
my $contract_id = $context->{contract}->{id};
my $contract = NGCP::BulkProcessor::Dao::mr457::billing::contracts::forupdate_id($context->{db},$contract_id);
$context->{contract} = $contract;
my $actual_profile = $context->{actual_profile};
my @balances = sort NGCP::BulkProcessor::Dao::mr457::billing::contract_balances::sort_by_end_desc
@{NGCP::BulkProcessor::Dao::mr457::billing::contract_balances::findby_contractid($context->{db},$contract_id)};
my $balance = $balances[0];
my $mapping = NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::findby_contractid_ts($context->{db},
$contract_id,$balance->{_start})->[0];
if ($mapping) {
my $billing_profile = $context->{billing_profile_map}->{$mapping->{billing_profile_id}};
my $free_cash = $billing_profile->{interval_free_cash} // 0.0;
my $next_free_cash = $actual_profile->{interval_free_cash} // 0.0;
my $free_cash_carry_over = $next_free_cash;
if ($balance->{cash_balance_interval} < $free_cash) {
$free_cash_carry_over += $balance->{cash_balance_interval} - $free_cash;
}
my $delta = $next_free_cash - ($balance->{cash_balance} + $free_cash_carry_over);
if ($delta > 0.0 or ($delta < 0.0 and $apply_negative_delta)) {
NGCP::BulkProcessor::Dao::mr457::billing::contract_balances::update_row($context->{db},{
id => $balance->{id},
cash_balance => ($balance->{cash_balance} + $delta),
});
NGCP::BulkProcessor::Dao::mr457::billing::topup_log::insert_row($context->{db},{
timestamp => $context->{now}->epoch(),
type => $NGCP::BulkProcessor::Dao::mr457::billing::topup_log::CASH_TYPE, #SET_BALANCE_TYPE,
outcome => $NGCP::BulkProcessor::Dao::mr457::billing::topup_log::OK_OUTCOME,
contract_id => $contract_id,
amount => $delta,
cash_balance_before => $balance->{cash_balance},
cash_balance_after => ($balance->{cash_balance} + $delta),
package_before_id => $contract->{profile_package_id},
package_after_id => $contract->{profile_package_id},
profile_before_id => $context->{actual_profile}->{id},
profile_after_id => $context->{actual_profile}->{id},
contract_balance_before_id => $balance->{id},
contract_balance_after_id => $balance->{id},
request_token => $system_abbreviation,
}) if $write_topup_log;
_info($context,"($context->{rownum}) " . "contract id $contract_id cash balance is $balance->{cash_balance} cents, adding $delta cents to match free cash of $context->{actual_profile}->{interval_free_cash} cents ($context->{actual_profile}->{name}) for next interval"
. (is_infinite_future($balance->{_end}) ? '' : ' ' . $balance->{_end}->clone->add(seconds => 1)));
if ($create_sample_cdr and not is_infinite_future($balance->{_end}) and _generate_cdr_init_context($context,$balance->{_end}->clone->add(seconds => 1))) {
NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::insert_row($context->{db},$context->{cdr});
}
} else {
_info($context,"($context->{rownum}) " . "contract id $contract_id cash balance is $balance->{cash_balance} cents, SKIP adding $delta cents to match free cash of $context->{actual_profile}->{interval_free_cash} cents ($context->{actual_profile}->{name}) for next interval"
. (is_infinite_future($balance->{_end}) ? '' : ' ' . $balance->{_end}->clone->add(seconds => 1)));
}
} else {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . "no billing mapping at $balance->{_start} for contract id $contract_id");
} else {
_error($context,"($context->{rownum}) " . "no billing mapping at $balance->{_start} for contract id $contract_id");
}
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err);
} else {
_error($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err);
}
}
}
sub _reset_context {
my ($context,$contract,$rownum) = @_;
sub _fix_free_cash_checks {
my ($context) = @_;
my $result = 1;
$context->{now} = current_local();
my $profile_count = 0;
eval {
($context->{billing_profile_map},my $ids,my $profiles) = array_to_map(NGCP::BulkProcessor::Dao::mr457::billing::billing_profiles::findall(),
sub { return shift->{id}; }, sub { return shift; }, 'first' );
$profile_count = (scalar keys %{$context->{billing_profile_map}});
};
if ($@ or $profile_count == 0) {
_error($context,"cannot find any billing profiles");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$profile_count billing profiles cached");
}
my $package_count = 0;
eval {
($context->{profile_package_map},my $ids,my $packages) = array_to_map(NGCP::BulkProcessor::Dao::mr457::billing::profile_packages::findall(),
sub { return shift->{id}; }, sub { return shift; }, 'first' );
$package_count = (scalar keys %{$context->{profile_package_map}});
};
if ($@) {
_error($context,"cannot find profile packages");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$package_count profile packages cached");
}
if ($create_sample_cdr) {
my $domain_count = 0;
eval {
($context->{domain_map},my $ids,my $domains) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::domains::findall(),
sub { return shift->{id}; }, sub { return shift; }, 'first' );
$domain_count = (scalar keys %{$context->{domain_map}});
};
if ($@ or $domain_count == 0) {
_error($context,"cannot find any domains");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$domain_count domains cached");
}
my $reseller_count = 0;
eval {
($context->{reseller_map},my $ids,my $resellers) = array_to_map(NGCP::BulkProcessor::Dao::Trunk::billing::resellers::findall(),
sub { return shift->{id}; }, sub { return shift; }, 'first' );
$reseller_count = (scalar keys %{$context->{reseller_map}});
};
if ($@ or $reseller_count == 0) {
_error($context,"cannot find any resellers");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$reseller_count resellers cached");
}
my $active_count = 0;
eval {
$active_count = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(
$NGCP::BulkProcessor::Dao::Trunk::billing::contracts::ACTIVE_STATE,
undef
);
($context->{min_id},$context->{max_id}) = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::find_minmaxid(undef,
{ 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE },
undef
);
};
if ($@ or $active_count == 0) {
_error($context,"cannot find active subscribers");
$result = 0; #even in skip-error mode..
} else {
_info($context,"$active_count active subscribers found");
}
}
my $contract_free_cash_count = 0;
eval {
$contract_free_cash_count = NGCP::BulkProcessor::Dao::mr457::billing::contracts::countby_free_cash();
};
if ($@ or $contract_free_cash_count == 0) {
rowprocessingerror(threadid(),'no contracts with free cash billing profiles',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
return $result;
}
sub _reset_fix_free_cash_context {
my ($context,$contract_id,$rownum) = @_;
my $result = 0;
$context->{rownum} = $rownum;
my $contract = NGCP::BulkProcessor::Dao::mr457::billing::contracts::findby_id($contract_id);
$context->{contract} = $contract;
#$context->{cli} = $imported_subscriber->subscribernumber();
#$context->{e164} = {};
#$context->{e164}->{cc} = substr($context->{cli},0,3);
#$context->{e164}->{ac} = '';
#$context->{e164}->{sn} = substr($context->{cli},3);
#$context->{subscriberdelta} = $imported_subscriber->{delta};
#my $userpassword = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::findby_fqdn($context->{cli});
#if (defined $userpassword) {
# $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username};
# $context->{password} = $userpassword->{password};
# $context->{userpassworddelta} = $userpassword->{delta};
#} else {
# # once full username+passwords is available:
# delete $context->{username};
# delete $context->{password};
# delete $context->{userpassworddelta};
# if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) {
#
# } else {
# $result &= 0;
#
# # for now, as username+passwords are incomplete:
# #$context->{username} = $context->{e164}->{sn};
# #$context->{password} = $context->{username};
# #$context->{userpassworddelta} = $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::updated_delta;
#
# if ($skip_errors) {
# # for now, as username+passwords are incomplete:
# _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# } else {
# _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# }
# }
#}
#
#delete $context->{billing_voip_subscriber};
#delete $context->{provisioning_voip_subscriber};
# the goal is to adjust the latest contract_balances.cash_balance, so the
# next contract_balance will start with the exact free cash expected:
my $actual_profile;
my $mapping = NGCP::BulkProcessor::Dao::mr457::billing::billing_mappings::findby_contractid_ts($context->{db},
$contract_id,$context->{now})->[0]; # the actual profile is considered for the next balance interval
if ($mapping) {
$actual_profile = $context->{billing_profile_map}->{$mapping->{billing_profile_id}};
if (($actual_profile->{interval_free_cash} // 0.0) != 0.0) {
my $carry_over_mode = $NGCP::BulkProcessor::Dao::mr457::billing::profile_packages::DEFAULT_CARRY_OVER_MODE;
$carry_over_mode = $context->{profile_package_map}->{$contract->{profile_package_id}}->{carry_over_mode} if $contract->{profile_package_id};
if ($carry_over_mode ne $NGCP::BulkProcessor::Dao::mr457::billing::profile_packages::DISCARD_MODE) {
$result = 1;
} else {
_info($context,"($context->{rownum}) " . 'contract id ' . $contract_id . ' skipped, is in ' . $carry_over_mode . ' carry over mode');
}
} else {
_info($context,"($context->{rownum}) " . 'contract id ' . $contract_id . ' skipped, used a billing profile with free cash in the past');
}
} else {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . "no billing mapping at $context->{now} for contract id $contract_id");
} else {
_error($context,"($context->{rownum}) " . "no billing mapping at $context->{now} for contract id $contract_id");
}
}
$context->{actual_profile} = $actual_profile;
return $result;
}
sub _generate_cdr_init_context {
my ($context,$time) = @_;
#my $result = 1;
#my $provider = $providers[rand @providers];
my $source_subscriber;
$source_subscriber = _get_subscriber($context);
my $dest_subscriber;
$dest_subscriber = undef;
return 0 unless $source_subscriber;
my $source_peering_subscriber_info;
my $source_reseller;
if ($source_subscriber) {
$source_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($source_subscriber->{contract_id});
$source_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($source_subscriber->{contract}->{contact_id});
$source_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$source_subscriber->{uuid});
$source_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($source_subscriber->{contract}->{prov_subscriber}->{id},1);
$source_subscriber->{domain} = $context->{domain_map}->{$source_subscriber->{domain_id}}->{domain};
$source_reseller = $context->{reseller_map}->{$source_subscriber->{contract}->{contact}->{reseller_id}};
} else {
$source_peering_subscriber_info = _prepare_offnet_subscriber_info("source","offnet.com");
}
my $dest_peering_subscriber_info;
my $dest_reseller;
if ($dest_subscriber) {
$dest_subscriber->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_id($dest_subscriber->{contract_id});
$dest_subscriber->{contract}->{contact} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id($dest_subscriber->{contract}->{contact_id});
$dest_subscriber->{contract}->{prov_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(undef,$dest_subscriber->{uuid});
$dest_subscriber->{primary_alias} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberidisprimary($dest_subscriber->{contract}->{prov_subscriber}->{id},1);
$dest_subscriber->{domain} = $context->{domain_map}->{$dest_subscriber->{domain_id}}->{domain};
$dest_reseller = $context->{reseller_map}->{$dest_subscriber->{contract}->{contact}->{reseller_id}};
} else {
$dest_peering_subscriber_info = _prepare_offnet_subscriber_info("destination","offnet.com");
}
my $source_ip = '192.168.0.1';
#my $time = time();
my $duration = 0;
$context->{cdr} = {
#id => ,
#update_time => ,
source_user_id => ($source_subscriber ? $source_subscriber->{uuid} : '0'),
source_provider_id => ($source_reseller ? $source_reseller->{contract_id} : '0'),
#source_external_subscriber_id => ,
#source_external_contract_id => ,
source_account_id => ($source_subscriber ? $source_subscriber->{contract_id} : '0'),
source_user => ($source_subscriber ? $source_subscriber->{username} : $source_peering_subscriber_info->{username}),
source_domain => ($source_subscriber ? $source_subscriber->{domain} : $source_peering_subscriber_info->{domain}),
source_cli => ($source_subscriber ? ($source_subscriber->{primary_alias}->{username} // $source_subscriber->{username}) : $source_peering_subscriber_info->{username}),
#source_clir => '0',
source_ip => $source_ip,
#source_gpp0 => ,
#source_gpp1 => ,
#source_gpp2 => ,
#source_gpp3 => ,
#source_gpp4 => ,
#source_gpp5 => ,
#source_gpp6 => ,
#source_gpp7 => ,
#source_gpp8 => ,
#source_gpp9 => ,
destination_user_id => ($dest_subscriber ? $dest_subscriber->{uuid} : '0'),
destination_provider_id => ($dest_reseller ? $dest_reseller->{contract_id} : '0'),
#destination_external_subscriber_id => ,
#destination_external_contract_id => ,
destination_account_id => ($dest_subscriber ? $dest_subscriber->{contract_id} : '0'),
destination_user => ($dest_subscriber ? $dest_subscriber->{username} : $dest_peering_subscriber_info->{username}),
destination_domain => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}),
destination_user_dialed => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}),
destination_user_in => ($dest_subscriber ? ($dest_subscriber->{primary_alias}->{username} // $dest_subscriber->{username}) : $dest_peering_subscriber_info->{username}),
destination_domain_in => ($dest_subscriber ? $dest_subscriber->{domain} : $dest_peering_subscriber_info->{domain}),
#destination_gpp0 => ,
#destination_gpp1 => ,
#destination_gpp2 => ,
#destination_gpp3 => ,
#destination_gpp4 => ,
#destination_gpp5 => ,
#destination_gpp6 => ,
#destination_gpp7 => ,
#destination_gpp8 => ,
#destination_gpp9 => ,
#peer_auth_user => ,
#peer_auth_realm => ,
call_type => 'call',
call_status => 'ok',
call_code => '200',
init_time => $time->epoch,
start_time => $time->epoch,
duration => $duration,
call_id => _generate_call_id(),
#source_carrier_cost => ,
#source_reseller_cost => ,
#source_customer_cost => ,
#source_carrier_free_time => ,
#source_reseller_free_time => ,
#source_customer_free_time => ,
#source_carrier_billing_fee_id => ,
#source_reseller_billing_fee_id => ,
#source_customer_billing_fee_id => ,
#source_carrier_billing_zone_id => ,
#source_reseller_billing_zone_id => ,
#source_customer_billing_zone_id => ,
#destination_carrier_cost => ,
#destination_reseller_cost => ,
#destination_customer_cost => ,
#destination_carrier_free_time => ,
#destination_reseller_free_time => ,
#destination_customer_free_time => ,
#destination_carrier_billing_fee_id => ,
#destination_reseller_billing_fee_id => ,
#destination_customer_billing_fee_id => ,
#destination_carrier_billing_zone_id => ,
#destination_reseller_billing_zone_id => ,
#destination_customer_billing_zone_id => ,
#frag_carrier_onpeak => ,
#frag_reseller_onpeak => ,
#frag_customer_onpeak => ,
#is_fragmented => ,
#split => ,
#rated_at => ,
#rating_status => 'unrated',
#exported_at => ,
#export_status => ,
};
return 1;
}
sub _get_subscriber {
my ($context,$excluding_id) = @_;
return NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_contractid_states(
$context->{db},
$context->{contract}->{id},
{ 'IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::ACTIVE_STATE },
)->[0];
}
sub _prepare_offnet_subscriber_info {
my ($username_primary_number,$domain) = @_;
return { username => $username_primary_number, domain => $domain };
}
sub _generate_call_id {
return '*TEST*'._random_string(26,'a'..'z','A'..'Z',0..9,'-','.');
}
sub _random_string {
my ($length,@chars) = @_;
return join('',@chars[ map{ rand @chars } 1 .. $length ]);
}
sub _check_insert_tables {
#NGCP::BulkProcessor::Dao::mr38::provisioning::voip_usr_preferences::check_table();
}
sub _error {

@ -26,7 +26,7 @@ use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt);
use NGCP::BulkProcessor::Utils qw(prompt stringtobool);
#format_number check_ipnet
require Exporter;
@ -44,6 +44,12 @@ our @EXPORT_OK = qw(
$fix_contract_balance_gaps_multithreading
$fix_contract_balance_gaps_numofthreads
$fix_free_cash_multithreading
$fix_free_cash_numofthreads
$write_topup_log
$apply_negative_delta
);
our $defaultconfig = 'config.cfg';
@ -56,6 +62,12 @@ our $skip_errors = 0;
our $fix_contract_balance_gaps_multithreading = $enablemultithreading;
our $fix_contract_balance_gaps_numofthreads = $cpucount;
our $fix_free_cash_multithreading = $enablemultithreading;
our $fix_free_cash_numofthreads = $cpucount;
our $write_topup_log = 0;
our $apply_negative_delta = 0;
sub update_settings {
my ($data,$configfile) = @_;
@ -75,6 +87,12 @@ sub update_settings {
$fix_contract_balance_gaps_multithreading = $data->{fix_contract_balance_gaps_multithreading} if exists $data->{fix_contract_balance_gaps_multithreading};
$fix_contract_balance_gaps_numofthreads = _get_numofthreads($cpucount,$data,'fix_contract_balance_gaps_numofthreads');
$fix_free_cash_multithreading = $data->{fix_free_cash_multithreading} if exists $data->{fix_free_cash_multithreading};
$fix_free_cash_numofthreads = _get_numofthreads($cpucount,$data,'fix_free_cash_numofthreads');
$write_topup_log = stringtobool($data->{write_topup_log}) if exists $data->{write_topup_log};
$apply_negative_delta = stringtobool($data->{apply_negative_delta}) if exists $data->{apply_negative_delta};
return $result;
}

@ -9,28 +9,28 @@ jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 127.0.0.1
accounting_host = somehost
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.74
billing_host = somehost
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.74
provisioning_host = somehost
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.74
kamailio_host = somehost
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
@ -44,7 +44,7 @@ xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_uri = https://somehost:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http

@ -0,0 +1,62 @@
##general settings:
working_path = /home/rkrenn/temp/soco
cpucount = 4
enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.84
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.84
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.84
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.84
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.84
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = INFO
emailloglevel = OFF

@ -50,16 +50,21 @@ use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw();
#use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances qw();
use NGCP::BulkProcessor::Projects::Disaster::Balances::Check qw(
check_billing_db_tables
check_fix_contract_balance_gaps_tables
check_fix_free_cash_tables
);
#check_rest_get_items
use NGCP::BulkProcessor::Projects::Disaster::Balances::Contracts qw(
fix_contract_balance_gaps
fix_free_cash
);
#use NGCP::BulkProcessor::Projects::Disaster::Balances::Api qw(
@ -77,15 +82,21 @@ my @TASK_OPTS = ();
my $tasks = [];
my $check_task_opt = 'check';
push(@TASK_OPTS,$check_task_opt);
my $check_fix_contract_balance_gaps_task_opt = 'check_fix_gaps';
push(@TASK_OPTS,$check_fix_contract_balance_gaps_task_opt);
my $check_fix_free_cash_task_opt = 'check_fix_free_cash';
push(@TASK_OPTS,$check_fix_free_cash_task_opt);
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
my $fix_contract_balance_gaps_task_opt = 'fix_contract_balance_gaps';
my $fix_contract_balance_gaps_task_opt = 'fix_gaps';
push(@TASK_OPTS,$fix_contract_balance_gaps_task_opt);
my $fix_free_cash_task_opt = 'fix_free_cash';
push(@TASK_OPTS,$fix_free_cash_task_opt);
if (init()) {
main();
exit(0);
@ -127,8 +138,10 @@ sub main() {
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
foreach my $task (@$tasks) {
if (lc($check_task_opt) eq lc($task)) {
$result &= check_task(\@messages) if taskinfo($check_task_opt,$result);
if (lc($check_fix_contract_balance_gaps_task_opt) eq lc($task)) {
$result &= check_fix_contract_balance_gaps_task(\@messages) if taskinfo($check_fix_contract_balance_gaps_task_opt,$result);
} elsif (lc($check_fix_free_cash_task_opt) eq lc($task)) {
$result &= check_fix_free_cash_task(\@messages) if taskinfo($check_fix_free_cash_task_opt,$result);
} elsif (lc($cleanup_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages) if taskinfo($cleanup_task_opt,$result);
@ -139,6 +152,13 @@ sub main() {
$completion |= 1;
}
} elsif (lc($fix_free_cash_task_opt) eq lc($task)) {
if (taskinfo($fix_free_cash_task_opt,$result)) {
next unless check_dry();
$result &= fix_free_cash_task(\@messages);
$completion |= 1;
}
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
@ -166,10 +186,27 @@ sub taskinfo {
return $result;
}
sub check_task {
sub check_fix_contract_balance_gaps_task {
my ($messages) = @_;
my @check_messages = ();
my $result = check_billing_db_tables(\@check_messages);
my $result = check_fix_contract_balance_gaps_tables(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
#@check_messages = ();
#$result = check_provisioning_db_tables(\@check_messages);
##$result &= ..
#push(@$messages,join("\n",@check_messages));
destroy_dbs();
return $result;
}
sub check_fix_free_cash_task {
my ($messages) = @_;
my @check_messages = ();
my $result = check_fix_free_cash_tables(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
@ -234,6 +271,34 @@ sub fix_contract_balance_gaps_task {
}
sub fix_free_cash_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = fix_free_cash();
#if ($batch) {
# ($result,$warning_count) = set_barring_profiles_batch();
#} else {
# ($result,$warning_count) = set_barring_profiles();
#}
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
eval {
};
if ($err or !$result) {
push(@$messages,"fix free cash INCOMPLETE$stats");
} else {
push(@$messages,"fix free cash completed$stats");
}
destroy_dbs(); #every task should leave with closed connections.
return $result;
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..

@ -2,3 +2,11 @@
#dry=0
#skip_errors=0
fix_contract_balance_gaps_numofthreads=2
fix_contract_balance_gaps_multithreading=1
fix_free_cash_numofthreads=2
fix_free_cash_multithreading=1
write_topup_log=0
apply_negative_delta=0

@ -0,0 +1,9 @@
#dry=0
#skip_errors=0
fix_contract_balance_gaps_numofthreads=2
fix_contract_balance_gaps_multithreading=1
fix_free_cash_numofthreads=2
fix_free_cash_multithreading=1

@ -8,11 +8,10 @@ webpassword_length = 8
webusername_length = 8
sippassword_length = 16
sipusername_length = 8
provision_subscriber_count = 20000
provision_subscriber_count = 100000
providers_yml = providers.yml
generate_cdr_multithreading = 1
#generate_cdr_numofthreads = 2
generate_cdr_count = 50000
generate_cdr_count = 500000

@ -14,4 +14,4 @@ providers_yml = providers.yml
generate_cdr_multithreading = 1
#generate_cdr_numofthreads = 2
generate_cdr_count = 100
generate_cdr_count = 500

@ -66,6 +66,8 @@ our $ITEM_REL_PARAM = 'item_rel';
my $request_charset = 'utf-8';
my $response_charset = 'utf-8';
my $timeout = 5*60;
sub _get_api_cert_dir {
return $working_path . $API_CERT_DIR;
}
@ -129,6 +131,7 @@ sub _setup_ua {
verify_hostname => 0,
SSL_verify_mode => 0,
);
$ua->timeout($timeout) if $timeout;
if ($self->{username}) {
$ua->credentials($netloc, $self->{realm}, $self->{username}, $self->{password});
}

@ -63,6 +63,15 @@ sub new {
}
sub set_transaction_isolation {
my ($self,$level) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;
}
sub _gettemptablename {
my $self = shift;
my $temp_tablename = 'TMP_TBL_' . $self->{tid} . '_';
@ -122,6 +131,16 @@ sub columnidentifier {
my $columnname = shift;
my (@params) = @_;
return join('.',map { $self->_columnidentifier($_,@params); } split(/\./,$columnname,-1));
}
sub _columnidentifier {
my $self = shift;
my $columnname = shift;
my (@params) = @_;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;

@ -149,7 +149,7 @@ sub tableidentifier {
}
sub columnidentifier {
sub _columnidentifier {
my $self = shift;
my $columnname = shift;

@ -25,7 +25,7 @@ use NGCP::BulkProcessor::SqlConnector;
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlConnector);
our @EXPORT_OK = qw(get_tableidentifier);
our @EXPORT_OK = qw(get_tableidentifier $READ_COMMITTED);
my $defaulthost = '127.0.0.1';
my $defaultport = '3306';
@ -55,6 +55,8 @@ my $rowblock_transactional = 1;
my $serialization_level = ''; #'SERIALIZABLE'
our $READ_COMMITTED = 'READ COMMITTED';
sub new {
my $class = shift;
@ -101,7 +103,7 @@ sub tableidentifier {
}
sub columnidentifier {
sub _columnidentifier {
my $self = shift;
my $columnname = shift;
@ -248,6 +250,13 @@ sub db_connect {
}
sub set_transaction_isolation {
my ($self,$level) = @_;
return $self->db_do("SET TRANSACTION ISOLATION LEVEL $level");
}
sub vacuum {
my $self = shift;

@ -103,7 +103,7 @@ sub tableidentifier {
}
sub columnidentifier {
sub _columnidentifier {
my $self = shift;
my $columnname = shift;

@ -98,7 +98,7 @@ sub tableidentifier {
}
sub columnidentifier {
sub _columnidentifier {
my $self = shift;
my $columnname = shift;

@ -98,7 +98,7 @@ sub tableidentifier {
return $tablename;
}
sub columnidentifier {
sub _columnidentifier {
my $self = shift;
my $columnname = shift;

@ -115,7 +115,7 @@ sub tableidentifier {
}
sub columnidentifier {
sub _columnidentifier {
my $self = shift;
my $columnname = shift;

Loading…
Cancel
Save