From 8a61eb12ce270d9ab5da7034ab9fea461e76fbc8 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Thu, 10 Nov 2016 11:31:04 +0100 Subject: [PATCH] TT#5569 contract_balance gap recovery for mr3.8 +initial process.pl +contract balance gap fix thread +contract iteration +context init +context teardown +detectig gaps and overlaps +insert missing contract balances: no packages or free time considered for now +calendar implementation +drop URI::Encode dependency +for now, adopt to run with mr3.8 tables only. Change-Id: I48e0de59bb703adc0d28092c1a93a5b47432bc03 (cherry picked from commit c7521621aeac04f9c97a1f7657cc1ade00171739) --- debian/control | 1 - .../{FakeTime.pm => Calendar.pm} | 55 ++- .../Dao/Trunk/billing/billing_mappings.pm | 28 ++ .../Dao/Trunk/billing/contract_balances.pm | 61 ++++ .../Dao/Trunk/billing/contracts.pm | 43 +++ .../Dao/mr38/billing/billing_mappings.pm | 168 +++++++++ .../Dao/mr38/billing/billing_profiles.pm | 121 +++++++ .../Dao/mr38/billing/contract_balances.pm | 209 +++++++++++ .../Dao/mr38/billing/contracts.pm | 221 ++++++++++++ .../Projects/Disaster/Balances/Check.pm | 110 ++++++ .../Projects/Disaster/Balances/Contracts.pm | 340 ++++++++++++++++++ .../Projects/Disaster/Balances/Settings.pm | 114 ++++++ .../Projects/Disaster/Balances/config.cfg | 62 ++++ .../Projects/Disaster/Balances/process.pl | 245 +++++++++++++ .../Projects/Disaster/Balances/settings.cfg | 4 + .../Projects/Migration/IPGallery/Settings.pm | 34 +- .../RestConnectors/NGCPRestApi.pm | 2 +- lib/NGCP/BulkProcessor/RestProcessor.pm | 7 +- lib/NGCP/BulkProcessor/SqlConnector.pm | 13 + 19 files changed, 1805 insertions(+), 33 deletions(-) rename lib/NGCP/BulkProcessor/{FakeTime.pm => Calendar.pm} (68%) create mode 100644 lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_mappings.pm create mode 100644 lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_profiles.pm create mode 100644 lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm create mode 100644 lib/NGCP/BulkProcessor/Dao/mr38/billing/contracts.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl create mode 100644 lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg diff --git a/debian/control b/debian/control index b33c267..d82f845 100644 --- a/debian/control +++ b/debian/control @@ -15,7 +15,6 @@ Replaces: Breaks: ngcp-bulk-processor-pro, Depends: - libany-uri-escape-perl, libarchive-zip-perl, libconfig-any-perl, libdata-dump-perl, diff --git a/lib/NGCP/BulkProcessor/FakeTime.pm b/lib/NGCP/BulkProcessor/Calendar.pm similarity index 68% rename from lib/NGCP/BulkProcessor/FakeTime.pm rename to lib/NGCP/BulkProcessor/Calendar.pm index 420b719..4ebd56f 100644 --- a/lib/NGCP/BulkProcessor/FakeTime.pm +++ b/lib/NGCP/BulkProcessor/Calendar.pm @@ -1,4 +1,4 @@ -package NGCP::BulkProcessor::FakeTime; +package NGCP::BulkProcessor::Calendar; use strict; ## no critic @@ -28,11 +28,18 @@ our @EXPORT_OK = qw( fake_current_unix infinite_future is_infinite_future + infinite_past + is_infinite_past datetime_to_string datetime_from_string + set_timezone ); my $is_fake_time = 0; +my $timezone_cache = {}; +my $UTC = DateTime::TimeZone->new(name => 'UTC'); +my $LOCAL = DateTime::TimeZone->new(name => 'local'); +my $FLOATING = DateTime::TimeZone::Floating->new(); sub set_fake_time { my ($o) = @_; @@ -65,13 +72,9 @@ sub fake_current_unix { sub _current_local { if ($is_fake_time) { - return DateTime->from_epoch(epoch => Time::Warp::time, - time_zone => DateTime::TimeZone->new(name => 'local') - ); + return DateTime->from_epoch(epoch => Time::Warp::time, time_zone => $LOCAL); } else { - return DateTime->now( - time_zone => DateTime::TimeZone->new(name => 'local') - ); + return DateTime->now(time_zone => $LOCAL); } } @@ -81,7 +84,7 @@ sub infinite_future { #applying the 'local' timezone takes too long -> "The current implementation of DateTime::TimeZone #will use a huge amount of memory calculating all the DST changes from now until the future date. #Use UTC or the floating time zone and you will be safe." - time_zone => DateTime::TimeZone->new(name => 'UTC') + time_zone => $UTC #- with floating timezones, the long conversion takes place when comparing with a 'local' dt #- the error due to leap years/seconds is not relevant in comparisons ); @@ -92,6 +95,19 @@ sub is_infinite_future { return $dt->year >= 9999; } +sub infinite_past { + #mysql 5.5: The supported range is '1000-01-01 00:00:00' ... + return DateTime->new(year => 1000, month => 1, day => 1, hour => 0, minute => 0, second => 0, + time_zone => $UTC + ); + #$dt->epoch calls should be okay if perl >= 5.12.0 +} + +sub is_infinite_past { + my $dt = shift; + return $dt->year <= 1000; +} + sub datetime_to_string { my ($dt) = @_; return unless defined ($dt); @@ -101,11 +117,28 @@ sub datetime_to_string { } sub datetime_from_string { - my $s = shift; + my ($s,$tz) = @_; $s =~ s/^(\d{4}\-\d{2}\-\d{2})\s+(\d.+)$/$1T$2/; my $ts = DateTime::Format::ISO8601->parse_datetime($s); - $ts->set_time_zone( DateTime::TimeZone->new(name => 'local') ); - return $ts; + return set_timezone($ts,$tz); +} + +sub set_timezone { + my ($dt,$tz) = @_; + return unless defined ($dt); + if (defined $tz and length($tz) > 0) { + my $timezone; + if (exists $timezone_cache->{$tz}) { + $timezone = $timezone_cache->{$tz}; + } else { + $timezone = DateTime::TimeZone->new(name => $tz); + $timezone_cache->{$tz} = $timezone; + } + $dt->set_time_zone( $timezone ); + } else { #floating otherwise. + $dt->set_time_zone( $FLOATING ); + } + return $dt; } sub _set_fake_time { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm index dcb5120..a2ba1e2 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/billing_mappings.pm @@ -25,6 +25,8 @@ our @EXPORT_OK = qw( gettablename check_table insert_row + + findby_contractid_ts ); my $tablename = 'billing_mappings'; @@ -56,6 +58,32 @@ sub new { } +sub findby_contractid_ts { + + my ($xa_db,$contract_id,$dt,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + if (defined $dt) { + $stmt .= ' AND (' . $db->columnidentifier('start_date') . ' IS NULL OR ' . $db->columnidentifier('start_date') . ' <= ? ) ' . + 'AND (' . $db->columnidentifier('end_date') . ' IS NULL OR ' . $db->columnidentifier('end_date') . ' >= ? ) ' . + 'ORDER BY ' . $db->columnidentifier('start_date') . ' DESC, ' . $db->columnidentifier('id') . ' DESC LIMIT 1'; + push(@params, $db->datetime_to_string($dt) ); + push(@params, $db->datetime_to_string($dt) ); + } + + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub insert_row { my $db = &$get_db(); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm index 7610b20..8353cb5 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contract_balances.pm @@ -3,6 +3,8 @@ use strict; ## no critic +use DateTime qw(); + use NGCP::BulkProcessor::Logging qw( getlogger rowinserted @@ -18,6 +20,7 @@ use NGCP::BulkProcessor::SqlProcessor qw( copy_row ); use NGCP::BulkProcessor::SqlRecord qw(); +use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); @@ -25,6 +28,8 @@ our @EXPORT_OK = qw( gettablename check_table insert_row + findby_contractid + sort_by_end ); my $tablename = 'contract_balances'; @@ -62,6 +67,25 @@ sub new { } +sub findby_contractid { + + my ($xa_db,$contract_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + sub insert_row { my $db = &$get_db(); @@ -115,10 +139,19 @@ sub buildrecords_fromrows { my $record; if (defined $rows and ref $rows eq 'ARRAY') { + my $db = &$get_db(); foreach my $row (@$rows) { $record = __PACKAGE__->new($row); # transformations go here ... + my $end = $db->datetime_from_string($record->{end},undef); + if (is_infinite_future($end)) { + $record->{_end} = infinite_future(); + } else { + $record->{_end} = set_timezone($end); + } + + $record->{_start} = $db->datetime_from_string($record->{start},'local'); push @records,$record; } @@ -128,6 +161,34 @@ sub buildrecords_fromrows { } +sub sort_by_end ($$) { + return _sort_by_date('_end',0,@_); +} + +sub _sort_by_date { + my ($ts_field,$desc,$a,$b) = @_; + if ($desc) { + $desc = -1; + } else { + $desc = 1; + } + #use Data::Dumper; + #print Dumper($a); + #print Dumper($b); + my $a_inf = is_infinite_future($a->{$ts_field}); + my $b_inf = is_infinite_future($b->{$ts_field}); + if ($a_inf and $b_inf) { + return 0; + } elsif ($a_inf) { + return 1 * $desc; + } elsif ($b_inf) { + return -1 * $desc; + } else { + return DateTime->compare($a->{$ts_field}, $b->{$ts_field}) * $desc; + } + +} + sub gettablename { return $tablename; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm index cedb880..a6624d4 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/contracts.pm @@ -10,12 +10,15 @@ use NGCP::BulkProcessor::Logging qw( use NGCP::BulkProcessor::ConnectorPool qw( get_billing_db + destroy_dbs ); use NGCP::BulkProcessor::SqlProcessor qw( checktableinfo insert_record copy_row + + process_table ); use NGCP::BulkProcessor::SqlRecord qw(); @@ -28,6 +31,8 @@ our @EXPORT_OK = qw( countby_status_resellerid + process_records + $ACTIVE_STATE $TERMINATED_STATE ); @@ -139,6 +144,44 @@ sub insert_row { } +sub process_records { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + check_table(); + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + ); +} + + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_mappings.pm b/lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_mappings.pm new file mode 100644 index 0000000..ceb4af5 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_mappings.pm @@ -0,0 +1,168 @@ +package NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + insert_row + + findby_contractid_ts +); + +my $tablename = 'billing_mappings'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'start_date', + 'end_date', + 'billing_profile_id', + 'contract_id', + 'product_id', + #'network_id', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_contractid_ts { + + my ($xa_db,$contract_id,$dt,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + if (defined $dt) { + $stmt .= ' AND (' . $db->columnidentifier('start_date') . ' IS NULL OR ' . $db->columnidentifier('start_date') . ' <= ? ) ' . + 'AND (' . $db->columnidentifier('end_date') . ' IS NULL OR ' . $db->columnidentifier('end_date') . ' >= ? ) ' . + 'ORDER BY ' . $db->columnidentifier('start_date') . ' DESC, ' . $db->columnidentifier('id') . ' DESC LIMIT 1'; + push(@params, $db->datetime_to_string($dt) ); + push(@params, $db->datetime_to_string($dt) ); + } + + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($billing_profile_id, + $contract_id, + $product_id) = @params{qw/ + billing_profile_id + contract_id + product_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('billing_profile_id') . ', ' . + $db->columnidentifier('contract_id') . ', ' . + $db->columnidentifier('end_date') . ', ' . + #$db->columnidentifier('network_id') . ', ' . + $db->columnidentifier('product_id') . ', ' . + $db->columnidentifier('start_date') . ') VALUES (' . + '?, ' . + '?, ' . + 'NULL, ' . + #'NULL, ' . + '?, ' . + 'NULL)', + $billing_profile_id, + $contract_id, + $product_id, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_profiles.pm b/lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_profiles.pm new file mode 100644 index 0000000..ad908f8 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr38/billing/billing_profiles.pm @@ -0,0 +1,121 @@ +package NGCP::BulkProcessor::Dao::mr38::billing::billing_profiles; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + findby_id +); + +my $tablename = 'billing_profiles'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'reseller_id', + 'handle', + 'name', + 'prepaid', + 'interval_charge', + 'interval_free_time', + 'interval_free_cash', + 'interval_unit', + 'interval_count', + 'fraud_interval_limit', + 'fraud_interval_lock', + 'fraud_interval_notify', + 'fraud_daily_limit', + 'fraud_daily_lock', + 'fraud_daily_notify', + 'fraud_use_reseller_rates', + 'currency', + 'status', + 'modify_timestamp', + 'create_timestamp', + 'terminate_timestamp', +]; + +my $indexes = {}; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm b/lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm new file mode 100644 index 0000000..ca217e5 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr38/billing/contract_balances.pm @@ -0,0 +1,209 @@ +package NGCP::BulkProcessor::Dao::mr38::billing::contract_balances; +use strict; + +## no critic + +use DateTime qw(); + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + copy_row +); +use NGCP::BulkProcessor::SqlRecord qw(); +use NGCP::BulkProcessor::Calendar qw(is_infinite_future infinite_future set_timezone); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + insert_row + findby_contractid + sort_by_end +); + +my $tablename = 'contract_balances'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'contract_id', + 'cash_balance', + 'cash_balance_interval', + 'free_time_balance', + 'free_time_balance_interval', + #'topup_count', + #'timely_topup_count', + 'start', + 'end', + 'invoice_id', + #'underrun_profiles', + #'underrun_lock', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_contractid { + + my ($xa_db,$contract_id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('contract_id') . ' = ?'; + my @params = ($contract_id); + + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($contract_id) = @params{qw/ + contract_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('cash_balance') . ', ' . + $db->columnidentifier('cash_balance_interval') . ', ' . + $db->columnidentifier('contract_id') . ', ' . + $db->columnidentifier('end') . ', ' . + $db->columnidentifier('free_time_balance') . ', ' . + $db->columnidentifier('free_time_balance_interval') . ', ' . + $db->columnidentifier('start') . ') VALUES (' . + #$db->columnidentifier('start') . ', ' . + #$db->columnidentifier('underrun_lock') . ', ' . + #$db->columnidentifier('underrun_profiles') . ') VALUES (' . + '0.0, ' . + '0.0, ' . + '?, ' . + 'CONCAT(LAST_DAY(NOW()),\' 23:59:59\'), ' . + '0, ' . + '0, ' . + 'CONCAT(SUBDATE(CURDATE(),(DAY(CURDATE())-1)),\' 00:00:00\')', + #'CONCAT(SUBDATE(CURDATE(),(DAY(CURDATE())-1)),\' 00:00:00\'), ' . + #'NULL, ' . + #'NULL)', + $contract_id, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + my $db = &$get_db(); + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + my $end = $db->datetime_from_string($record->{end},undef); + if (is_infinite_future($end)) { + $record->{_end} = infinite_future(); + } else { + $record->{_end} = set_timezone($end); + } + + $record->{_start} = $db->datetime_from_string($record->{start},'local'); + + push @records,$record; + } + } + + return \@records; + +} + +sub sort_by_end ($$) { + return _sort_by_date('_end',0,@_); +} + +sub _sort_by_date { + my ($ts_field,$desc,$a,$b) = @_; + if ($desc) { + $desc = -1; + } else { + $desc = 1; + } + #use Data::Dumper; + #print Dumper($a); + #print Dumper($b); + my $a_inf = is_infinite_future($a->{$ts_field}); + my $b_inf = is_infinite_future($b->{$ts_field}); + if ($a_inf and $b_inf) { + return 0; + } elsif ($a_inf) { + return 1 * $desc; + } elsif ($b_inf) { + return -1 * $desc; + } else { + return DateTime->compare($a->{$ts_field}, $b->{$ts_field}) * $desc; + } + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Dao/mr38/billing/contracts.pm b/lib/NGCP/BulkProcessor/Dao/mr38/billing/contracts.pm new file mode 100644 index 0000000..35149a7 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/mr38/billing/contracts.pm @@ -0,0 +1,221 @@ +package NGCP::BulkProcessor::Dao::mr38::billing::contracts; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowinserted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_billing_db + destroy_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + insert_record + copy_row + + process_table +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + insert_row + + countby_status_resellerid + + process_records + + $ACTIVE_STATE + $TERMINATED_STATE +); + +my $tablename = 'contracts'; +my $get_db = \&get_billing_db; + +my $expected_fieldnames = [ + 'id', + 'customer_id', + 'contact_id', + 'order_id', + #'profile_package_id', + 'status', + 'external_id', + 'modify_timestamp', + 'create_timestamp', + 'activate_timestamp', + 'terminate_timestamp', + 'max_subscribers', + 'send_invoice', + 'subscriber_email_template_id', + 'passreset_email_template_id', + 'invoice_email_template_id', + 'invoice_template_id', + 'vat_rate', + 'add_vat', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +our $ACTIVE_STATE = 'active'; +our $TERMINATED_STATE = 'terminated'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub countby_status_resellerid { + + my ($status,$reseller_id) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' AS contract' . + ' INNER JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::billing::contacts::gettablename()) . ' AS contact ON contract.contact_id = contact.id'; + my @params = (); + my @terms = (); + if ($status) { + push(@terms,'contract.status = ?'); + push(@params,$status); + } + if ($reseller_id) { + push(@terms,'contact.reseller_id = ?'); + push(@params,$reseller_id); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($contact_id) = @params{qw/ + contact_id + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('contact_id') . ', ' . + $db->columnidentifier('create_timestamp') . ', ' . + $db->columnidentifier('modify_timestamp') . ', ' . + $db->columnidentifier('status') . ') VALUES (' . + '?, ' . + 'NOW(), ' . + 'NOW(), ' . + '\'' . $ACTIVE_STATE . '\')', + $contact_id, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + +sub process_records { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + check_table(); + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + ); +} + + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm new file mode 100644 index 0000000..2eafb91 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Check.pm @@ -0,0 +1,110 @@ +package NGCP::BulkProcessor::Projects::Disaster::Balances::Check; +use strict; + +## no critic + +no strict 'refs'; + +use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw(); +use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw(); + +#use NGCP::BulkProcessor::RestRequests::mr38::Contracts qw(); +#use NGCP::BulkProcessor::RestRequests::mr38::Customers qw(); +#use NGCP::BulkProcessor::RestRequests::mr38::BillingProfiles qw(); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + check_billing_db_tables +); +#check_rest_get_items + +my $NOK = 'NOK'; +my $OK = 'ok'; + +sub check_billing_db_tables { + + my ($messages) = @_; + + my $result = 1; + my $check_result; + my $message; + + my $message_prefix = 'NGCP billing db tables - '; + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::contracts'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::contract_balances'); + $result &= $check_result; push(@$messages,$message); + + #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr38::billing::lnp_providers'); + #if (not $check_result) { + # ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::mr441::billing::lnp_providers'); + #} + #$result &= $check_result; push(@$messages,$message); + + return $result; + +} + + +sub _check_table { + + my ($message_prefix,$module) = @_; + my $result = 0; + my $message = ($message_prefix // '') . &{$module . '::gettablename'}() . ': '; + eval { + $result = &{$module . '::check_table'}(); + }; + if (@$ or not $result) { + return (0,$message . $NOK); + } else { + return (1,$message . $OK); + } + +} + +#sub check_rest_get_items { +# +# my ($messages) = @_; +# +# my $result = 1; +# my $check_result; +# my $message; +# +# my $message_prefix = 'NGCP id\'s/constants - '; +# +# ($check_result,$message, my $reseller) = _check_rest_get_item($message_prefix, +# 'NGCP::BulkProcessor::RestRequests::mr38::Resellers', +# $reseller_id, +# 'name'); +# $result &= $check_result; push(@$messages,$message); +# +# +# return $result; +# +#} + +sub _check_rest_get_item { + + my ($message_prefix,$module,$id,$item_name_field,$get_method,$item_path_method) = @_; + my $item = undef; + $get_method //= 'get_item'; + $item_path_method //= 'get_item_path'; + my $message = ($message_prefix // '') . &{$module . '::' . $item_path_method}($id) . ': '; + return (0,$message . $NOK,$item) unless $id; + eval { + $item = &{$module . '::' . $get_method}($id); + }; + + if (@$ or not defined $item or ('ARRAY' eq ref $item and (scalar @$item) != 1)) { + return (0,$message . $NOK,$item); + } else { + $item = $item->[0] if ('ARRAY' eq ref $item and (scalar @$item) == 1); + return (1,$message . "'" . $item->{$item_name_field} . "' " . $OK,$item); + } + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm new file mode 100644 index 0000000..e45d4fd --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Contracts.pm @@ -0,0 +1,340 @@ +package NGCP::BulkProcessor::Projects::Disaster::Balances::Contracts; +use strict; + +## no critic + +use threads::shared qw(); +#use List::Util qw(); +use DateTime qw(); + +use NGCP::BulkProcessor::Projects::Disaster::Balances::Settings qw( + $dry + $skip_errors + + $fix_contract_balance_gaps_multithreading + $fix_contract_balance_gaps_numofthreads +); +#$set_preference_bulk_numofthreads + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn +); + +use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw(); +use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw(); +use NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings qw(); + + + +use NGCP::BulkProcessor::ConnectorPool qw( + get_xa_db +); + +use NGCP::BulkProcessor::ConnectorPool qw( + destroy_dbs +); + +use NGCP::BulkProcessor::Utils qw(threadid); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + fix_contract_balance_gaps +); + +sub fix_contract_balance_gaps { + + my $static_context = {}; + my $result = _fix_contract_balance_gaps_checks($static_context); + + destroy_dbs(); + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::Dao::mr38::billing::contracts::process_records( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + my $rownum = $row_offset; + foreach my $contract (@$records) { + $rownum++; + next unless _reset_fix_contract_balance_gaps_context($context,$contract,$rownum); + _fix_contract_balance_gaps($context); + } + + #return 0; + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + # below is not mandatory.. + _check_insert_tables(); + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + load_recursive => 0, + multithreading => $fix_contract_balance_gaps_multithreading, + numofthreads => $fix_contract_balance_gaps_numofthreads, + ),$warning_count); +} + + +sub _check_insert_tables { + + #NGCP::BulkProcessor::Dao::mr38::provisioning::voip_usr_preferences::check_table(); + +} + +sub _fix_contract_balance_gaps { + my ($context) = @_; + + eval { + $context->{db}->db_begin(); + my $last_balance = undef; + foreach my $contract_balance (sort NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::sort_by_end @{$context->{contract_balances}}) { + #print " " . $contract_balance->{id} . " " . $contract_balance->{_start} . ' ' . $contract_balance->{_end} . "\n"; + if (defined $last_balance) { + my $gap_start = $last_balance->{_end}->clone->add(seconds => 1); + my $gap_end = $contract_balance->{_start}; + my $date_comparison = DateTime->compare($gap_start, $gap_end); + if ($date_comparison > 0) { + if ($skip_errors) { + _warn($context,"($context->{rownum}) " . 'contract balances overlap for contract id ' . $context->{contract}->{id} . ' detected: '. + $gap_start . ' - ' . $gap_end); + } else { + _error($context,"($context->{rownum}) " . 'contract balances overlap for contract id ' . $context->{contract}->{id} . ' detected: '. + $gap_start . ' - ' . $gap_end); + } + } elsif ($date_comparison < 0) { + _info($context,"($context->{rownum}) " . 'contract balances gap for contract id ' . $context->{contract}->{id} . ' detected: '. + $gap_start . ' - ' . $gap_end); + _insert_contract_balances($context,$gap_start,$gap_end->clone->subtract(seconds => 1),$contract_balance); + } + } + $last_balance = $contract_balance; + } + + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($skip_errors) { + _warn($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err); + } else { + _error($context,"($context->{rownum}) " . 'database error with contract id ' . $context->{contract}->{id} . ': ' . $err); + } + } +} + +sub _insert_contract_balances { + my ($context,$gap_start,$gap_end,$contract_balance) = @_; + + my $start = $gap_start; + my $last_end; + my $end; + while (($end = $start->clone->add(months => 1)->subtract(seconds => 1)) <= $gap_end) { + my $billing_mapping = NGCP::BulkProcessor::Dao::mr38::billing::billing_mappings::findby_contractid_ts($context->{db},$context->{contract}->{id},$start)->[0]; + if (defined $billing_mapping) { + #todo: check if billing profile is postpaid, has zero free_time and free_cash. + #todo: contracts with profile packages defining intervals other than 1 month are not supported atm. + #todo: dynamically choose mr38/4x contract_balance table dao. + $last_end = $end; + #_insert_contract_balances($context,$gap_start,$gap_end,$contract_balance,$billing_mapping); + my $id = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::insert_row($context->{db},{ + contract_id => $context->{contract}->{id}, + start => $context->{db}->datetime_to_string($start), + end => $context->{db}->datetime_to_string($end), + cash_balance => 0, + free_time_balance => 0, + }); + _info($context,"($context->{rownum}) " . 'contract balance id ' . $id . ' for contract id ' . $context->{contract}->{id} . ' inserted: '. + $start . ' - ' . $end); + } else { + if ($skip_errors) { + _warn($context,"($context->{rownum}) " . 'no billing mapping for contract id ' . $context->{contract}->{id} . ', t = ' . $start . ' found '); + } else { + _error($context,"($context->{rownum}) " . 'no billing mapping for contract id ' . $context->{contract}->{id} . ', t = ' . $start . ' found '); + } + } + $start = $end->clone->add(seconds => 1); + } + if (not defined $last_end or DateTime->compare($last_end, $gap_end) != 0) { + if ($skip_errors) { + _warn($context,"($context->{rownum}) " . 'contract balances gap for contract id ' . $context->{contract}->{id} . ' cannot be filled with monthly intervals'); + } else { + _error($context,"($context->{rownum}) " . 'contract balances gap for contract id ' . $context->{contract}->{id} . ' cannot be filled with monthly intervals'); + } + } + +} + +sub _fix_contract_balance_gaps_checks { + my ($context) = @_; + + my $result = _checks($context); + + return $result; +} + +sub _reset_fix_contract_balance_gaps_context { + + my ($context,$contract,$rownum) = @_; + + my $result = _reset_context($context,$contract,$rownum); + + $context->{contract_balances} = NGCP::BulkProcessor::Dao::mr38::billing::contract_balances::findby_contractid($context->{db},$context->{contract}->{id}); + + #$context->{barring_profile} = $imported_subscriber->{barring_profile}; + #$context->{ncos_level} = $context->{ncos_level_map}->{$context->{barring_profile}}; + + #delete $context->{adm_ncos_id_preference_id}; + + return $result; + +} + + +sub _checks { + + my ($context) = @_; + + my $result = 1; + #my $optioncount = 0; + #eval { + # $optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::FeatureOption::countby_subscribernumber_option(); + #}; + #if ($@ or $optioncount == 0) { + # rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__)); + # $result = 0; #even in skip-error mode.. + #} + #my $userpasswordcount = 0; + #eval { + # $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::countby_fqdn(); + #}; + #if ($@ or $userpasswordcount == 0) { + # rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__)); + # $result = 0; #even in skip-error mode.. + #} + #my $subscribercount = 0; + #my $subscriber_barring_profiles = []; + #eval { + # $subscribercount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::countby_subscribernumber(); + # $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles(); + #}; + #if ($@ or $subscribercount == 0) { + # rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__)); + # $result = 0; #even in skip-error mode.. + #} + + return $result; + +} + +sub _reset_context { + + my ($context,$contract,$rownum) = @_; + + my $result = 1; + + $context->{rownum} = $rownum; + + $context->{contract} = $contract; + + #$context->{cli} = $imported_subscriber->subscribernumber(); + #$context->{e164} = {}; + #$context->{e164}->{cc} = substr($context->{cli},0,3); + #$context->{e164}->{ac} = ''; + #$context->{e164}->{sn} = substr($context->{cli},3); + + #$context->{subscriberdelta} = $imported_subscriber->{delta}; + + #my $userpassword = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::findby_fqdn($context->{cli}); + #if (defined $userpassword) { + # $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username}; + # $context->{password} = $userpassword->{password}; + # $context->{userpassworddelta} = $userpassword->{delta}; + #} else { + # # once full username+passwords is available: + # delete $context->{username}; + # delete $context->{password}; + # delete $context->{userpassworddelta}; + # if ($context->{subscriberdelta} eq + # $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) { + # + # } else { + # $result &= 0; + # + # # for now, as username+passwords are incomplete: + # #$context->{username} = $context->{e164}->{sn}; + # #$context->{password} = $context->{username}; + # #$context->{userpassworddelta} = $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::updated_delta; + # + # if ($skip_errors) { + # # for now, as username+passwords are incomplete: + # _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); + # } else { + # _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); + # } + # } + #} + # + #delete $context->{billing_voip_subscriber}; + #delete $context->{provisioning_voip_subscriber}; + + return $result; + +} + + + + + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm new file mode 100644 index 0000000..da7ea7d --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/Settings.pm @@ -0,0 +1,114 @@ +package NGCP::BulkProcessor::Projects::Disaster::Balances::Settings; +use strict; + +## no critic + +use NGCP::BulkProcessor::Globals qw( + $enablemultithreading + $cpucount +); +#$working_path +#create_path + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); +use NGCP::BulkProcessor::Utils qw(prompt); +#format_number check_ipnet + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + check_dry + + $defaultsettings + $defaultconfig + + $dry + $skip_errors + $force + + $fix_contract_balance_gaps_multithreading + $fix_contract_balance_gaps_numofthreads +); + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +our $fix_contract_balance_gaps_multithreading = $enablemultithreading; +our $fix_contract_balance_gaps_numofthreads = $cpucount; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + #my $regexp_result; + + #&$configurationinfocode("testinfomessage",$configlogger); + + #$result &= _prepare_working_paths(1); + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + $fix_contract_balance_gaps_multithreading = $data->{fix_contract_balance_gaps_multithreading} if exists $data->{fix_contract_balance_gaps_multithreading}; + $fix_contract_balance_gaps_numofthreads = _get_numofthreads($cpucount,$data,'fix_contract_balance_gaps_numofthreads'); + + return $result; + + } + return 0; + +} + +sub check_dry { + + if ($dry) { + scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); + return 1; + } else { + scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); + if (!$force) { + if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { + return 1; + } else { + return 0; + } + } else { + scriptinfo('force option applied',getlogger(__PACKAGE__)); + return 1; + } + } + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $_numofthreads = $default_value; + $_numofthreads = $data->{$key} if exists $data->{$key}; + $_numofthreads = $cpucount if $_numofthreads > $cpucount; + return $_numofthreads; +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg new file mode 100644 index 0000000..0b874d5 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/config.cfg @@ -0,0 +1,62 @@ +##general settings: +working_path = /var/sipwise +cpucount = 4 +enablemultithreading = 0 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +#provisioning_conf = /etc/ngcp-panel/provisioning.conf + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 127.0.0.1 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.74 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.74 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.74 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.74 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl new file mode 100644 index 0000000..425984c --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/process.pl @@ -0,0 +1,245 @@ +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::Disaster::Balances::Settings qw( + update_settings + check_dry + + $defaultsettings + $defaultconfig + $dry + $skip_errors + $force + +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); + +use NGCP::BulkProcessor::Dao::mr38::billing::contracts qw(); +use NGCP::BulkProcessor::Dao::mr38::billing::contract_balances qw(); + +use NGCP::BulkProcessor::Projects::Disaster::Balances::Check qw( + check_billing_db_tables +); +#check_rest_get_items + +use NGCP::BulkProcessor::Projects::Disaster::Balances::Contracts qw( + fix_contract_balance_gaps +); + +#use NGCP::BulkProcessor::Projects::Disaster::Balances::Api qw( +# set_call_forwards +# set_call_forwards_batch +#); + +use NGCP::BulkProcessor::ConnectorPool qw( + destroy_dbs +); + +scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet + +my @TASK_OPTS = (); + +my $tasks = []; + +my $check_task_opt = 'check'; +push(@TASK_OPTS,$check_task_opt); + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); + +my $fix_contract_balance_gaps_task_opt = 'fix_contract_balance_gaps'; +push(@TASK_OPTS,$fix_contract_balance_gaps_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + "dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($check_task_opt) eq lc($task)) { + $result &= check_task(\@messages) if taskinfo($check_task_opt,$result); + } elsif (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages) if taskinfo($cleanup_task_opt,$result); + + } elsif (lc($fix_contract_balance_gaps_task_opt) eq lc($task)) { + if (taskinfo($fix_contract_balance_gaps_task_opt,$result)) { + next unless check_dry(); + $result &= fix_contract_balance_gaps_task(\@messages); + $completion |= 1; + } + + } else { + $result = 0; + scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub check_task { + my ($messages) = @_; + my @check_messages = (); + my $result = check_billing_db_tables(\@check_messages); + #$result &= .. + push(@$messages,join("\n",@check_messages)); + + #@check_messages = (); + #$result = check_provisioning_db_tables(\@check_messages); + ##$result &= .. + #push(@$messages,join("\n",@check_messages)); + + + destroy_dbs(); + return $result; +} + +sub cleanup_task { + my ($messages) = @_; + my $result = 0; + eval { + #cleanupcvsdirs() if $clean_generated; + #cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + #cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + #cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + + if ($@ or !$result) { + push(@$messages,'cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'cleanup completed'); + return 1; + } +} + + + +sub fix_contract_balance_gaps_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = fix_contract_balance_gaps(); + #if ($batch) { + # ($result,$warning_count) = set_barring_profiles_batch(); + #} else { + # ($result,$warning_count) = set_barring_profiles(); + #} + }; + my $err = $@; + my $stats = ($skip_errors ? ": $warning_count warnings" : ''); + eval { + + }; + if ($err or !$result) { + push(@$messages,"fix contract balances gaps INCOMPLETE$stats"); + } else { + push(@$messages,"fix contract balances gaps completed$stats"); + } + destroy_dbs(); #every task should leave with closed connections. + return $result; + +} + +#END { +# # this should not be required explicitly, but prevents Log4Perl's +# # "rootlogger not initialized error upon exit.. +# destroy_all_dbs +#} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg new file mode 100644 index 0000000..7c776e0 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Disaster/Balances/settings.cfg @@ -0,0 +1,4 @@ + +#dry=0 +#skip_errors=0 + diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm index 75b2937..e4fecd8 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/IPGallery/Settings.pm @@ -240,10 +240,10 @@ sub update_settings { $import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading}; $features_define_filename = _get_import_filename($features_define_filename,$data,'features_define_filename'); - $features_define_import_numofthreads =_get_import_numofthreads($cpucount,$data,'features_define_import_numofthreads'); + $features_define_import_numofthreads =_get_numofthreads($cpucount,$data,'features_define_import_numofthreads'); $subscriber_define_filename = _get_import_filename($subscriber_define_filename,$data,'subscriber_define_filename'); - $subscriber_define_import_numofthreads = _get_import_numofthreads($cpucount,$data,'subscriber_define_import_numofthreads'); + $subscriber_define_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_define_import_numofthreads'); $subscribernumer_exclude_pattern = $data->{subscribernumer_exclude_pattern} if exists $data->{subscribernumer_exclude_pattern}; ($regexp_result,$subscribernumer_exclude_pattern) = parse_regexp($subscribernumer_exclude_pattern,$configfile); @@ -257,16 +257,16 @@ sub update_settings { $result &= $regexp_result; $lnp_define_filename = _get_import_filename($lnp_define_filename,$data,'lnp_define_filename'); - $lnp_define_import_numofthreads = _get_import_numofthreads($cpucount,$data,'lnp_define_import_numofthreads'); + $lnp_define_import_numofthreads = _get_numofthreads($cpucount,$data,'lnp_define_import_numofthreads'); $user_password_filename = _get_import_filename($user_password_filename,$data,'user_password_filename'); - $user_password_import_numofthreads = _get_import_numofthreads($cpucount,$data,'user_password_import_numofthreads'); + $user_password_import_numofthreads = _get_numofthreads($cpucount,$data,'user_password_import_numofthreads'); $username_prefix = $data->{username_prefix} if exists $data->{username_prefix}; $min_password_length = $data->{min_password_length} if exists $data->{min_password_length}; $batch_filename = _get_import_filename($batch_filename,$data,'batch_filename'); - $batch_import_numofthreads = _get_import_numofthreads($cpucount,$data,'batch_import_numofthreads'); + $batch_import_numofthreads = _get_numofthreads($cpucount,$data,'batch_import_numofthreads'); $reseller_id = $data->{reseller_id} if exists $data->{reseller_id}; $domain_name = $data->{domain_name} if exists $data->{domain_name}; @@ -285,20 +285,20 @@ sub update_settings { $subsciber_username_prefix = $data->{subsciber_username_prefix} if exists $data->{subsciber_username_prefix}; $provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading}; - $provision_subscriber_numofthreads = _get_import_numofthreads($cpucount,$data,'provision_subscriber_numofthreads'); + $provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads'); $reprovision_upon_password_change = $data->{reprovision_upon_password_change} if exists $data->{reprovision_upon_password_change}; $always_update_subscriber = $data->{always_update_subscriber} if exists $data->{always_update_subscriber}; $set_barring_profiles_multithreading = $data->{set_barring_profiles_multithreading} if exists $data->{set_barring_profiles_multithreading}; - $set_barring_profiles_numofthreads = _get_import_numofthreads($cpucount,$data,'set_barring_profiles_numofthreads'); + $set_barring_profiles_numofthreads = _get_numofthreads($cpucount,$data,'set_barring_profiles_numofthreads'); $barring_profiles_yml = $data->{barring_profiles_yml} if exists $data->{barring_profiles_yml}; $set_peer_auth_multithreading = $data->{set_peer_auth_multithreading} if exists $data->{set_peer_auth_multithreading}; - $set_peer_auth_numofthreads = _get_import_numofthreads($cpucount,$data,'set_peer_auth_numofthreads'); + $set_peer_auth_numofthreads = _get_numofthreads($cpucount,$data,'set_peer_auth_numofthreads'); $peer_auth_realm = $data->{peer_auth_realm} if exists $data->{peer_auth_realm}; $set_allowed_ips_multithreading = $data->{set_peer_auth_multithreading} if exists $data->{set_allowed_ips_multithreading}; - $set_allowed_ips_numofthreads = _get_import_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads'); + $set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads'); $allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips}; foreach my $ipnet (@$allowed_ips) { if (not check_ipnet($ipnet)) { @@ -308,7 +308,7 @@ sub update_settings { } $set_call_forwards_multithreading = $data->{set_call_forwards_multithreading} if exists $data->{set_call_forwards_multithreading}; - $set_call_forwards_numofthreads = _get_import_numofthreads($cpucount,$data,'set_call_forwards_numofthreads'); + $set_call_forwards_numofthreads = _get_numofthreads($cpucount,$data,'set_call_forwards_numofthreads'); $cfb_priorities = [ split_tuple($data->{cfb_priorities}) ] if exists $data->{cfb_priorities}; $cfb_timeouts = [ split_tuple($data->{cfb_timeouts}) ] if exists $data->{cfb_timeouts}; $cfu_priorities = [ split_tuple($data->{cfu_priorities}) ] if exists $data->{cfu_priorities}; @@ -330,11 +330,11 @@ sub update_settings { } $create_lnps_multithreading = $data->{create_lnps_multithreading} if exists $data->{create_lnps_multithreading}; - $create_lnps_numofthreads = _get_import_numofthreads($cpucount,$data,'create_lnps_numofthreads'); + $create_lnps_numofthreads = _get_numofthreads($cpucount,$data,'create_lnps_numofthreads'); $create_lnp_block_txn = $data->{create_lnp_block_txn} if exists $data->{create_lnp_block_txn}; $set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading}; - $set_preference_bulk_numofthreads = _get_import_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads'); + $set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads'); $concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total}; if (defined $concurrent_max_total and $concurrent_max_total <= 0) { configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__)); @@ -388,12 +388,12 @@ sub _prepare_working_paths { } -sub _get_import_numofthreads { +sub _get_numofthreads { my ($default_value,$data,$key) = @_; - my $import_numofthreads = $default_value; - $import_numofthreads = $data->{$key} if exists $data->{$key}; - $import_numofthreads = $cpucount if $import_numofthreads > $cpucount; - return $import_numofthreads; + my $_numofthreads = $default_value; + $_numofthreads = $data->{$key} if exists $data->{$key}; + $_numofthreads = $cpucount if $_numofthreads > $cpucount; + return $_numofthreads; } sub _get_import_db_file { diff --git a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm index 14469ff..09ba722 100644 --- a/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm +++ b/lib/NGCP/BulkProcessor/RestConnectors/NGCPRestApi.pm @@ -24,7 +24,7 @@ use NGCP::BulkProcessor::LogError qw( use NGCP::BulkProcessor::RestConnector qw(_add_headers); -use NGCP::BulkProcessor::FakeTime qw(get_fake_now_string); +use NGCP::BulkProcessor::Calendar qw(get_fake_now_string); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::RestConnector); diff --git a/lib/NGCP/BulkProcessor/RestProcessor.pm b/lib/NGCP/BulkProcessor/RestProcessor.pm index 5fee843..f44f870 100644 --- a/lib/NGCP/BulkProcessor/RestProcessor.pm +++ b/lib/NGCP/BulkProcessor/RestProcessor.pm @@ -8,7 +8,7 @@ use threads::shared; use Thread::Queue; use Time::HiRes qw(sleep); -use URI::Escape qw(); +#use URI::Escape qw(); use NGCP::BulkProcessor::Globals qw( $enablemultithreading @@ -27,7 +27,7 @@ use NGCP::BulkProcessor::LogError qw( restprocessingfailed ); -use NGCP::BulkProcessor::Utils qw(threadid); +use NGCP::BulkProcessor::Utils qw(threadid urlencode urldecode); require Exporter; our @ISA = qw(Exporter); @@ -55,7 +55,8 @@ sub get_query_string { } else { $query .= '&'; } - $query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape($filters->{$param}); + #$query .= URI::Escape::uri_escape($param) . '=' . URI::Escape::uri_escape($filters->{$param}); + $query .= urlencode($param) . '=' . urlencode($filters->{$param}); } return $query; }; diff --git a/lib/NGCP/BulkProcessor/SqlConnector.pm b/lib/NGCP/BulkProcessor/SqlConnector.pm index b2e93db..ab03fff 100644 --- a/lib/NGCP/BulkProcessor/SqlConnector.pm +++ b/lib/NGCP/BulkProcessor/SqlConnector.pm @@ -26,6 +26,7 @@ use DBI; use NGCP::BulkProcessor::Utils qw(threadid); use NGCP::BulkProcessor::Array qw(arrayeq); use NGCP::BulkProcessor::RandomString qw(createtmpstring); +use NGCP::BulkProcessor::Calendar qw(); require Exporter; our @ISA = qw(Exporter); @@ -456,6 +457,18 @@ sub _fetch_error { } +sub datetime_to_string { + my $self = shift; + my ($dt) = @_; + return NGCP::BulkProcessor::Calendar::datetime_to_string($dt); +} + +sub datetime_from_string { + my $self = shift; + my ($s,$tz) = @_; + return NGCP::BulkProcessor::Calendar::datetime_from_string($s,$tz); +} + # "The data type is 'sticky' in that bind values passed to execute() are bound with # the data type specified by earlier bind_param() calls, if any." sub _bind_params {