this tool provides loading and updating the ngcp LNP
database from customer-specific textfiles with many
GBytes and tens of mio. of rows. in a *fast* way.
Change-Id: Ie64b1a9c7dfe3251c41dd443ed937cf8da18cb36
(cherry picked from commit d569230722)
mr7.5.5
parent
ec78f19236
commit
c4fc744545
@ -0,0 +1,326 @@
|
||||
package NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw(
|
||||
get_sqlite_db
|
||||
destroy_all_dbs
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::SqlProcessor qw(
|
||||
registertableinfo
|
||||
create_targettable
|
||||
checktableinfo
|
||||
copy_row
|
||||
|
||||
insert_stmt
|
||||
process_table
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::SqlRecord qw();
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
|
||||
our @EXPORT_OK = qw(
|
||||
create_table
|
||||
gettablename
|
||||
check_table
|
||||
getinsertstatement
|
||||
getupsertstatement
|
||||
|
||||
@fieldnames
|
||||
has_rows
|
||||
|
||||
update_delta
|
||||
countby_delta
|
||||
|
||||
$deleted_delta
|
||||
$updated_delta
|
||||
$added_delta
|
||||
|
||||
find_carriers_by_delta
|
||||
|
||||
process_records
|
||||
);
|
||||
|
||||
my $tablename = 'lnp';
|
||||
my $get_db = \&get_sqlite_db;
|
||||
|
||||
our @fieldnames = (
|
||||
'carrier_name',
|
||||
'carrier_prefix',
|
||||
'number',
|
||||
'routing_number',
|
||||
'start',
|
||||
'end',
|
||||
'authoritative',
|
||||
'skip_rewrite',
|
||||
'type',
|
||||
#calculated fields at the end!
|
||||
#'rownum',
|
||||
#'filenum',
|
||||
#'filename',
|
||||
);
|
||||
|
||||
my $expected_fieldnames = [
|
||||
@fieldnames,
|
||||
'delta',
|
||||
];
|
||||
|
||||
# table creation:
|
||||
my $primarykey_fieldnames = [ 'number' ];
|
||||
my $indexes = {
|
||||
#$tablename . '_number' => [ 'number(32)' ],
|
||||
#$tablename . '_rownum' => [ 'rownum(11)' ],
|
||||
$tablename . '_delta' => [ 'delta(7)' ],
|
||||
$tablename . '_carrier_delta' => [ 'carrier_name(255)', 'carrier_prefix(32)', 'authoritative(1)', 'skip_rewrite(1)', 'delta(7)' ],
|
||||
};
|
||||
#my $fixtable_statements = [];
|
||||
|
||||
our $deleted_delta = 'DELETED';
|
||||
our $updated_delta = 'UPDATED';
|
||||
our $added_delta = 'ADDED';
|
||||
|
||||
sub new {
|
||||
|
||||
my $class = shift;
|
||||
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
|
||||
$tablename,$expected_fieldnames,$indexes);
|
||||
|
||||
copy_row($self,shift,$expected_fieldnames);
|
||||
|
||||
return $self;
|
||||
|
||||
}
|
||||
|
||||
sub create_table {
|
||||
|
||||
my ($truncate) = @_;
|
||||
|
||||
my $db = &$get_db();
|
||||
|
||||
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
|
||||
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
|
||||
|
||||
}
|
||||
|
||||
sub find_carriers_by_delta {
|
||||
|
||||
my ($deltas,$load_recursive) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my $stmt = '';
|
||||
my @params = ();
|
||||
if (defined $deltas and 'HASH' eq ref $deltas) {
|
||||
foreach my $in (keys %$deltas) {
|
||||
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
|
||||
$stmt .= ' AND ' if length($stmt);
|
||||
$stmt .= $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
|
||||
push(@params,@values);
|
||||
}
|
||||
} elsif (defined $deltas and length($deltas) > 0) {
|
||||
$stmt = $db->columnidentifier('delta') . ' = ?';
|
||||
push(@params,$deltas);
|
||||
}
|
||||
$stmt = ' WHERE ' . $stmt if length($stmt);
|
||||
|
||||
$stmt = 'SELECT * FROM ' . $table . $stmt . ' GROUP BY '
|
||||
. $db->columnidentifier('carrier_name')
|
||||
. ', ' . $db->columnidentifier('carrier_prefix')
|
||||
. ', ' . $db->columnidentifier('authoritative')
|
||||
. ', ' . $db->columnidentifier('skip_rewrite');
|
||||
|
||||
my $rows = $db->db_get_all_arrayref($stmt,@params);
|
||||
|
||||
return buildrecords_fromrows($rows,$load_recursive);
|
||||
|
||||
}
|
||||
|
||||
sub update_delta {
|
||||
|
||||
my ($number,$delta) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my $stmt = 'UPDATE ' . $table . ' SET delta = ?';
|
||||
my @params = ();
|
||||
push(@params,$delta);
|
||||
if (defined $number) {
|
||||
$stmt .= ' WHERE ' . $db->columnidentifier('number') . ' = ?';
|
||||
push(@params,$number);
|
||||
}
|
||||
|
||||
return $db->db_do($stmt,@params);
|
||||
|
||||
}
|
||||
|
||||
sub countby_delta {
|
||||
|
||||
my ($deltas) = @_;
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my $stmt = '';
|
||||
my @params = ();
|
||||
if (defined $deltas and 'HASH' eq ref $deltas) {
|
||||
foreach my $in (keys %$deltas) {
|
||||
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
|
||||
$stmt .= ' AND ' if length($stmt);
|
||||
$stmt .= $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
|
||||
push(@params,@values);
|
||||
}
|
||||
} elsif (defined $deltas and length($deltas) > 0) {
|
||||
$stmt = $db->columnidentifier('delta') . ' = ?';
|
||||
push(@params,$deltas);
|
||||
}
|
||||
$stmt = ' WHERE ' . $stmt if length($stmt);
|
||||
|
||||
$stmt = 'SELECT COUNT(*) FROM ' . $table . $stmt;
|
||||
|
||||
return $db->db_get_value($stmt,@params);
|
||||
|
||||
}
|
||||
|
||||
sub has_rows {
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my $stmt = 'SELECT COUNT(1) FROM (SELECT 1 FROM ' . $table . ' LIMIT 1) AS q';
|
||||
|
||||
return $db->db_get_value($stmt);
|
||||
}
|
||||
|
||||
sub buildrecords_fromrows {
|
||||
|
||||
my ($rows,$load_recursive) = @_;
|
||||
|
||||
my @records = ();
|
||||
my $record;
|
||||
|
||||
if (defined $rows and ref $rows eq 'ARRAY') {
|
||||
foreach my $row (@$rows) {
|
||||
$record = __PACKAGE__->new($row);
|
||||
|
||||
# transformations go here ...
|
||||
|
||||
push @records,$record;
|
||||
}
|
||||
}
|
||||
|
||||
return \@records;
|
||||
|
||||
}
|
||||
|
||||
sub process_records {
|
||||
|
||||
my %params = @_;
|
||||
my ($process_code,
|
||||
$static_context,
|
||||
$init_process_context_code,
|
||||
$uninit_process_context_code,
|
||||
$multithreading,
|
||||
$numofthreads,
|
||||
$deltas) = @params{qw/
|
||||
process_code
|
||||
static_context
|
||||
init_process_context_code
|
||||
uninit_process_context_code
|
||||
multithreading
|
||||
numofthreads
|
||||
deltas
|
||||
/};
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
|
||||
my @terms = ();
|
||||
if (defined $deltas and 'HASH' eq ref $deltas) {
|
||||
foreach my $in (keys %$deltas) {
|
||||
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
|
||||
push(@terms,$db->columnidentifier('delta') . ' ' . $in . ' ("' . join('","',@values) . '")');
|
||||
}
|
||||
} elsif (defined $deltas and length($deltas) > 0) {
|
||||
push(@terms,$db->columnidentifier('delta') . ' = "' . $deltas . '"');
|
||||
}
|
||||
|
||||
return process_table(
|
||||
get_db => $get_db,
|
||||
class => __PACKAGE__,
|
||||
process_code => sub {
|
||||
my ($context,$rowblock,$row_offset) = @_;
|
||||
return &$process_code($context,$rowblock,$row_offset);
|
||||
},
|
||||
static_context => $static_context,
|
||||
init_process_context_code => $init_process_context_code,
|
||||
uninit_process_context_code => $uninit_process_context_code,
|
||||
destroy_reader_dbs_code => \&destroy_all_dbs,
|
||||
multithreading => $multithreading,
|
||||
tableprocessing_threads => $numofthreads,
|
||||
((scalar @terms) ? ('select' => 'SELECT * FROM ' . $table . ' WHERE ' . join (' AND ',@terms)) : ()),
|
||||
((scalar @terms) ? ('selectcount' => 'SELECT COUNT(1) FROM ' . $table . ' WHERE ' . join (' AND ',@terms)) : ()),
|
||||
);
|
||||
}
|
||||
|
||||
sub carrier_hash {
|
||||
my $self = shift;
|
||||
return ($self->{carrier_name} // '') . '-' . ($self->{carrier_prefix} // '')
|
||||
. '-' . ($self->{authoritative} // '') . '-' . ($self->{skip_rewrite} // '');
|
||||
}
|
||||
|
||||
sub getinsertstatement {
|
||||
|
||||
my ($insert_ignore) = @_;
|
||||
check_table();
|
||||
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
|
||||
|
||||
}
|
||||
|
||||
sub getupsertstatement {
|
||||
|
||||
check_table();
|
||||
my $db = &$get_db();
|
||||
my $table = $db->tableidentifier($tablename);
|
||||
my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' .
|
||||
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')';
|
||||
my @values = ();
|
||||
foreach my $fieldname (@$expected_fieldnames) {
|
||||
if ('delta' eq $fieldname) {
|
||||
my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' .
|
||||
$db->columnidentifier('number') . ' = ?';
|
||||
push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')');
|
||||
} else {
|
||||
push(@values,'?');
|
||||
}
|
||||
}
|
||||
$upsert_stmt .= ' VALUES (' . join(',',@values) . ')';
|
||||
return $upsert_stmt;
|
||||
|
||||
}
|
||||
|
||||
sub gettablename {
|
||||
|
||||
return $tablename;
|
||||
|
||||
}
|
||||
|
||||
sub check_table {
|
||||
|
||||
return checktableinfo($get_db,
|
||||
__PACKAGE__,$tablename,
|
||||
$expected_fieldnames,
|
||||
$indexes);
|
||||
|
||||
}
|
||||
|
||||
1;
|
||||
@ -0,0 +1,66 @@
|
||||
package NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use Encode qw(decode);
|
||||
|
||||
use NGCP::BulkProcessor::Logging qw(
|
||||
getlogger
|
||||
);
|
||||
use NGCP::BulkProcessor::LogError qw(
|
||||
fileprocessingerror
|
||||
fileprocessingwarn
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw(
|
||||
$expand_numbers_code
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::FileProcessor;
|
||||
|
||||
use NGCP::BulkProcessor::Array qw(contains);
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor);
|
||||
our @EXPORT_OK = qw();
|
||||
|
||||
my $lineseparator = '\\r\\n|\\r|\\n|\\s'; #\\n\\r
|
||||
my $default_encoding = 'UTF-8';
|
||||
|
||||
my $buffersize = 1000 * 1024;
|
||||
my $threadqueuelength = 10;
|
||||
my $default_numofthreads = 3;
|
||||
#my $multithreading = 0;
|
||||
my $blocksize = 100;
|
||||
|
||||
sub new {
|
||||
|
||||
my $class = shift;
|
||||
|
||||
my $self = NGCP::BulkProcessor::FileProcessor->new(@_);
|
||||
|
||||
$self->{numofthreads} = shift // $default_numofthreads;
|
||||
$self->{line_separator} = $lineseparator;
|
||||
$self->{field_separator} = undef;
|
||||
$self->{encoding} = shift // $default_encoding;
|
||||
$self->{buffersize} = $buffersize;
|
||||
$self->{threadqueuelength} = $threadqueuelength;
|
||||
#$self->{multithreading} = $multithreading;
|
||||
$self->{blocksize} = $blocksize;
|
||||
|
||||
bless($self,$class);
|
||||
|
||||
#restdebug($self,__PACKAGE__ . ' file processor created',getlogger(__PACKAGE__));
|
||||
|
||||
return $self;
|
||||
|
||||
}
|
||||
|
||||
sub extractfields {
|
||||
my ($context,$line_ref) = @_;
|
||||
return $expand_numbers_code->($context,$$line_ref);
|
||||
|
||||
}
|
||||
|
||||
1;
|
||||
@ -0,0 +1,190 @@
|
||||
package NGCP::BulkProcessor::Projects::ETL::Lnp::Import;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use threads::shared qw();
|
||||
|
||||
#use Encode qw();
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw(
|
||||
$import_multithreading
|
||||
|
||||
$lnp_filename
|
||||
$lnp_rownum_start
|
||||
$lnp_import_numofthreads
|
||||
$ignore_lnp_unique
|
||||
$lnp_import_single_row_txn
|
||||
|
||||
$expand_numbers_code
|
||||
|
||||
$skip_errors
|
||||
|
||||
);
|
||||
use NGCP::BulkProcessor::Logging qw (
|
||||
getlogger
|
||||
processing_info
|
||||
processing_debug
|
||||
);
|
||||
use NGCP::BulkProcessor::LogError qw(
|
||||
fileprocessingwarn
|
||||
fileprocessingerror
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::FileProcessors::CSVFileSimple qw();
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile qw();
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw(
|
||||
get_sqlite_db
|
||||
destroy_all_dbs
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw();
|
||||
|
||||
use NGCP::BulkProcessor::Utils qw(threadid trim);
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter);
|
||||
our @EXPORT_OK = qw(
|
||||
load_file
|
||||
);
|
||||
|
||||
sub load_file {
|
||||
|
||||
my $result = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::create_table(0);
|
||||
|
||||
my $importer;
|
||||
if (defined $expand_numbers_code) {
|
||||
$importer = NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile->new($lnp_import_numofthreads);
|
||||
} else {
|
||||
$importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($lnp_import_numofthreads);
|
||||
}
|
||||
|
||||
my $upsert = _lnp_reset_delta();
|
||||
|
||||
destroy_all_dbs(); #close all db connections before forking..
|
||||
my $warning_count :shared = 0;
|
||||
return ($result && $importer->process(
|
||||
file => $lnp_filename,
|
||||
process_code => sub {
|
||||
my ($context,$rows,$row_offset) = @_;
|
||||
my $rownum = $row_offset;
|
||||
$context->{lnp_rows} = [];
|
||||
foreach my $row (@$rows) {
|
||||
$rownum++;
|
||||
next if (defined $lnp_rownum_start and $rownum < $lnp_rownum_start);
|
||||
next if (scalar @$row) == 0;
|
||||
#$row = [ map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row ];
|
||||
my $record = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new([
|
||||
map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row
|
||||
]);
|
||||
#$record->{number} = $record->{cc} . $record->{ac} . $record->{sn};
|
||||
|
||||
my %r = %$record; my @row_ext = @r{@NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::fieldnames};
|
||||
if ($context->{upsert}) {
|
||||
push(@row_ext,$record->{number});
|
||||
} else {
|
||||
push(@row_ext,$NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta);
|
||||
}
|
||||
push(@{$context->{lnp_rows}},\@row_ext);
|
||||
if ($lnp_import_single_row_txn and (scalar @{$context->{lnp_rows}}) > 0) {
|
||||
while (defined (my $lnp_row = shift @{$context->{lnp_rows}})) {
|
||||
if ($skip_errors) {
|
||||
eval { _insert_lnp_rows($context,[$lnp_row]); };
|
||||
_warn($context,$@) if $@;
|
||||
} else {
|
||||
_insert_lnp_rows($context,[$lnp_row]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (not $lnp_import_single_row_txn and (scalar @{$context->{lnp_rows}}) > 0) {
|
||||
if ($skip_errors) {
|
||||
eval { _insert_lnp_rows($context,$context->{lnp_rows}); };
|
||||
_warn($context,$@) if $@;
|
||||
} else {
|
||||
_insert_lnp_rows($context,$context->{lnp_rows});
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
},
|
||||
init_process_context_code => sub {
|
||||
my ($context)= @_;
|
||||
$context->{db} = &get_sqlite_db();
|
||||
$context->{upsert} = $upsert;
|
||||
$context->{error_count} = 0;
|
||||
$context->{warning_count} = 0;
|
||||
},
|
||||
uninit_process_context_code => sub {
|
||||
my ($context)= @_;
|
||||
undef $context->{db};
|
||||
destroy_all_dbs();
|
||||
{
|
||||
lock $warning_count;
|
||||
$warning_count += $context->{warning_count};
|
||||
}
|
||||
},
|
||||
multithreading => $import_multithreading,
|
||||
),$warning_count);
|
||||
|
||||
}
|
||||
|
||||
sub _insert_lnp_rows {
|
||||
my ($context,$lnp_rows) = @_;
|
||||
$context->{db}->db_do_begin(($context->{upsert} ?
|
||||
NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::getupsertstatement()
|
||||
: NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::getinsertstatement($ignore_lnp_unique)),
|
||||
);
|
||||
eval {
|
||||
$context->{db}->db_do_rowblock($lnp_rows);
|
||||
$context->{db}->db_finish();
|
||||
};
|
||||
my $err = $@;
|
||||
if ($err) {
|
||||
eval {
|
||||
$context->{db}->db_finish(1);
|
||||
};
|
||||
die($err);
|
||||
}
|
||||
}
|
||||
|
||||
sub _lnp_reset_delta {
|
||||
my $upsert = 0;
|
||||
if (NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::has_rows()) {
|
||||
processing_info(threadid(),'resetting delta of ' .
|
||||
NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::update_delta(undef,
|
||||
$NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta) .
|
||||
' lnp records',getlogger(__PACKAGE__));
|
||||
$upsert |= 1;
|
||||
}
|
||||
return $upsert;
|
||||
}
|
||||
|
||||
sub _error {
|
||||
|
||||
my ($context,$message) = @_;
|
||||
$context->{error_count} = $context->{error_count} + 1;
|
||||
fileprocessingerror($context->{filename},$message,getlogger(__PACKAGE__));
|
||||
|
||||
}
|
||||
|
||||
sub _warn {
|
||||
|
||||
my ($context,$message) = @_;
|
||||
$context->{warning_count} = $context->{warning_count} + 1;
|
||||
fileprocessingwarn($context->{filename},$message,getlogger(__PACKAGE__));
|
||||
|
||||
}
|
||||
|
||||
sub _info {
|
||||
|
||||
my ($context,$message,$debug) = @_;
|
||||
if ($debug) {
|
||||
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
|
||||
} else {
|
||||
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
|
||||
}
|
||||
}
|
||||
|
||||
1;
|
||||
@ -0,0 +1,322 @@
|
||||
package NGCP::BulkProcessor::Projects::ETL::Lnp::ProcessLnp;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use threads::shared qw();
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw(
|
||||
|
||||
$skip_errors
|
||||
|
||||
$create_lnp_multithreading
|
||||
$create_lnp_numofthreads
|
||||
|
||||
$delete_lnp_multithreading
|
||||
$delete_lnp_numofthreads
|
||||
|
||||
$ignore_lnp_numbers_unique
|
||||
$lnp_numbers_single_row_txn
|
||||
|
||||
$lnp_numbers_batch_delete
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Logging qw (
|
||||
getlogger
|
||||
processing_info
|
||||
processing_debug
|
||||
);
|
||||
use NGCP::BulkProcessor::LogError qw(
|
||||
rowprocessingerror
|
||||
rowprocessingwarn
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw();
|
||||
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw();
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw();
|
||||
|
||||
use NGCP::BulkProcessor::ConnectorPool qw(
|
||||
get_xa_db
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw(
|
||||
get_sqlite_db
|
||||
destroy_all_dbs
|
||||
ping_all_dbs
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Utils qw(threadid);
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter);
|
||||
our @EXPORT_OK = qw(
|
||||
create_lnp_numbers
|
||||
delete_lnp_numbers
|
||||
);
|
||||
|
||||
sub create_lnp_numbers {
|
||||
|
||||
my $static_context = {};
|
||||
my $result = _create_lnp_numbers_checks($static_context);
|
||||
|
||||
destroy_all_dbs();
|
||||
my $warning_count :shared = 0;
|
||||
my $result = $result && NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::process_records(
|
||||
static_context => $static_context,
|
||||
deltas => $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta,
|
||||
process_code => sub {
|
||||
my ($context,$records,$row_offset) = @_;
|
||||
ping_all_dbs();
|
||||
foreach my $row (@$records) {
|
||||
my $lnp = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new($row);
|
||||
my $lnp_number = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers->new({ %$lnp });
|
||||
my $lnp_provider = $context->{carrier_map}->{
|
||||
$lnp->carrier_hash()
|
||||
};
|
||||
$lnp_number->{lnp_provider_id} = $lnp_provider->{id};
|
||||
|
||||
my %r = %$lnp_number; my @row_ext = @r{@NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::fieldnames};
|
||||
|
||||
push(@{$context->{lnp_numbers}},\@row_ext);
|
||||
if ($lnp_numbers_single_row_txn and (scalar @{$context->{lnp_numbers}}) > 0) {
|
||||
while (defined (my $lnp_number = shift @{$context->{lnp_numbers}})) {
|
||||
if ($skip_errors) {
|
||||
eval { _insert_lnp_numbers($context,[$lnp_number]); };
|
||||
_warn($context,$@) if $@;
|
||||
} else {
|
||||
_insert_lnp_numbers($context,[$lnp_number]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (not $lnp_numbers_single_row_txn and (scalar @{$context->{lnp_numbers}}) > 0) {
|
||||
if ($skip_errors) {
|
||||
eval { _insert_lnp_numbers($context,$context->{lnp_numbers}); };
|
||||
_warn($context,$@) if $@;
|
||||
} else {
|
||||
_insert_lnp_numbers($context,$context->{lnp_numbers});
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
},
|
||||
init_process_context_code => sub {
|
||||
my ($context)= @_;
|
||||
$context->{db} = &get_xa_db();
|
||||
$context->{error_count} = 0;
|
||||
$context->{warning_count} = 0;
|
||||
$context->{lnp_numbers} = [];
|
||||
},
|
||||
uninit_process_context_code => sub {
|
||||
my ($context)= @_;
|
||||
undef $context->{db};
|
||||
destroy_all_dbs();
|
||||
{
|
||||
lock $warning_count;
|
||||
$warning_count += $context->{warning_count};
|
||||
}
|
||||
},
|
||||
multithreading => $create_lnp_multithreading,
|
||||
numofthreads => $create_lnp_numofthreads,
|
||||
);
|
||||
|
||||
return ($result,$warning_count);
|
||||
|
||||
}
|
||||
|
||||
|
||||
sub _create_lnp_numbers_checks {
|
||||
|
||||
my $context = shift;
|
||||
my $result = 1;
|
||||
|
||||
$context->{carrier_map} = {};
|
||||
my $carriers = [];
|
||||
eval {
|
||||
$carriers = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::find_carriers_by_delta($NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta);
|
||||
};
|
||||
if ($@) {
|
||||
$result = 0; #even in skip-error mode..
|
||||
} else {
|
||||
foreach my $carrier (@$carriers) {
|
||||
my $lp = {
|
||||
name => $carrier->{carrier_name},
|
||||
prefix => ($carrier->{carrier_prefix} // ''),
|
||||
authoritative => ($carrier->{authoritative} // 0),
|
||||
skip_rewrite => ($carrier->{skip_rewrite} // 0),
|
||||
};
|
||||
my $lnp_provider = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_nameprefixauthoritativeskiprewrite(
|
||||
$lp->{name},
|
||||
$lp->{prefix},
|
||||
$lp->{authoritative},
|
||||
$lp->{skip_rewrite},
|
||||
)->[0];
|
||||
if ($lnp_provider) {
|
||||
processing_info(threadid(),"lnp provider '$lnp_provider->{name}' found",getlogger(__PACKAGE__));
|
||||
} else {
|
||||
$lnp_provider = { %$lp };
|
||||
$lnp_provider->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::insert_row(undef,$lp);
|
||||
processing_info(threadid(),"lnp provider '$lnp_provider->{name}' created",getlogger(__PACKAGE__));
|
||||
}
|
||||
$context->{carrier_map}->{
|
||||
$carrier->carrier_hash()
|
||||
} = $lnp_provider;
|
||||
}
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
|
||||
sub _insert_lnp_numbers {
|
||||
my ($context,$lnp_numbers) = @_;
|
||||
$context->{db}->db_do_begin(
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::getinsertstatement($ignore_lnp_numbers_unique),
|
||||
);
|
||||
eval {
|
||||
$context->{db}->db_do_rowblock($lnp_numbers);
|
||||
$context->{db}->db_finish();
|
||||
};
|
||||
my $err = $@;
|
||||
if ($err) {
|
||||
eval {
|
||||
$context->{db}->db_finish(1);
|
||||
};
|
||||
die($err);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
sub delete_lnp_numbers {
|
||||
|
||||
my $static_context = {};
|
||||
my $result = 1;
|
||||
|
||||
destroy_all_dbs();
|
||||
my $warning_count :shared = 0;
|
||||
my $result = $result && NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::process_records(
|
||||
static_context => $static_context,
|
||||
deltas => $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta,
|
||||
process_code => sub {
|
||||
my ($context,$records,$row_offset) = @_;
|
||||
ping_all_dbs();
|
||||
foreach my $row (@$records) {
|
||||
my $lnp = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new($row);
|
||||
push(@{$context->{numbers}},$lnp->{number});
|
||||
if (not $lnp_numbers_batch_delete and (scalar @{$context->{numbers}}) > 0) {
|
||||
while (defined (my $number = shift @{$context->{numbers}})) {
|
||||
if ($skip_errors) {
|
||||
eval {
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},$number);
|
||||
};
|
||||
} else {
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},$number);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ($lnp_numbers_batch_delete and (scalar @{$context->{numbers}}) > 0) {
|
||||
if ($skip_errors) {
|
||||
eval {
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},{
|
||||
'IN' => $context->{numbers},
|
||||
});
|
||||
};
|
||||
} else {
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},{
|
||||
'IN' => $context->{numbers},
|
||||
});
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
},
|
||||
init_process_context_code => sub {
|
||||
my ($context)= @_;
|
||||
$context->{db} = &get_xa_db();
|
||||
$context->{error_count} = 0;
|
||||
$context->{warning_count} = 0;
|
||||
$context->{numbers} = [];
|
||||
},
|
||||
uninit_process_context_code => sub {
|
||||
my ($context)= @_;
|
||||
undef $context->{db};
|
||||
destroy_all_dbs();
|
||||
{
|
||||
lock $warning_count;
|
||||
$warning_count += $context->{warning_count};
|
||||
}
|
||||
},
|
||||
multithreading => $create_lnp_multithreading,
|
||||
numofthreads => $create_lnp_numofthreads,
|
||||
) && _delete_lnp_providers($static_context);
|
||||
|
||||
return ($result,$warning_count);
|
||||
|
||||
}
|
||||
|
||||
|
||||
sub _delete_lnp_providers {
|
||||
|
||||
my $context = shift;
|
||||
my $result = 1;
|
||||
|
||||
my $carriers = [];
|
||||
eval {
|
||||
$carriers = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::find_carriers_by_delta($NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta);
|
||||
};
|
||||
if ($@) {
|
||||
$result = 0; #even in skip-error mode..
|
||||
} else {
|
||||
foreach my $carrier (@$carriers) {
|
||||
my $lp = {
|
||||
name => $carrier->{carrier_name},
|
||||
prefix => ($carrier->{carrier_prefix} // ''),
|
||||
authoritative => ($carrier->{authoritative} // 0),
|
||||
skip_rewrite => ($carrier->{skip_rewrite} // 0),
|
||||
};
|
||||
foreach my $lnp_provider (@{NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_nameprefixauthoritativeskiprewrite(
|
||||
$lp->{name},
|
||||
$lp->{prefix},
|
||||
$lp->{authoritative},
|
||||
$lp->{skip_rewrite},
|
||||
)}) {
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::delete_row(undef,$lnp_provider);
|
||||
processing_info(threadid(),"lnp provider '$lnp_provider->{name}' removed",getlogger(__PACKAGE__));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
|
||||
sub _error {
|
||||
|
||||
my ($context,$message) = @_;
|
||||
$context->{error_count} = $context->{error_count} + 1;
|
||||
rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||
|
||||
}
|
||||
|
||||
sub _warn {
|
||||
|
||||
my ($context,$message) = @_;
|
||||
$context->{warning_count} = $context->{warning_count} + 1;
|
||||
rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||
|
||||
}
|
||||
|
||||
sub _info {
|
||||
|
||||
my ($context,$message,$debug) = @_;
|
||||
if ($debug) {
|
||||
processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||
} else {
|
||||
processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
1;
|
||||
@ -0,0 +1,92 @@
|
||||
package NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use File::Basename;
|
||||
use Cwd;
|
||||
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw(
|
||||
$sqlite_db_file
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::ConnectorPool qw(
|
||||
get_connectorinstancename
|
||||
ping
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::SqlConnectors::MySQLDB;
|
||||
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode);
|
||||
#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi;
|
||||
|
||||
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter);
|
||||
our @EXPORT_OK = qw(
|
||||
|
||||
get_sqlite_db
|
||||
sqlite_db_tableidentifier
|
||||
|
||||
destroy_dbs
|
||||
destroy_all_dbs
|
||||
|
||||
ping_dbs
|
||||
ping_all_dbs
|
||||
);
|
||||
|
||||
my $sqlite_dbs = {};
|
||||
|
||||
sub get_sqlite_db {
|
||||
|
||||
my ($instance_name,$reconnect) = @_;
|
||||
my $name = get_connectorinstancename($instance_name); #threadid(); #shift;
|
||||
|
||||
if (not defined $sqlite_dbs->{$name}) {
|
||||
$sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name);
|
||||
if (not defined $reconnect) {
|
||||
$reconnect = 1;
|
||||
}
|
||||
}
|
||||
if ($reconnect) {
|
||||
$sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file);
|
||||
}
|
||||
|
||||
return $sqlite_dbs->{$name};
|
||||
|
||||
}
|
||||
|
||||
sub sqlite_db_tableidentifier {
|
||||
|
||||
my ($get_target_db,$tablename) = @_;
|
||||
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
|
||||
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file));
|
||||
|
||||
}
|
||||
|
||||
sub ping_dbs {
|
||||
|
||||
}
|
||||
|
||||
sub ping_all_dbs {
|
||||
ping_dbs();
|
||||
NGCP::BulkProcessor::ConnectorPool::ping_dbs();
|
||||
}
|
||||
|
||||
sub destroy_dbs {
|
||||
|
||||
foreach my $name (keys %$sqlite_dbs) {
|
||||
cleartableinfo($sqlite_dbs->{$name});
|
||||
undef $sqlite_dbs->{$name};
|
||||
delete $sqlite_dbs->{$name};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
sub destroy_all_dbs() {
|
||||
destroy_dbs();
|
||||
NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
|
||||
}
|
||||
|
||||
1;
|
||||
@ -0,0 +1,218 @@
|
||||
package NGCP::BulkProcessor::Projects::ETL::Lnp::Settings;
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use threads::shared qw();
|
||||
|
||||
use File::Basename qw(fileparse);
|
||||
|
||||
use NGCP::BulkProcessor::Globals qw(
|
||||
$working_path
|
||||
$enablemultithreading
|
||||
$cpucount
|
||||
create_path
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Logging qw(
|
||||
getlogger
|
||||
scriptinfo
|
||||
configurationinfo
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::LogError qw(
|
||||
fileerror
|
||||
filewarn
|
||||
configurationwarn
|
||||
configurationerror
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::LoadConfig qw(
|
||||
split_tuple
|
||||
parse_regexp
|
||||
);
|
||||
use NGCP::BulkProcessor::Utils qw(prompt);
|
||||
|
||||
use NGCP::BulkProcessor::Array qw(contains);
|
||||
|
||||
require Exporter;
|
||||
our @ISA = qw(Exporter);
|
||||
our @EXPORT_OK = qw(
|
||||
update_settings
|
||||
|
||||
$sqlite_db_file
|
||||
|
||||
check_dry
|
||||
|
||||
$input_path
|
||||
|
||||
$defaultsettings
|
||||
$defaultconfig
|
||||
|
||||
$dry
|
||||
$skip_errors
|
||||
$force
|
||||
|
||||
$import_multithreading
|
||||
|
||||
$lnp_filename
|
||||
$lnp_rownum_start
|
||||
$lnp_import_numofthreads
|
||||
$ignore_lnp_unique
|
||||
$lnp_import_single_row_txn
|
||||
|
||||
$expand_numbers_code
|
||||
|
||||
$create_lnp_multithreading
|
||||
$create_lnp_numofthreads
|
||||
|
||||
$delete_lnp_multithreading
|
||||
$delete_lnp_numofthreads
|
||||
|
||||
$ignore_lnp_numbers_unique
|
||||
$lnp_numbers_single_row_txn
|
||||
|
||||
$lnp_numbers_batch_delete
|
||||
);
|
||||
|
||||
our $defaultconfig = 'config.cfg';
|
||||
our $defaultsettings = 'settings.yml';
|
||||
|
||||
our $input_path = $working_path . 'input/';
|
||||
|
||||
our $force = 0;
|
||||
our $dry = 0;
|
||||
our $skip_errors = 0;
|
||||
|
||||
our $sqlite_db_file = 'sqlite';
|
||||
|
||||
our $import_multithreading = 1;
|
||||
|
||||
our $lnp_filename = undef;
|
||||
our $lnp_rownum_start = 2;
|
||||
our $lnp_import_numofthreads = $cpucount;
|
||||
our $ignore_lnp_unique = 0;
|
||||
our $lnp_import_single_row_txn = 0;
|
||||
our $expand_numbers_code = undef;
|
||||
|
||||
our $create_lnp_multithreading = 1;
|
||||
our $create_lnp_numofthreads = $cpucount;
|
||||
|
||||
our $delete_lnp_multithreading = 1;
|
||||
our $delete_lnp_numofthreads = $cpucount;
|
||||
|
||||
our $ignore_lnp_numbers_unique = 0;
|
||||
our $lnp_numbers_single_row_txn = 0;
|
||||
|
||||
our $lnp_numbers_batch_delete = 1;
|
||||
|
||||
sub update_settings {
|
||||
|
||||
my ($data,$configfile) = @_;
|
||||
|
||||
if (defined $data) {
|
||||
|
||||
my $result = 1;
|
||||
my $regexp_result;
|
||||
|
||||
#&$configurationinfocode("testinfomessage",$configlogger);
|
||||
|
||||
$result &= _prepare_working_paths(1);
|
||||
|
||||
$sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file};
|
||||
|
||||
$lnp_filename = _get_import_filename($lnp_filename,$data,'lnp_filename');
|
||||
unless ($lnp_filename and -e $lnp_filename) {
|
||||
configurationerror($configfile,"invalid lnp filename",getlogger(__PACKAGE__));
|
||||
}
|
||||
$lnp_rownum_start = $data->{lnp_rownum_start} if exists $data->{lnp_rownum_start};
|
||||
$lnp_import_single_row_txn = $data->{lnp_import_single_row_txn} if exists $data->{lnp_import_single_row_txn};
|
||||
$ignore_lnp_unique = $data->{ignore_lnp_unique} if exists $data->{ignore_lnp_unique};
|
||||
|
||||
$import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading};
|
||||
$lnp_import_numofthreads = _get_numofthreads($lnp_import_numofthreads,$data,'lnp_import_numofthreads');
|
||||
|
||||
$dry = $data->{dry} if exists $data->{dry};
|
||||
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
|
||||
|
||||
$expand_numbers_code = $data->{expand_numbers} if exists $data->{expand_numbers};
|
||||
if (defined $expand_numbers_code and 'CODE' ne ref $expand_numbers_code) {
|
||||
configurationerror($configfile,"expand_numbers coderef required",getlogger(__PACKAGE__));
|
||||
}
|
||||
|
||||
$create_lnp_multithreading = $data->{create_lnp_multithreading} if exists $data->{create_lnp_multithreading};
|
||||
$create_lnp_numofthreads = _get_numofthreads($create_lnp_numofthreads,$data,'create_lnp_numofthreads');
|
||||
|
||||
$delete_lnp_multithreading = $data->{delete_lnp_multithreading} if exists $data->{delete_lnp_multithreading};
|
||||
$delete_lnp_numofthreads = _get_numofthreads($delete_lnp_numofthreads,$data,'delete_lnp_numofthreads');
|
||||
|
||||
$ignore_lnp_numbers_unique = $data->{ignore_lnp_numbers_unique} if exists $data->{ignore_lnp_numbers_unique};
|
||||
$lnp_numbers_single_row_txn = $data->{lnp_numbers_single_row_txn} if exists $data->{lnp_numbers_single_row_txn};
|
||||
|
||||
$lnp_numbers_batch_delete = $data->{lnp_numbers_batch_delete} if exists $data->{lnp_numbers_batch_delete};
|
||||
|
||||
return $result;
|
||||
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
sub _prepare_working_paths {
|
||||
|
||||
my ($create) = @_;
|
||||
my $result = 1;
|
||||
my $path_result;
|
||||
|
||||
($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__));
|
||||
$result &= $path_result;
|
||||
|
||||
return $result;
|
||||
|
||||
}
|
||||
|
||||
sub _get_numofthreads {
|
||||
my ($default_value,$data,$key) = @_;
|
||||
my $numofthreads = $default_value;
|
||||
$numofthreads = $data->{$key} if exists $data->{$key};
|
||||
$numofthreads = $cpucount if $numofthreads > $cpucount;
|
||||
return $numofthreads;
|
||||
}
|
||||
|
||||
sub _get_sqlite_db_file {
|
||||
my ($run,$name) = @_;
|
||||
return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name;
|
||||
}
|
||||
|
||||
sub _get_import_filename {
|
||||
my ($old_value,$data,$key) = @_;
|
||||
my $import_filename = $old_value;
|
||||
$import_filename = $data->{$key} if exists $data->{$key};
|
||||
if (defined $import_filename and length($import_filename) > 0) {
|
||||
$import_filename = $input_path . $import_filename unless -e $import_filename;
|
||||
}
|
||||
return $import_filename;
|
||||
}
|
||||
|
||||
sub check_dry {
|
||||
|
||||
if ($dry) {
|
||||
scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__));
|
||||
return 1;
|
||||
} else {
|
||||
scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__));
|
||||
if (!$force) {
|
||||
if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
scriptinfo('force option applied',getlogger(__PACKAGE__));
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
1;
|
||||
@ -0,0 +1,62 @@
|
||||
##general settings:
|
||||
working_path = /home/rkrenn/temp/lnp
|
||||
cpucount = 4
|
||||
enablemultithreading = 1
|
||||
|
||||
##gearman/service listener config:
|
||||
jobservers = 127.0.0.1:4730
|
||||
|
||||
##NGCP MySQL connectivity - "accounting" db:
|
||||
accounting_host = 192.168.0.96
|
||||
accounting_port = 3306
|
||||
accounting_databasename = accounting
|
||||
accounting_username = root
|
||||
accounting_password =
|
||||
|
||||
##NGCP MySQL connectivity - "billing" db:
|
||||
billing_host = 192.168.0.96
|
||||
billing_port = 3306
|
||||
billing_databasename = billing
|
||||
billing_username = root
|
||||
billing_password =
|
||||
|
||||
##NGCP MySQL connectivity - "provisioning" db:
|
||||
provisioning_host = 192.168.0.96
|
||||
provisioning_port = 3306
|
||||
provisioning_databasename = provisioning
|
||||
provisioning_username = root
|
||||
provisioning_password =
|
||||
|
||||
##NGCP MySQL connectivity - "kamailio" db:
|
||||
kamailio_host = 192.168.0.96
|
||||
kamailio_port = 3306
|
||||
kamailio_databasename = kamailio
|
||||
kamailio_username = root
|
||||
kamailio_password =
|
||||
|
||||
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
|
||||
xa_host = 192.168.0.96
|
||||
xa_port = 3306
|
||||
xa_databasename = ngcp
|
||||
xa_username = root
|
||||
xa_password =
|
||||
|
||||
##NGCP REST-API connectivity:
|
||||
ngcprestapi_uri = https://127.0.0.1:1443
|
||||
ngcprestapi_username = administrator
|
||||
ngcprestapi_password = administrator
|
||||
ngcprestapi_realm = api_admin_http
|
||||
|
||||
##sending email:
|
||||
emailenable = 0
|
||||
erroremailrecipient =
|
||||
warnemailrecipient =
|
||||
completionemailrecipient = rkrenn@sipwise.com
|
||||
doneemailrecipient =
|
||||
|
||||
##logging:
|
||||
fileloglevel = INFO
|
||||
#DEBUG
|
||||
screenloglevel = INFO
|
||||
#INFO
|
||||
emailloglevel = OFF
|
||||
@ -0,0 +1,291 @@
|
||||
use strict;
|
||||
|
||||
## no critic
|
||||
|
||||
use File::Basename;
|
||||
use Cwd;
|
||||
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
|
||||
|
||||
use Getopt::Long qw(GetOptions);
|
||||
use Fcntl qw(LOCK_EX LOCK_NB);
|
||||
|
||||
use NGCP::BulkProcessor::Globals qw();
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw(
|
||||
update_settings
|
||||
|
||||
check_dry
|
||||
|
||||
$defaultsettings
|
||||
$defaultconfig
|
||||
$dry
|
||||
$skip_errors
|
||||
$force
|
||||
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Logging qw(
|
||||
init_log
|
||||
getlogger
|
||||
$attachmentlogfile
|
||||
scriptinfo
|
||||
cleanuplogfiles
|
||||
$currentlogfile
|
||||
);
|
||||
use NGCP::BulkProcessor::LogError qw (
|
||||
completion
|
||||
done
|
||||
scriptwarn
|
||||
scripterror
|
||||
filewarn
|
||||
fileerror
|
||||
);
|
||||
use NGCP::BulkProcessor::LoadConfig qw(
|
||||
load_config
|
||||
$SIMPLE_CONFIG_TYPE
|
||||
$YAML_CONFIG_TYPE
|
||||
$ANY_CONFIG_TYPE
|
||||
);
|
||||
use NGCP::BulkProcessor::Array qw(removeduplicates);
|
||||
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
|
||||
use NGCP::BulkProcessor::Mail qw(
|
||||
cleanupmsgfiles
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw();
|
||||
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw();
|
||||
use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw();
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw(destroy_all_dbs);
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::Import qw(
|
||||
load_file
|
||||
);
|
||||
|
||||
use NGCP::BulkProcessor::Projects::ETL::Lnp::ProcessLnp qw(
|
||||
create_lnp_numbers
|
||||
delete_lnp_numbers
|
||||
);
|
||||
|
||||
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
|
||||
|
||||
my @TASK_OPTS = ();
|
||||
|
||||
my $tasks = [];
|
||||
|
||||
my $cleanup_task_opt = 'cleanup';
|
||||
push(@TASK_OPTS,$cleanup_task_opt);
|
||||
|
||||
my $load_file_task_opt = 'load_file';
|
||||
push(@TASK_OPTS,$load_file_task_opt);
|
||||
|
||||
my $create_lnp_task_opt = 'create_lnp';
|
||||
push(@TASK_OPTS,$create_lnp_task_opt);
|
||||
|
||||
my $delete_lnp_task_opt = 'delete_lnp';
|
||||
push(@TASK_OPTS,$delete_lnp_task_opt);
|
||||
|
||||
if (init()) {
|
||||
main();
|
||||
exit(0);
|
||||
} else {
|
||||
exit(1);
|
||||
}
|
||||
|
||||
sub init {
|
||||
|
||||
my $configfile = $defaultconfig;
|
||||
my $settingsfile = $defaultsettings;
|
||||
|
||||
return 0 unless GetOptions(
|
||||
"config=s" => \$configfile,
|
||||
"settings=s" => \$settingsfile,
|
||||
"task=s" => $tasks,
|
||||
"dry" => \$dry,
|
||||
"skip-errors" => \$skip_errors,
|
||||
"force" => \$force,
|
||||
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
|
||||
|
||||
$tasks = removeduplicates($tasks,1);
|
||||
|
||||
my $result = load_config($configfile);
|
||||
init_log();
|
||||
$result &= load_config($settingsfile,\&update_settings,$YAML_CONFIG_TYPE);
|
||||
return $result;
|
||||
|
||||
}
|
||||
|
||||
sub main() {
|
||||
|
||||
my @messages = ();
|
||||
my @attachmentfiles = ();
|
||||
my $result = 1;
|
||||
my $completion = 0;
|
||||
|
||||
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
|
||||
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
|
||||
foreach my $task (@$tasks) {
|
||||
|
||||
if (lc($cleanup_task_opt) eq lc($task)) {
|
||||
$result &= cleanup_task(\@messages,1) if taskinfo($cleanup_task_opt,$result);
|
||||
|
||||
} elsif (lc($load_file_task_opt) eq lc($task)) {
|
||||
$result &= load_file_task(\@messages) if taskinfo($load_file_task_opt,$result);
|
||||
|
||||
} elsif (lc($create_lnp_task_opt) eq lc($task)) {
|
||||
if (taskinfo($create_lnp_task_opt,$result,1)) {
|
||||
next unless check_dry();
|
||||
$result &= create_lnp_task(\@messages);
|
||||
$completion |= 1;
|
||||
}
|
||||
|
||||
} elsif (lc($delete_lnp_task_opt) eq lc($task)) {
|
||||
if (taskinfo($delete_lnp_task_opt,$result,1)) {
|
||||
next unless check_dry();
|
||||
$result &= delete_lnp_task(\@messages);
|
||||
$completion |= 1;
|
||||
}
|
||||
|
||||
} else {
|
||||
$result = 0;
|
||||
scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
|
||||
last;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$result = 0;
|
||||
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
|
||||
}
|
||||
|
||||
push(@attachmentfiles,$attachmentlogfile);
|
||||
if ($completion) {
|
||||
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
|
||||
} else {
|
||||
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
sub taskinfo {
|
||||
my ($task,$result) = @_;
|
||||
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
|
||||
return $result;
|
||||
}
|
||||
|
||||
sub cleanup_task {
|
||||
my ($messages,$clean_generated) = @_;
|
||||
my $result = 0;
|
||||
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
|
||||
eval {
|
||||
#cleanupcvsdirs() if $clean_generated;
|
||||
cleanupdbfiles() if $clean_generated;
|
||||
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
|
||||
cleanupmsgfiles(\&fileerror,\&filewarn);
|
||||
#cleanupcertfiles();
|
||||
#cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
|
||||
$result = 1;
|
||||
};
|
||||
}
|
||||
if ($@ or !$result) {
|
||||
#print $@;
|
||||
push(@$messages,'working directory cleanup INCOMPLETE');
|
||||
return 0;
|
||||
} else {
|
||||
push(@$messages,'working directory folders cleaned up');
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
sub load_file_task {
|
||||
|
||||
my ($messages) = @_;
|
||||
my ($result,$warning_count) = (0,0);
|
||||
eval {
|
||||
($result,$warning_count) = load_file();
|
||||
};
|
||||
#print $@;
|
||||
my $err = $@;
|
||||
my $stats = ": $warning_count warnings";
|
||||
eval {
|
||||
$stats .= "\n total file LNP records: " .
|
||||
NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta() . ' rows';
|
||||
my $added_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta(
|
||||
$NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta
|
||||
);
|
||||
$stats .= "\n new: $added_count rows";
|
||||
my $existing_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta(
|
||||
$NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::updated_delta
|
||||
);
|
||||
$stats .= "\n existing: $existing_count rows";
|
||||
my $deleted_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta(
|
||||
$NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta
|
||||
);
|
||||
$stats .= "\n removed: $deleted_count rows";
|
||||
};
|
||||
if ($err or !$result) {
|
||||
push(@$messages,"loading LNP file INCOMPLETE$stats");
|
||||
} else {
|
||||
push(@$messages,"loading LNP file completed$stats");
|
||||
}
|
||||
destroy_all_dbs(); #every task should leave with closed connections.
|
||||
return $result;
|
||||
|
||||
}
|
||||
|
||||
sub create_lnp_task {
|
||||
|
||||
my ($messages) = @_;
|
||||
my ($result,$warning_count) = (0,0);
|
||||
eval {
|
||||
($result,$warning_count) = create_lnp_numbers();
|
||||
};
|
||||
#print $@;
|
||||
my $err = $@;
|
||||
my $stats = ": $warning_count warnings";
|
||||
eval {
|
||||
$stats .= "\n total mariadb LNP providers: " .
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::countby_name() . ' rows';
|
||||
$stats .= "\n total mariadb LNP numbers: " .
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows';
|
||||
};
|
||||
if ($err or !$result) {
|
||||
push(@$messages,"creating LNP numbers INCOMPLETE$stats");
|
||||
} else {
|
||||
push(@$messages,"creating LNP numbers completed$stats");
|
||||
}
|
||||
destroy_all_dbs(); #every task should leave with closed connections.
|
||||
return 1; #$result;
|
||||
|
||||
}
|
||||
|
||||
sub delete_lnp_task {
|
||||
|
||||
my ($messages) = @_;
|
||||
my ($result,$warning_count) = (0,0);
|
||||
eval {
|
||||
($result,$warning_count) = delete_lnp_numbers();
|
||||
};
|
||||
#print $@;
|
||||
my $err = $@;
|
||||
my $stats = ": $warning_count warnings";
|
||||
eval {
|
||||
$stats .= "\n total mariadb LNP providers: " .
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::countby_name() . ' rows';
|
||||
$stats .= "\n total mariadb LNP numbers: " .
|
||||
NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows';
|
||||
};
|
||||
if ($err or !$result) {
|
||||
push(@$messages,"deleting LNP numbers INCOMPLETE$stats");
|
||||
} else {
|
||||
push(@$messages,"deleting LNP numbers completed$stats");
|
||||
}
|
||||
destroy_all_dbs(); #every task should leave with closed connections.
|
||||
return 1; #$result;
|
||||
|
||||
}
|
||||
|
||||
__DATA__
|
||||
This exists to allow the locking code at the beginning of the file to work.
|
||||
DO NOT REMOVE THESE LINES!
|
||||
Loading…
Reference in new issue