TT#5569 contract_balance gap recovery for mr3.8

+initial process.pl
+contract balance gap fix thread
 +contract iteration
 +context init
 +context teardown
 +detectig gaps and overlaps
 +insert missing contract balances:
  no packages or free time considered for now

+calendar implementation
+drop URI::Encode dependency
+for now, adopt to run with mr3.8 tables only.

Change-Id: I48e0de59bb703adc0d28092c1a93a5b47432bc03
(cherry picked from commit c7521621ae)
changes/37/10237/2
Rene Krenn 9 years ago
parent fcb89fa0bb
commit 8a61eb12ce

1
debian/control vendored

@ -15,7 +15,6 @@ Replaces:
Breaks:
ngcp-bulk-processor-pro,
Depends:
libany-uri-escape-perl,
libarchive-zip-perl,
libconfig-any-perl,
libdata-dump-perl,

@ -1,4 +1,4 @@
package NGCP::BulkProcessor::FakeTime;
package NGCP::BulkProcessor::Calendar;
use strict;
## no critic
@ -28,11 +28,18 @@ our @EXPORT_OK = qw(
fake_current_unix
infinite_future
is_infinite_future
infinite_past
is_infinite_past
datetime_to_string
datetime_from_string
set_timezone
);
my $is_fake_time = 0;
my $timezone_cache = {};
my $UTC = DateTime::TimeZone->new(name => 'UTC');
my $LOCAL = DateTime::TimeZone->new(name => 'local');
my $FLOATING = DateTime::TimeZone::Floating->new();
sub set_fake_time {
my ($o) = @_;
@ -65,13 +72,9 @@ sub fake_current_unix {
sub _current_local {
if ($is_fake_time) {
return DateTime->from_epoch(epoch => Time::Warp::time,
time_zone => DateTime::TimeZone->new(name => 'local')
);
return DateTime->from_epoch(epoch => Time::Warp::time, time_zone => $LOCAL);
} else {
return DateTime->now(
time_zone => DateTime::TimeZone->new(name => 'local')
);
return DateTime->now(time_zone => $LOCAL);
}
}
@ -81,7 +84,7 @@ sub infinite_future {
#applying the 'local' timezone takes too long -> "The current implementation of DateTime::TimeZone
#will use a huge amount of memory calculating all the DST changes from now until the future date.
#Use UTC or the floating time zone and you will be safe."
time_zone => DateTime::TimeZone->new(name => 'UTC')
time_zone => $UTC
#- with floating timezones, the long conversion takes place when comparing with a 'local' dt
#- the error due to leap years/seconds is not relevant in comparisons
);
@ -92,6 +95,19 @@ sub is_infinite_future {
return $dt->year >= 9999;
}
sub infinite_past {
#mysql 5.5: The supported range is '1000-01-01 00:00:00' ...
return DateTime->new(year => 1000, month => 1, day => 1, hour => 0, minute => 0, second => 0,
time_zone => $UTC
);
#$dt->epoch calls should be okay if perl >= 5.12.0
}
sub is_infinite_past {
my $dt = shift;
return $dt->year <= 1000;
}
sub datetime_to_string {
my ($dt) = @_;
return unless defined ($dt);
@ -101,11 +117,28 @@ sub datetime_to_string {
}
sub datetime_from_string {
my $s = shift;
my ($s,$tz) = @_;
$s =~ s/^(\d{4}\-\d{2}\-\d{2})\s+(\d.+)$/$1T$2/;
my $ts = DateTime::Format::ISO8601->parse_datetime($s);
$ts->set_time_zone( DateTime::TimeZone->new(name => 'local') );
return $ts;
return set_timezone($ts,$tz);
}
sub set_timezone {
my ($dt,$tz) = @_;
return unless defined ($dt);
if (defined $tz and length($tz) > 0) {
my $timezone;
if (exists $timezone_cache->{$tz}) {
$timezone = $timezone_cache->{$tz};
} else {
$timezone = DateTime::TimeZone->new(name => $tz);
$timezone_cache->{$tz} = $timezone;
}
$dt->set_time_zone( $timezone );
} else { #floating otherwise.
$dt->set_time_zone( $FLOATING );
}
return $dt;
}
sub _set_fake_time {

@ -25,6 +25,8 @@ our @EXPORT_OK = qw(
gettablename
check_table
insert_row
findby_contractid_ts
);
my $tablename = 'billing_mappings';
@ -56,6 +58,32 @@ sub new {
}
sub findby_contractid_ts {
my ($xa_db,$contract_id,$dt,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
if (defined $dt) {
$stmt .= ' AND (' . $db->columnidentifier('start_date') . ' IS NULL OR ' . $db->columnidentifier('start_date') . ' <= ? ) ' .
'AND (' . $db->columnidentifier('end_date') . ' IS NULL OR ' . $db->columnidentifier('end_date') . ' >= ? ) ' .
'ORDER BY ' . $db->columnidentifier('start_date') . ' DESC, ' . $db->columnidentifier('id') . ' DESC LIMIT 1';
push(@params, $db->datetime_to_string($dt) );
push(@params, $db->datetime_to_string($dt) );
}
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();

@ -3,6 +3,8 @@ use strict;
## no critic
use DateTime qw();
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
@ -18,6 +20,7 @@ use NGCP::BulkProcessor::SqlProcessor qw(
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
@ -25,6 +28,8 @@ our @EXPORT_OK = qw(
gettablename
check_table
insert_row
findby_contractid
sort_by_end
);
my $tablename = 'contract_balances';
@ -62,6 +67,25 @@ sub new {
}
sub findby_contractid {
my ($xa_db,$contract_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();
@ -115,10 +139,19 @@ sub buildrecords_fromrows {
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
my $db = &$get_db();
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
my $end = $db->datetime_from_string($record->{end},undef);
if (is_infinite_future($end)) {
$record->{_end} = infinite_future();
} else {
$record->{_end} = set_timezone($end);
}
$record->{_start} = $db->datetime_from_string($record->{start},'local');
push @records,$record;
}
@ -128,6 +161,34 @@ sub buildrecords_fromrows {
}
sub sort_by_end ($$) {
return _sort_by_date('_end',0,@_);
}
sub _sort_by_date {
my ($ts_field,$desc,$a,$b) = @_;
if ($desc) {
$desc = -1;
} else {
$desc = 1;
}
#use Data::Dumper;
#print Dumper($a);
#print Dumper($b);
my $a_inf = is_infinite_future($a->{$ts_field});
my $b_inf = is_infinite_future($b->{$ts_field});
if ($a_inf and $b_inf) {
return 0;
} elsif ($a_inf) {
return 1 * $desc;
} elsif ($b_inf) {
return -1 * $desc;
} else {
return DateTime->compare($a->{$ts_field}, $b->{$ts_field}) * $desc;
}
}
sub gettablename {
return $tablename;

@ -10,12 +10,15 @@ use NGCP::BulkProcessor::Logging qw(
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
process_table
);
use NGCP::BulkProcessor::SqlRecord qw();
@ -28,6 +31,8 @@ our @EXPORT_OK = qw(
countby_status_resellerid
process_records
$ACTIVE_STATE
$TERMINATED_STATE
);
@ -139,6 +144,44 @@ sub insert_row {
}
sub process_records {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
load_recursive
/};
check_table();
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -0,0 +1,168 @@
package NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
findby_contractid_ts
);
my $tablename = 'billing_mappings';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'start_date',
'end_date',
'billing_profile_id',
'contract_id',
'product_id',
#'network_id',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_contractid_ts {
my ($xa_db,$contract_id,$dt,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
if (defined $dt) {
$stmt .= ' AND (' . $db->columnidentifier('start_date') . ' IS NULL OR ' . $db->columnidentifier('start_date') . ' <= ? ) ' .
'AND (' . $db->columnidentifier('end_date') . ' IS NULL OR ' . $db->columnidentifier('end_date') . ' >= ? ) ' .
'ORDER BY ' . $db->columnidentifier('start_date') . ' DESC, ' . $db->columnidentifier('id') . ' DESC LIMIT 1';
push(@params, $db->datetime_to_string($dt) );
push(@params, $db->datetime_to_string($dt) );
}
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($billing_profile_id,
$contract_id,
$product_id) = @params{qw/
billing_profile_id
contract_id
product_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('billing_profile_id') . ', ' .
$db->columnidentifier('contract_id') . ', ' .
$db->columnidentifier('end_date') . ', ' .
#$db->columnidentifier('network_id') . ', ' .
$db->columnidentifier('product_id') . ', ' .
$db->columnidentifier('start_date') . ') VALUES (' .
'?, ' .
'?, ' .
'NULL, ' .
#'NULL, ' .
'?, ' .
'NULL)',
$billing_profile_id,
$contract_id,
$product_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,121 @@
package NGCP::BulkProcessor::Dao::mr38::billing::billing_profiles;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
findby_id
);
my $tablename = 'billing_profiles';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'reseller_id',
'handle',
'name',
'prepaid',
'interval_charge',
'interval_free_time',
'interval_free_cash',
'interval_unit',
'interval_count',
'fraud_interval_limit',
'fraud_interval_lock',
'fraud_interval_notify',
'fraud_daily_limit',
'fraud_daily_lock',
'fraud_daily_notify',
'fraud_use_reseller_rates',
'currency',
'status',
'modify_timestamp',
'create_timestamp',
'terminate_timestamp',
];
my $indexes = {};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,209 @@
package NGCP::BulkProcessor::Dao::mr38::billing::contract_balances;
use strict;
## no critic
use DateTime qw();
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
);
use NGCP::BulkProcessor::SqlRecord qw();
use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
findby_contractid
sort_by_end
);
my $tablename = 'contract_balances';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'contract_id',
'cash_balance',
'cash_balance_interval',
'free_time_balance',
'free_time_balance_interval',
#'topup_count',
#'timely_topup_count',
'start',
'end',
'invoice_id',
#'underrun_profiles',
#'underrun_lock',
];
my $indexes = {};
my $insert_unique_fields = [];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub findby_contractid {
my ($xa_db,$contract_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($contract_id) = @params{qw/
contract_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('cash_balance') . ', ' .
$db->columnidentifier('cash_balance_interval') . ', ' .
$db->columnidentifier('contract_id') . ', ' .
$db->columnidentifier('end') . ', ' .
$db->columnidentifier('free_time_balance') . ', ' .
$db->columnidentifier('free_time_balance_interval') . ', ' .
$db->columnidentifier('start') . ') VALUES (' .
#$db->columnidentifier('start') . ', ' .
#$db->columnidentifier('underrun_lock') . ', ' .
#$db->columnidentifier('underrun_profiles') . ') VALUES (' .
'0.0, ' .
'0.0, ' .
'?, ' .
'CONCAT(LAST_DAY(NOW()),\' 23:59:59\'), ' .
'0, ' .
'0, ' .
'CONCAT(SUBDATE(CURDATE(),(DAY(CURDATE())-1)),\' 00:00:00\')',
#'CONCAT(SUBDATE(CURDATE(),(DAY(CURDATE())-1)),\' 00:00:00\'), ' .
#'NULL, ' .
#'NULL)',
$contract_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
my $db = &$get_db();
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
my $end = $db->datetime_from_string($record->{end},undef);
if (is_infinite_future($end)) {
$record->{_end} = infinite_future();
} else {
$record->{_end} = set_timezone($end);
}
$record->{_start} = $db->datetime_from_string($record->{start},'local');
push @records,$record;
}
}
return \@records;
}
sub sort_by_end ($$) {
return _sort_by_date('_end',0,@_);
}
sub _sort_by_date {
my ($ts_field,$desc,$a,$b) = @_;
if ($desc) {
$desc = -1;
} else {
$desc = 1;
}
#use Data::Dumper;
#print Dumper($a);
#print Dumper($b);
my $a_inf = is_infinite_future($a->{$ts_field});
my $b_inf = is_infinite_future($b->{$ts_field});
if ($a_inf and $b_inf) {
return 0;
} elsif ($a_inf) {
return 1 * $desc;
} elsif ($b_inf) {
return -1 * $desc;
} else {
return DateTime->compare($a->{$ts_field}, $b->{$ts_field}) * $desc;
}
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,221 @@
package NGCP::BulkProcessor::Dao::mr38::billing::contracts;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowinserted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_billing_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
insert_record
copy_row
process_table
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
insert_row
countby_status_resellerid
process_records
$ACTIVE_STATE
$TERMINATED_STATE
);
my $tablename = 'contracts';
my $get_db = \&get_billing_db;
my $expected_fieldnames = [
'id',
'customer_id',
'contact_id',
'order_id',
#'profile_package_id',
'status',
'external_id',
'modify_timestamp',
'create_timestamp',
'activate_timestamp',
'terminate_timestamp',
'max_subscribers',
'send_invoice',
'subscriber_email_template_id',
'passreset_email_template_id',
'invoice_email_template_id',
'invoice_template_id',
'vat_rate',
'add_vat',
];
my $indexes = {};
my $insert_unique_fields = [];
our $ACTIVE_STATE = 'active';
our $TERMINATED_STATE = 'terminated';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub countby_status_resellerid {
my ($status,$reseller_id) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' AS contract' .
' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id';
my @params = ();
my @terms = ();
if ($status) {
push(@terms,'contract.status = ?');
push(@params,$status);
}
if ($reseller_id) {
push(@terms,'contact.reseller_id = ?');
push(@params,$reseller_id);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub insert_row {
my $db = &$get_db();
my $xa_db = shift // $db;
if ('HASH' eq ref $_[0]) {
my ($data,$insert_ignore) = @_;
check_table();
if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
} else {
my %params = @_;
my ($contact_id) = @params{qw/
contact_id
/};
if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' .
$db->columnidentifier('contact_id') . ', ' .
$db->columnidentifier('create_timestamp') . ', ' .
$db->columnidentifier('modify_timestamp') . ', ' .
$db->columnidentifier('status') . ') VALUES (' .
'?, ' .
'NOW(), ' .
'NOW(), ' .
'\'' . $ACTIVE_STATE . '\')',
$contact_id,
)) {
rowinserted($db,$tablename,getlogger(__PACKAGE__));
return $xa_db->db_last_insert_id();
}
}
return undef;
}
sub process_records {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
load_recursive
/};
check_table();
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,110 @@
package NGCP::BulkProcessor::Projects::Disaster::Balances::Check;
use strict;
## no critic
no strict 'refs';
use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw();
#use NGCP::BulkProcessor::RestRequests::mr38::Contracts qw();
#use NGCP::BulkProcessor::RestRequests::mr38::Customers qw();
#use NGCP::BulkProcessor::RestRequests::mr38::BillingProfiles qw();
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
check_billing_db_tables
);
#check_rest_get_items
my $NOK = 'NOK';
my $OK = 'ok';
sub check_billing_db_tables {
my ($messages) = @_;
my $result = 1;
my $check_result;
my $message;
my $message_prefix = 'NGCP billing db tables - ';
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::contracts');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::contract_balances');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::lnp_providers');
#if (not $check_result) {
# ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers');
#}
#$result &= $check_result; push(@$messages,$message);
return $result;
}
sub _check_table {
my ($message_prefix,$module) = @_;
my $result = 0;
my $message = ($message_prefix // '') . &{$module . '::gettablename'}() . ': ';
eval {
$result = &{$module . '::check_table'}();
};
if (@$ or not $result) {
return (0,$message . $NOK);
} else {
return (1,$message . $OK);
}
}
#sub check_rest_get_items {
#
# my ($messages) = @_;
#
# my $result = 1;
# my $check_result;
# my $message;
#
# my $message_prefix = 'NGCP id\'s/constants - ';
#
# ($check_result,$message, my $reseller) = _check_rest_get_item($message_prefix,
# 'NGCP::BulkProcessor::RestRequests::mr38::Resellers',
# $reseller_id,
# 'name');
# $result &= $check_result; push(@$messages,$message);
#
#
# return $result;
#
#}
sub _check_rest_get_item {
my ($message_prefix,$module,$id,$item_name_field,$get_method,$item_path_method) = @_;
my $item = undef;
$get_method //= 'get_item';
$item_path_method //= 'get_item_path';
my $message = ($message_prefix // '') . &{$module . '::' . $item_path_method}($id) . ': ';
return (0,$message . $NOK,$item) unless $id;
eval {
$item = &{$module . '::' . $get_method}($id);
};
if (@$ or not defined $item or ('ARRAY' eq ref $item and (scalar @$item) != 1)) {
return (0,$message . $NOK,$item);
} else {
$item = $item->[0] if ('ARRAY' eq ref $item and (scalar @$item) == 1);
return (1,$message . "'" . $item->{$item_name_field} . "' " . $OK,$item);
}
}
1;

@ -0,0 +1,340 @@
package NGCP::BulkProcessor::Projects::Disaster::Balances::Contracts;
use strict;
## no critic
use threads::shared qw();
#use List::Util qw();
use DateTime qw();
use NGCP::BulkProcessor::Projects::Disaster::Balances::Settings qw(
$dry
$skip_errors
$fix_contract_balance_gaps_multithreading
$fix_contract_balance_gaps_numofthreads
);
#$set_preference_bulk_numofthreads
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
);
use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw();
use NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
);
use NGCP::BulkProcessor::ConnectorPool qw(
destroy_dbs
);
use NGCP::BulkProcessor::Utils qw(threadid);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
fix_contract_balance_gaps
);
sub fix_contract_balance_gaps {
my $static_context = {};
my $result = _fix_contract_balance_gaps_checks($static_context);
destroy_dbs();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Dao::mr38::billing::contracts::process_records(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
my $rownum = $row_offset;
foreach my $contract (@$records) {
$rownum++;
next unless _reset_fix_contract_balance_gaps_context($context,$contract,$rownum);
_fix_contract_balance_gaps($context);
}
#return 0;
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
# below is not mandatory..
_check_insert_tables();
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
load_recursive => 0,
multithreading => $fix_contract_balance_gaps_multithreading,
numofthreads => $fix_contract_balance_gaps_numofthreads,
),$warning_count);
}
sub _check_insert_tables {
#NGCP::BulkProcessor::Dao::mr38::provisioning::voip_usr_preferences::check_table();
}
sub _fix_contract_balance_gaps {
my ($context) = @_;
eval {
$context->{db}->db_begin();
my $last_balance = undef;
foreach my $contract_balance (sort NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::sort_by_end @{$context->{contract_balances}}) {
#print " " . $contract_balance->{id} . " " . $contract_balance->{_start} . ' ' . $contract_balance->{_end} . "\n";
if (defined $last_balance) {
my $gap_start = $last_balance->{_end}->clone->add(seconds => 1);
my $gap_end = $contract_balance->{_start};
my $date_comparison = DateTime->compare($gap_start, $gap_end);
if ($date_comparison > 0) {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'contract balances overlap for contract id ' . $context->{contract}->{id} . ' detected: '.
$gap_start . ' - ' . $gap_end);
} else {
_error($context,"($context->{rownum}) " . 'contract balances overlap for contract id ' . $context->{contract}->{id} . ' detected: '.
$gap_start . ' - ' . $gap_end);
}
} elsif ($date_comparison < 0) {
_info($context,"($context->{rownum}) " . 'contract balances gap for contract id ' . $context->{contract}->{id} . ' detected: '.
$gap_start . ' - ' . $gap_end);
_insert_contract_balances($context,$gap_start,$gap_end->clone->subtract(seconds => 1),$contract_balance);
}
}
$last_balance = $contract_balance;
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err);
} else {
_error($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err);
}
}
}
sub _insert_contract_balances {
my ($context,$gap_start,$gap_end,$contract_balance) = @_;
my $start = $gap_start;
my $last_end;
my $end;
while (($end = $start->clone->add(months => 1)->subtract(seconds => 1)) <= $gap_end) {
my $billing_mapping = NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings::findby_contractid_ts($context->{db},$context->{contract}->{id},$start)->[0];
if (defined $billing_mapping) {
#todo: check if billing profile is postpaid, has zero free_time and free_cash.
#todo: contracts with profile packages defining intervals other than 1 month are not supported atm.
#todo: dynamically choose mr38/4x contract_balance table dao.
$last_end = $end;
#_insert_contract_balances($context,$gap_start,$gap_end,$contract_balance,$billing_mapping);
my $id = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::insert_row($context->{db},{
contract_id => $context->{contract}->{id},
start => $context->{db}->datetime_to_string($start),
end => $context->{db}->datetime_to_string($end),
cash_balance => 0,
free_time_balance => 0,
});
_info($context,"($context->{rownum}) " . 'contract balance id ' . $id . ' for contract id ' . $context->{contract}->{id} . ' inserted: '.
$start . ' - ' . $end);
} else {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'no billing mapping for contract id ' . $context->{contract}->{id} . ', t = ' . $start . ' found ');
} else {
_error($context,"($context->{rownum}) " . 'no billing mapping for contract id ' . $context->{contract}->{id} . ', t = ' . $start . ' found ');
}
}
$start = $end->clone->add(seconds => 1);
}
if (not defined $last_end or DateTime->compare($last_end, $gap_end) != 0) {
if ($skip_errors) {
_warn($context,"($context->{rownum}) " . 'contract balances gap for contract id ' . $context->{contract}->{id} . ' cannot be filled with monthly intervals');
} else {
_error($context,"($context->{rownum}) " . 'contract balances gap for contract id ' . $context->{contract}->{id} . ' cannot be filled with monthly intervals');
}
}
}
sub _fix_contract_balance_gaps_checks {
my ($context) = @_;
my $result = _checks($context);
return $result;
}
sub _reset_fix_contract_balance_gaps_context {
my ($context,$contract,$rownum) = @_;
my $result = _reset_context($context,$contract,$rownum);
$context->{contract_balances} = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::findby_contractid($context->{db},$context->{contract}->{id});
#$context->{barring_profile} = $imported_subscriber->{barring_profile};
#$context->{ncos_level} = $context->{ncos_level_map}->{$context->{barring_profile}};
#delete $context->{adm_ncos_id_preference_id};
return $result;
}
sub _checks {
my ($context) = @_;
my $result = 1;
#my $optioncount = 0;
#eval {
# $optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::FeatureOption::countby_subscribernumber_option();
#};
#if ($@ or $optioncount == 0) {
# rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
#my $userpasswordcount = 0;
#eval {
# $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::countby_fqdn();
#};
#if ($@ or $userpasswordcount == 0) {
# rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
#my $subscribercount = 0;
#my $subscriber_barring_profiles = [];
#eval {
# $subscribercount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::countby_subscribernumber();
# $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles();
#};
#if ($@ or $subscribercount == 0) {
# rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
return $result;
}
sub _reset_context {
my ($context,$contract,$rownum) = @_;
my $result = 1;
$context->{rownum} = $rownum;
$context->{contract} = $contract;
#$context->{cli} = $imported_subscriber->subscribernumber();
#$context->{e164} = {};
#$context->{e164}->{cc} = substr($context->{cli},0,3);
#$context->{e164}->{ac} = '';
#$context->{e164}->{sn} = substr($context->{cli},3);
#$context->{subscriberdelta} = $imported_subscriber->{delta};
#my $userpassword = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::findby_fqdn($context->{cli});
#if (defined $userpassword) {
# $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username};
# $context->{password} = $userpassword->{password};
# $context->{userpassworddelta} = $userpassword->{delta};
#} else {
# # once full username+passwords is available:
# delete $context->{username};
# delete $context->{password};
# delete $context->{userpassworddelta};
# if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) {
#
# } else {
# $result &= 0;
#
# # for now, as username+passwords are incomplete:
# #$context->{username} = $context->{e164}->{sn};
# #$context->{password} = $context->{username};
# #$context->{userpassworddelta} = $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::updated_delta;
#
# if ($skip_errors) {
# # for now, as username+passwords are incomplete:
# _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# } else {
# _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# }
# }
#}
#
#delete $context->{billing_voip_subscriber};
#delete $context->{provisioning_voip_subscriber};
return $result;
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -0,0 +1,114 @@
package NGCP::BulkProcessor::Projects::Disaster::Balances::Settings;
use strict;
## no critic
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
$cpucount
);
#$working_path
#create_path
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt);
#format_number check_ipnet
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
check_dry
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
$fix_contract_balance_gaps_multithreading
$fix_contract_balance_gaps_numofthreads
);
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $force = 0;
our $dry = 0;
our $skip_errors = 0;
our $fix_contract_balance_gaps_multithreading = $enablemultithreading;
our $fix_contract_balance_gaps_numofthreads = $cpucount;
sub update_settings {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
#my $regexp_result;
#&$configurationinfocode("testinfomessage",$configlogger);
#$result &= _prepare_working_paths(1);
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$fix_contract_balance_gaps_multithreading = $data->{fix_contract_balance_gaps_multithreading} if exists $data->{fix_contract_balance_gaps_multithreading};
$fix_contract_balance_gaps_numofthreads = _get_numofthreads($cpucount,$data,'fix_contract_balance_gaps_numofthreads');
return $result;
}
return 0;
}
sub check_dry {
if ($dry) {
scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__));
return 1;
} else {
scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__));
if (!$force) {
if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) {
return 1;
} else {
return 0;
}
} else {
scriptinfo('force option applied',getlogger(__PACKAGE__));
return 1;
}
}
}
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $_numofthreads = $default_value;
$_numofthreads = $data->{$key} if exists $data->{$key};
$_numofthreads = $cpucount if $_numofthreads > $cpucount;
return $_numofthreads;
}
1;

@ -0,0 +1,62 @@
##general settings:
working_path = /var/sipwise
cpucount = 4
enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
#provisioning_conf = /etc/ngcp-panel/provisioning.conf
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 127.0.0.1
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.74
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.74
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.74
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.74
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = INFO
emailloglevel = OFF

@ -0,0 +1,245 @@
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
use Getopt::Long qw(GetOptions);
use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Disaster::Balances::Settings qw(
update_settings
check_dry
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
);
use NGCP::BulkProcessor::Logging qw(
init_log
getlogger
$attachmentlogfile
scriptinfo
cleanuplogfiles
$currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
completion
done
scriptwarn
scripterror
filewarn
fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
$ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw();
use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw();
use NGCP::BulkProcessor::Projects::Disaster::Balances::Check qw(
check_billing_db_tables
);
#check_rest_get_items
use NGCP::BulkProcessor::Projects::Disaster::Balances::Contracts qw(
fix_contract_balance_gaps
);
#use NGCP::BulkProcessor::Projects::Disaster::Balances::Api qw(
# set_call_forwards
# set_call_forwards_batch
#);
use NGCP::BulkProcessor::ConnectorPool qw(
destroy_dbs
);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
my $tasks = [];
my $check_task_opt = 'check';
push(@TASK_OPTS,$check_task_opt);
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
my $fix_contract_balance_gaps_task_opt = 'fix_contract_balance_gaps';
push(@TASK_OPTS,$fix_contract_balance_gaps_task_opt);
if (init()) {
main();
exit(0);
} else {
exit(1);
}
sub init {
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
return $result;
}
sub main() {
my @messages = ();
my @attachmentfiles = ();
my $result = 1;
my $completion = 0;
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
foreach my $task (@$tasks) {
if (lc($check_task_opt) eq lc($task)) {
$result &= check_task(\@messages) if taskinfo($check_task_opt,$result);
} elsif (lc($cleanup_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages) if taskinfo($cleanup_task_opt,$result);
} elsif (lc($fix_contract_balance_gaps_task_opt) eq lc($task)) {
if (taskinfo($fix_contract_balance_gaps_task_opt,$result)) {
next unless check_dry();
$result &= fix_contract_balance_gaps_task(\@messages);
$completion |= 1;
}
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
last;
}
}
} else {
$result = 0;
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
}
push(@attachmentfiles,$attachmentlogfile);
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
return $result;
}
sub taskinfo {
my ($task,$result) = @_;
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
return $result;
}
sub check_task {
my ($messages) = @_;
my @check_messages = ();
my $result = check_billing_db_tables(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
#@check_messages = ();
#$result = check_provisioning_db_tables(\@check_messages);
##$result &= ..
#push(@$messages,join("\n",@check_messages));
destroy_dbs();
return $result;
}
sub cleanup_task {
my ($messages) = @_;
my $result = 0;
eval {
#cleanupcvsdirs() if $clean_generated;
#cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
#cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
#cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
if ($@ or !$result) {
push(@$messages,'cleanup INCOMPLETE');
return 0;
} else {
push(@$messages,'cleanup completed');
return 1;
}
}
sub fix_contract_balance_gaps_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = fix_contract_balance_gaps();
#if ($batch) {
# ($result,$warning_count) = set_barring_profiles_batch();
#} else {
# ($result,$warning_count) = set_barring_profiles();
#}
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
eval {
};
if ($err or !$result) {
push(@$messages,"fix contract balances gaps INCOMPLETE$stats");
} else {
push(@$messages,"fix contract balances gaps completed$stats");
}
destroy_dbs(); #every task should leave with closed connections.
return $result;
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..
# destroy_all_dbs
#}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -240,10 +240,10 @@ sub update_settings {
$import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading};
$features_define_filename = _get_import_filename($features_define_filename,$data,'features_define_filename');
$features_define_import_numofthreads =_get_import_numofthreads($cpucount,$data,'features_define_import_numofthreads');
$features_define_import_numofthreads =_get_numofthreads($cpucount,$data,'features_define_import_numofthreads');
$subscriber_define_filename = _get_import_filename($subscriber_define_filename,$data,'subscriber_define_filename');
$subscriber_define_import_numofthreads = _get_import_numofthreads($cpucount,$data,'subscriber_define_import_numofthreads');
$subscriber_define_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_define_import_numofthreads');
$subscribernumer_exclude_pattern = $data->{subscribernumer_exclude_pattern} if exists $data->{subscribernumer_exclude_pattern};
($regexp_result,$subscribernumer_exclude_pattern) = parse_regexp($subscribernumer_exclude_pattern,$configfile);
@ -257,16 +257,16 @@ sub update_settings {
$result &= $regexp_result;
$lnp_define_filename = _get_import_filename($lnp_define_filename,$data,'lnp_define_filename');
$lnp_define_import_numofthreads = _get_import_numofthreads($cpucount,$data,'lnp_define_import_numofthreads');
$lnp_define_import_numofthreads = _get_numofthreads($cpucount,$data,'lnp_define_import_numofthreads');
$user_password_filename = _get_import_filename($user_password_filename,$data,'user_password_filename');
$user_password_import_numofthreads = _get_import_numofthreads($cpucount,$data,'user_password_import_numofthreads');
$user_password_import_numofthreads = _get_numofthreads($cpucount,$data,'user_password_import_numofthreads');
$username_prefix = $data->{username_prefix} if exists $data->{username_prefix};
$min_password_length = $data->{min_password_length} if exists $data->{min_password_length};
$batch_filename = _get_import_filename($batch_filename,$data,'batch_filename');
$batch_import_numofthreads = _get_import_numofthreads($cpucount,$data,'batch_import_numofthreads');
$batch_import_numofthreads = _get_numofthreads($cpucount,$data,'batch_import_numofthreads');
$reseller_id = $data->{reseller_id} if exists $data->{reseller_id};
$domain_name = $data->{domain_name} if exists $data->{domain_name};
@ -285,20 +285,20 @@ sub update_settings {
$subsciber_username_prefix = $data->{subsciber_username_prefix} if exists $data->{subsciber_username_prefix};
$provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading};
$provision_subscriber_numofthreads = _get_import_numofthreads($cpucount,$data,'provision_subscriber_numofthreads');
$provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads');
$reprovision_upon_password_change = $data->{reprovision_upon_password_change} if exists $data->{reprovision_upon_password_change};
$always_update_subscriber = $data->{always_update_subscriber} if exists $data->{always_update_subscriber};
$set_barring_profiles_multithreading = $data->{set_barring_profiles_multithreading} if exists $data->{set_barring_profiles_multithreading};
$set_barring_profiles_numofthreads = _get_import_numofthreads($cpucount,$data,'set_barring_profiles_numofthreads');
$set_barring_profiles_numofthreads = _get_numofthreads($cpucount,$data,'set_barring_profiles_numofthreads');
$barring_profiles_yml = $data->{barring_profiles_yml} if exists $data->{barring_profiles_yml};
$set_peer_auth_multithreading = $data->{set_peer_auth_multithreading} if exists $data->{set_peer_auth_multithreading};
$set_peer_auth_numofthreads = _get_import_numofthreads($cpucount,$data,'set_peer_auth_numofthreads');
$set_peer_auth_numofthreads = _get_numofthreads($cpucount,$data,'set_peer_auth_numofthreads');
$peer_auth_realm = $data->{peer_auth_realm} if exists $data->{peer_auth_realm};
$set_allowed_ips_multithreading = $data->{set_peer_auth_multithreading} if exists $data->{set_allowed_ips_multithreading};
$set_allowed_ips_numofthreads = _get_import_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads');
$set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads');
$allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips};
foreach my $ipnet (@$allowed_ips) {
if (not check_ipnet($ipnet)) {
@ -308,7 +308,7 @@ sub update_settings {
}
$set_call_forwards_multithreading = $data->{set_call_forwards_multithreading} if exists $data->{set_call_forwards_multithreading};
$set_call_forwards_numofthreads = _get_import_numofthreads($cpucount,$data,'set_call_forwards_numofthreads');
$set_call_forwards_numofthreads = _get_numofthreads($cpucount,$data,'set_call_forwards_numofthreads');
$cfb_priorities = [ split_tuple($data->{cfb_priorities}) ] if exists $data->{cfb_priorities};
$cfb_timeouts = [ split_tuple($data->{cfb_timeouts}) ] if exists $data->{cfb_timeouts};
$cfu_priorities = [ split_tuple($data->{cfu_priorities}) ] if exists $data->{cfu_priorities};
@ -330,11 +330,11 @@ sub update_settings {
}
$create_lnps_multithreading = $data->{create_lnps_multithreading} if exists $data->{create_lnps_multithreading};
$create_lnps_numofthreads = _get_import_numofthreads($cpucount,$data,'create_lnps_numofthreads');
$create_lnps_numofthreads = _get_numofthreads($cpucount,$data,'create_lnps_numofthreads');
$create_lnp_block_txn = $data->{create_lnp_block_txn} if exists $data->{create_lnp_block_txn};
$set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading};
$set_preference_bulk_numofthreads = _get_import_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads');
$set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads');
$concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total};
if (defined $concurrent_max_total and $concurrent_max_total <= 0) {
configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__));
@ -388,12 +388,12 @@ sub _prepare_working_paths {
}
sub _get_import_numofthreads {
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $import_numofthreads = $default_value;
$import_numofthreads = $data->{$key} if exists $data->{$key};
$import_numofthreads = $cpucount if $import_numofthreads > $cpucount;
return $import_numofthreads;
my $_numofthreads = $default_value;
$_numofthreads = $data->{$key} if exists $data->{$key};
$_numofthreads = $cpucount if $_numofthreads > $cpucount;
return $_numofthreads;
}
sub _get_import_db_file {

@ -24,7 +24,7 @@ use NGCP::BulkProcessor::LogError qw(
use NGCP::BulkProcessor::RestConnector qw(_add_headers);
use NGCP::BulkProcessor::FakeTime qw(get_fake_now_string);
use NGCP::BulkProcessor::Calendar qw(get_fake_now_string);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::RestConnector);

@ -8,7 +8,7 @@ use threads::shared;
use Thread::Queue;
use Time::HiRes qw(sleep);
use URI::Escape qw();
#use URI::Escape qw();
use NGCP::BulkProcessor::Globals qw(
$enablemultithreading
@ -27,7 +27,7 @@ use NGCP::BulkProcessor::LogError qw(
restprocessingfailed
);
use NGCP::BulkProcessor::Utils qw(threadid);
use NGCP::BulkProcessor::Utils qw(threadid urlencode urldecode);
require Exporter;
our @ISA = qw(Exporter);
@ -55,7 +55,8 @@ sub get_query_string {
} else {
$query .= '&';
}
$query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape($filters->{$param});
#$query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape($filters->{$param});
$query .= urlencode($param) . '=' . urlencode($filters->{$param});
}
return $query;
};

@ -26,6 +26,7 @@ use DBI;
use NGCP::BulkProcessor::Utils qw(threadid);
use NGCP::BulkProcessor::Array qw(arrayeq);
use NGCP::BulkProcessor::RandomString qw(createtmpstring);
use NGCP::BulkProcessor::Calendar qw();
require Exporter;
our @ISA = qw(Exporter);
@ -456,6 +457,18 @@ sub _fetch_error {
}
sub datetime_to_string {
my $self = shift;
my ($dt) = @_;
return NGCP::BulkProcessor::Calendar::datetime_to_string($dt);
}
sub datetime_from_string {
my $self = shift;
my ($s,$tz) = @_;
return NGCP::BulkProcessor::Calendar::datetime_from_string($s,$tz);
}
# "The data type is 'sticky' in that bind values passed to execute() are bound with
# the data type specified by earlier bind_param() calls, if any."
sub _bind_params {

Loading…
Cancel
Save