From 7f905cec58b99f0bdeb2945378c74f33c9eced93 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Mon, 22 Apr 2024 21:44:26 +0200 Subject: [PATCH] MT#59932 remove lnp importer and customer exporter Change-Id: I9832f3f7d7a5d897b85812f3f83cf4d084d0aa4b (cherry picked from commit bb20bc5061bf5d7097c187cc2ccf2c8f5c3b4527) --- .../Projects/ETL/Customer/Dao/Tabular.pm | 279 ---------- .../Projects/ETL/Customer/ExportCustomers.pm | 490 ----------------- .../ETL/Customer/ProjectConnectorPool.pm | 120 ---- .../Projects/ETL/Customer/Settings.pm | 511 ------------------ .../Projects/ETL/Customer/config.cfg | 61 --- .../Projects/ETL/Customer/config.debug.cfg | 61 --- .../Projects/ETL/Customer/graph.yml | 5 - .../Projects/ETL/Customer/load.yml | 44 -- .../Projects/ETL/Customer/process.pl | 321 ----------- .../Projects/ETL/Customer/settings.cfg | 74 --- .../Projects/ETL/Customer/tabular.yml | 70 --- .../BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm | 326 ----------- .../ETL/Lnp/FileProcessors/NumbersFile.pm | 66 --- .../BulkProcessor/Projects/ETL/Lnp/Import.pm | 190 ------- .../Projects/ETL/Lnp/ProcessLnp.pm | 322 ----------- .../Projects/ETL/Lnp/ProjectConnectorPool.pm | 92 ---- .../Projects/ETL/Lnp/Settings.pm | 218 -------- .../BulkProcessor/Projects/ETL/Lnp/config.cfg | 62 --- .../BulkProcessor/Projects/ETL/Lnp/process.pl | 291 ---------- .../Projects/ETL/Lnp/settings.yml | 65 --- 20 files changed, 3668 deletions(-) delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/graph.yml delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/load.yml delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular.yml delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl delete mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm deleted file mode 100644 index fbe4a8e..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm +++ /dev/null @@ -1,279 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular; -use strict; - -## no critic - -use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw( - get_sqlite_db - destroy_all_dbs -); - -use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( - $tabular_fields - $csv_all_expected_fields -); - -use NGCP::BulkProcessor::SqlProcessor qw( - registertableinfo - create_targettable - checktableinfo - copy_row - insert_stmt - transfer_table -); - -use NGCP::BulkProcessor::SqlRecord qw(); - -require Exporter; -our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); -our @EXPORT_OK = qw( - create_table - gettablename - check_table - getinsertstatement - getupsertstatement - - get_fieldnames - - update_delta - findby_delta - countby_delta - - $deleted_delta - $updated_delta - $added_delta - - copy_table -); - -my $tablename = 'tabular'; -my $get_db = \&get_sqlite_db; - -my $fieldnames; -my $expected_fieldnames; -sub get_fieldnames { - my $expected = shift; - unless (defined $fieldnames and defined $expected_fieldnames) { - $fieldnames = [ map { - local $_ = (ref $_ ? (exists $_->{colname} ? $_->{colname} : $_->{path}) : $_); - $_ =~ s/\./_/g; - $_ =~ s/\[(\d+)\]/_$1/g; - $_; - } @$tabular_fields ]; - $expected_fieldnames = [ @$fieldnames ]; - push(@$expected_fieldnames,'uuid') unless grep { 'uuid' eq $_; } @$expected_fieldnames; - push(@$expected_fieldnames,'delta'); - } - return $fieldnames unless $expected; - return $expected_fieldnames; -} - -my $primarykey_fieldnames = [ 'uuid' ]; -my $indexes = { - $tablename . '_delta' => [ 'delta(7)' ], -}; - -our $deleted_delta = 'DELETED'; -our $updated_delta = 'UPDATED'; -our $added_delta = 'ADDED'; - -sub new { - - my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, - $tablename,get_fieldnames(1),$indexes); - - copy_row($self,shift,get_fieldnames(1)); - - return $self; - -} - -sub create_table { - - my ($truncate) = @_; - - my $db = &$get_db(); - - registertableinfo($db,__PACKAGE__,$tablename,get_fieldnames(1),$indexes,$primarykey_fieldnames); - return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); - -} - -sub findby_delta { - - my ($delta,$load_recursive) = @_; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - return [] unless defined $delta; - - my $rows = $db->db_get_all_arrayref( - 'SELECT * FROM ' . - $table . - ' WHERE ' . - $db->columnidentifier('delta') . ' = ?' - , $delta); - - return buildrecords_fromrows($rows,$load_recursive); - -} - -sub findby_domainusername { - - my ($domain,$username,$load_recursive) = @_; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - return [] unless (defined $domain and defined $username); - - my $rows = $db->db_get_all_arrayref( - 'SELECT * FROM ' . $table . - ' WHERE ' . $db->columnidentifier('domain') . ' = ?' . - ' AND ' . $db->columnidentifier('username') . ' = ?' - , $domain, $username); - - return buildrecords_fromrows($rows,$load_recursive)->[0]; - -} - -sub update_delta { - - my ($uuid,$delta) = @_; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; - my @params = (); - push(@params,$delta); - if (defined $uuid) { - $stmt .= ' WHERE ' . - $db->columnidentifier('uuid') . ' = ?'; - push(@params, $uuid); - } - - return $db->db_do($stmt,@params); - -} - -sub countby_delta { - - my ($deltas) = @_; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1'; - my @params = (); - if (defined $deltas and 'HASH' eq ref $deltas) { - foreach my $in (keys %$deltas) { - my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); - $stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; - push(@params,@values); - } - } elsif (defined $deltas and length($deltas) > 0) { - $stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?'; - push(@params,$deltas); - } - - return $db->db_get_value($stmt,@params); - -} - -sub copy_table { - - my ($get_target_db) = @_; - - if ($csv_all_expected_fields) { - check_table(); - } else { - checktableinfo($get_db, - __PACKAGE__,$tablename, - get_fieldnames(0), - $indexes); - } - - return transfer_table( - get_db => $get_db, - class => __PACKAGE__, - get_target_db => $get_target_db, - targetclass => __PACKAGE__, - targettablename => $tablename, - ); - -} - -sub buildrecords_fromrows { - - my ($rows,$load_recursive) = @_; - - my @records = (); - my $record; - - if (defined $rows and ref $rows eq 'ARRAY') { - foreach my $row (@$rows) { - $record = __PACKAGE__->new($row); - - # transformations go here ... - - push @records,$record; - } - } - - return \@records; - -} - -sub getinsertstatement { - - my ($insert_ignore) = @_; - check_table(); - return insert_stmt($get_db,__PACKAGE__,$insert_ignore); - -} - -sub getupsertstatement { - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . - join(', ', map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @{get_fieldnames(1)}) . ')'; - my @values = (); - foreach my $fieldname (@{get_fieldnames(1)}) { - if ('delta' eq $fieldname) { - my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . - $db->columnidentifier('uuid') . ' = ?'; - push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); - } else { - push(@values,'?'); - } - } - $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; - return $upsert_stmt; - -} - -sub gettablename { - - return $tablename; - -} - -sub check_table { - - return checktableinfo($get_db, - __PACKAGE__,$tablename, - get_fieldnames(1), - $indexes); - -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm deleted file mode 100644 index 11fe980..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm +++ /dev/null @@ -1,490 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers; -use strict; - -## no critic - -use threads::shared qw(); - -use Tie::IxHash; - -use NGCP::BulkProcessor::Serialization qw(); -use Scalar::Util qw(blessed); -use MIME::Base64 qw(encode_base64); - -use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( - $dry - $skip_errors - - $export_customers_multithreading - $export_customers_numofthreads - $export_customers_blocksize - - run_dao_method - get_dao_var - get_export_filename - - write_export_file - $customer_export_filename_format - - $tabular_fields - $load_recursive - $tabular_single_row_txn - $ignore_tabular_unique - $graph_fields - $graph_fields_mode -); - -use NGCP::BulkProcessor::Logging qw ( - getlogger - processing_info - processing_debug -); -use NGCP::BulkProcessor::LogError qw( - rowprocessingerror - rowprocessingwarn - fileerror -); - -use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); - -use NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular qw(); - -use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw( - get_sqlite_db - destroy_all_dbs - ping_all_dbs -); - -use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool trim); #check_ipnet -#use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); -#use NGCP::BulkProcessor::Table qw(get_rowhash); -use NGCP::BulkProcessor::Array qw(array_to_map); -use NGCP::BulkProcessor::DSPath qw(); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - export_customers_graph - export_customers_tabular -); - -sub _init_graph_field_map { - my $context = shift; - my %graph_field_map = (); - my %graph_field_globs = (); - tie(%graph_field_globs, 'Tie::IxHash'); - foreach my $graph_field (@$graph_fields) { - my ($c,$a); - if ('HASH' eq ref $graph_field) { - $a = $graph_field->{path}; - $c = $graph_field; - } else { - $a = $graph_field; - $c = 1; - } - #my @a = (); - #my $b = ''; - #foreach my $c (split(/\./,$a)) { - # - # foreach my () { - # $b .= $_ - # push() - # } - #} - if ($a =~ /\*/) { - $a = quotemeta($a); - $a =~ s/(\\\*)+/[^.]+/g; - $a = '^' . $a . '$'; - $graph_field_globs{$a} = $c unless exists $graph_field_globs{$a}; - } else { - $graph_field_map{$a} = $c unless exists $graph_field_map{$a}; - } - } - $context->{graph_field_map} = \%graph_field_map; - $context->{graph_field_globs} = \%graph_field_globs; -} - -sub export_customers_graph { - - my $static_context = { - - }; - _init_graph_field_map($static_context); - ($static_context->{export_filename},$static_context->{export_format}) = get_export_filename($customer_export_filename_format); - - my $result = 1; #_copy_customers_checks($static_context); - - destroy_all_dbs(); - my $warning_count :shared = 0; - return ($result && run_dao_method('billing::contracts::process_records', - #source_dbs => $static_context->{source_dbs}, - static_context => $static_context, - process_code => sub { - my ($context,$records,$row_offset) = @_; - ping_all_dbs(); - my @data = (); - foreach my $record (@$records) { - next unless _export_customer_graph_init_context($context,$record); - push(@data,_get_contract_graph($context)); - } - write_export_file(\@data,$context->{export_filename},$context->{export_format}); - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{error_count} = 0; - $context->{warning_count} = 0; - }, - uninit_process_context_code => sub { - my ($context)= @_; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - destroy_reader_dbs_code => \&destroy_all_dbs, - blocksize => $export_customers_blocksize, - multithreading => $export_customers_multithreading, - numofthreads => $export_customers_numofthreads, - ),$warning_count,); - -} - -sub export_customers_tabular { - - my $result = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::create_table(0); - - my $static_context = { - upsert => _tabular_rows_reset_delta(), - }; - - destroy_all_dbs(); - my $warning_count :shared = 0; - return ($result && run_dao_method('billing::contracts::process_records', - static_context => $static_context, - process_code => sub { - my ($context,$records,$row_offset) = @_; - ping_all_dbs(); - my @subscriber_rows = (); - foreach my $record (@$records) { - next unless _export_customer_tabular_init_context($context,$record); - push(@subscriber_rows, _get_subscriber_rows($context)); - - if ($tabular_single_row_txn and (scalar @subscriber_rows) > 0) { - while (defined (my $subscriber_row = shift @subscriber_rows)) { - if ($skip_errors) { - eval { _insert_tabular_rows($context,[$subscriber_row]); }; - _warn($context,$@) if $@; - } else { - _insert_tabular_rows($context,[$subscriber_row]); - } - } - } - } - - if (not $tabular_single_row_txn and (scalar @subscriber_rows) > 0) { - if ($skip_errors) { - eval { insert_tabular_rows($context,\@subscriber_rows); }; - _warn($context,$@) if $@; - } else { - insert_tabular_rows($context,\@subscriber_rows); - } - } - - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{db} = &get_sqlite_db(); - $context->{error_count} = 0; - $context->{warning_count} = 0; - }, - uninit_process_context_code => sub { - my ($context)= @_; - undef $context->{db}; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - destroy_reader_dbs_code => \&destroy_all_dbs, - blocksize => $export_customers_blocksize, - multithreading => $export_customers_multithreading, - numofthreads => $export_customers_numofthreads, - ),$warning_count,); - -} - -sub _tabular_rows_reset_delta { - my $upsert = 0; - if (NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta() > 0) { - processing_info(threadid(),'resetting delta of ' . - NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::update_delta(undef, - $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::deleted_delta) . - ' records',getlogger(__PACKAGE__)); - $upsert |= 1; - } - return $upsert; -} - -sub _insert_tabular_rows { - my ($context,$subscriber_rows) = @_; - $context->{db}->db_do_begin( - ($context->{upsert} ? - NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::getupsertstatement() - : NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::getinsertstatement($ignore_tabular_unique)), - ); - eval { - $context->{db}->db_do_rowblock($subscriber_rows); - $context->{db}->db_finish(); - }; - my $err = $@; - if ($err) { - eval { - $context->{db}->db_finish(1); - }; - die($err); - } - -} - -sub _export_customer_graph_init_context { - - my ($context,$record) = @_; - - my $result = 1; - - return 0 unless _load_contract($context,$record); - - return $result; - -} - -sub _get_contract_graph { - my ($context) = @_; - - my $dp = NGCP::BulkProcessor::DSPath->new($context->{contract}, { - filter => sub { - my $path = shift; - if ('whitelist' eq $graph_fields_mode) { - my $include; - if (exists $context->{graph_field_map}->{$path}) { - $include = $context->{graph_field_map}->{$path}; - } else { - foreach my $glob (keys %{$context->{graph_field_globs}}) { - if ($path =~ /$glob/) { - $include = $context->{graph_field_globs}->{$glob}; - last; - } - } - } - if ('HASH' eq ref $include) { - if (exists $include->{include}) { - return $include->{include}; - } - return 1; - } else { - return $include; - } - } elsif ('blacklist' eq $graph_fields_mode) { - my $exclude; - if (exists $context->{graph_field_map}->{$path}) { - $exclude = $context->{graph_field_map}->{$path}; - } else { - foreach my $glob (keys %{$context->{graph_field_globs}}) { - if ($path =~ /$glob/) { - $exclude = $context->{graph_field_globs}->{$glob}; - last; - } - } - } - if ('HASH' eq ref $exclude) { - if (exists $exclude->{exclude}) { - return not $exclude->{exclude}; - } elsif ($exclude->{transform}) { - return 1; - } - return 0; - } else { - return not $exclude; - } - } - }, - transform => sub { - return shift; - # ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = - # array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, - # sub { return shift->{attribute}; }, sub { my $p = shift; }, 'group' ); - # if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { - # foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { - # foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { - # $voicemail->{recording} = encode_base64($voicemail->{recording},''); - # } - # } - # } - }, - }); - - $dp->filter()->transform(); - - return $context->{contract}; - -} - -sub _export_customer_tabular_init_context { - - my ($context,$record) = @_; - - my $result = 1; - - return 0 unless _load_contract($context,$record); - - if (defined $context->{contract}->{voip_subscribers} - and not scalar @{$context->{contract}->{voip_subscribers}}) { - _info($context,"contract ID $record->{id} has no subscribers, skipping",1); - $result = 0; - } - - return $result; - -} - -sub _get_subscriber_rows { - - my ($context) = @_; - - my @rows = (); - foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) { - my @row = (); - $bill_subs->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts->new($context->{contract}); #no circular ref - ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = - array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, - sub { return shift->{_attribute}; }, sub { my $p = shift; }, 'group' ); - if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { - foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { - foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { - $voicemail->{recording} = encode_base64($voicemail->{recording},''); - } - } - } - my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, { - retrieve_key_from_non_hash => sub {}, - key_does_not_exist => sub {}, - index_does_not_exist => sub {}, - }); - foreach my $tabular_field (@$tabular_fields) { - my $a; - my $sep = ','; - my $transform; - if ('HASH' eq ref $tabular_field) { - $a = $tabular_field->{path}; - $sep = $tabular_field->{sep}; - $transform = $tabular_field->{transform}; - } else { - $a = $tabular_field; - } - #eval {'' . ($dp->get('.' . $a) // '');}; if($@){ - # my $x=5; - #} - my $v = $dp->get('.' . $a); - if ('CODE' eq ref $transform) { - my $closure = _closure($transform,_get_closure_context($context)); - $v = $closure->($v,$bill_subs); - } - if ('ARRAY' eq ref $v) { - if ('HASH' eq ref $v->[0] - or (blessed($v->[0]) and $v->[0]->isa('NGCP::BulkProcessor::SqlRecord'))) { - $v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v); - } else { - $v = join($sep, sort @$v); - } - } else { - $v = '' . ($v // ''); - } - push(@row,$v); - } - push(@row,$bill_subs->{uuid}) unless grep { 'uuid' eq $_; } @{NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::get_fieldnames()}; - if ($context->{upsert}) { - push(@row,$bill_subs->{uuid}); - } else { - push(@row,$NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::added_delta); - } - - push(@rows,\@row); - } - - return @rows; - -} - -sub _load_contract { - - my ($context,$record) = @_; - $context->{contract} = run_dao_method('billing::contracts::findby_id', $record->{id}, { %$load_recursive, - #'contracts.voip_subscribers.domain' => 1, - _context => _get_closure_context($context), - }); - - return 1 if $context->{contract}; - return 0; - -} - -sub _get_closure_context { - my $context = shift; - return { - _info => \&_info, - _error => \&_error, - _debug => \&_debug, - _warn => \&_warn, - context => $context, - }; -} - -sub _closure { - my ($sub,$context) = @_; - return sub { - foreach my $key (keys %$context) { - no strict "refs"; ## no critic (ProhibitNoStrict) - *{"main::$key"} = $context->{$key} if 'CODE' eq ref $context->{$key}; - } - return $sub->(@_,$context); - }; -} - -sub _error { - - my ($context,$message) = @_; - $context->{error_count} = $context->{error_count} + 1; - rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - -} - -sub _warn { - - my ($context,$message) = @_; - $context->{warning_count} = $context->{warning_count} + 1; - rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - -} - -sub _info { - - my ($context,$message,$debug) = @_; - if ($debug) { - processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - } else { - processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - } -} - -sub _debug { - - my ($context,$message,$debug) = @_; - processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm deleted file mode 100644 index 31e8d56..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm +++ /dev/null @@ -1,120 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool; -use strict; - -## no critic - -use File::Basename; -use Cwd; -use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); - -use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( - $csv_dir - $sqlite_db_file -); - -use NGCP::BulkProcessor::ConnectorPool qw( - get_connectorinstancename -); - -use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(); -use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); - -use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - - get_sqlite_db - sqlite_db_tableidentifier - - get_csv_db - csv_db_tableidentifier - - destroy_dbs - destroy_all_dbs - ping_all_dbs - -); - -my $sqlite_dbs = {}; -my $csv_dbs = {}; - -sub get_sqlite_db { - - my ($instance_name,$reconnect) = @_; - my $name = get_connectorinstancename($instance_name); - - if (not defined $sqlite_dbs->{$name}) { - $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); - if (not defined $reconnect) { - $reconnect = 1; - } - } - if ($reconnect) { - $sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file); - } - - return $sqlite_dbs->{$name}; - -} - -sub sqlite_db_tableidentifier { - - my ($get_target_db,$tablename) = @_; - my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; - return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file)); - -} - -sub get_csv_db { - - my ($instance_name,$reconnect) = @_; - my $name = get_connectorinstancename($instance_name); - if (not defined $csv_dbs->{$name}) { - $csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name); - if (not defined $reconnect) { - $reconnect = 1; - } - } - if ($reconnect) { - $csv_dbs->{$name}->db_connect($csv_dir); - } - return $csv_dbs->{$name}; - -} - -sub csv_db_tableidentifier { - - my ($get_target_db,$tablename) = @_; - my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; - return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir)); - -} - -sub destroy_dbs { - - foreach my $name (keys %$sqlite_dbs) { - cleartableinfo($sqlite_dbs->{$name}); - undef $sqlite_dbs->{$name}; - delete $sqlite_dbs->{$name}; - } - - foreach my $name (keys %$csv_dbs) { - cleartableinfo($csv_dbs->{$name}); - undef $csv_dbs->{$name}; - delete $csv_dbs->{$name}; - } - -} - -sub destroy_all_dbs() { - destroy_dbs(); - NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); -} - -sub ping_all_dbs() { - NGCP::BulkProcessor::ConnectorPool::ping_dbs(); -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm deleted file mode 100644 index 19a115c..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm +++ /dev/null @@ -1,511 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Customer::Settings; -use strict; - -## no critic - -use threads::shared qw(); - -use File::Basename qw(fileparse); -use NGCP::BulkProcessor::Serialization qw(); -use DateTime::TimeZone qw(); - -use JSON -support_by_pp, -no_export; -*NGCP::BulkProcessor::Serialization::serialize_json = sub { - my $input_ref = shift; - return JSON::to_json($input_ref, { allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, as_nonblessed => 1 }); -}; -use NGCP::BulkProcessor::SqlConnectors::CSVDB qw($default_csv_config); - -use NGCP::BulkProcessor::Globals qw( - $working_path - $enablemultithreading - $cpucount - create_path -); - -use NGCP::BulkProcessor::Logging qw( - getlogger - scriptinfo - configurationinfo -); - -use NGCP::BulkProcessor::LogError qw( - fileerror - filewarn - configurationwarn - configurationerror -); - -use NGCP::BulkProcessor::LoadConfig qw( - split_tuple - parse_regexp -); - -use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module); - -use NGCP::BulkProcessor::Array qw(contains); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - update_settings - run_dao_method - get_dao_var - get_export_filename - write_export_file - write_sql_file - - update_load_recursive - $load_yml - $load_recursive - - update_tabular_fields - $tabular_yml - $tabular_fields - $ignore_tabular_unique - $tabular_single_row_txn - $graph_yml - $graph_fields - $graph_fields_mode - update_graph_fields - - $sqlite_db_file - $csv_dir - - check_dry - - $output_path - $input_path - - $customer_export_filename_format - $customer_import_filename - $split_customers - - $defaultsettings - $defaultconfig - - $dry - $skip_errors - $force - - $export_customers_multithreading - $export_customers_numofthreads - $export_customers_blocksize - - $csv_all_expected_fields - - $csv_header_line -); -#$cf_default_priority -#$cf_default_timeout -#$cft_default_ringtimeout - -our $defaultconfig = 'config.cfg'; -our $defaultsettings = 'settings.cfg'; - -our $tabular_yml = 'tabular.yml'; -our $tabular_fields = []; -our $ignore_tabular_unique = 0; -our $tabular_single_row_txn = 1; - -our $graph_yml = 'graph.yml'; -our $graph_fields = []; -our $graph_fields_mode = 'whitelist'; -my @graph_fields_modes = qw(whitelist blacklist); - -our $load_yml = 'load.yml'; -our $load_recursive; - -our $output_path = $working_path . 'output/'; -our $input_path = $working_path . 'input/'; -our $csv_dir = 'customer'; - -our $customer_export_filename_format = undef; - -our $csv_all_expected_fields = 1; - -our $csv_header_line = 1; - -#our $customer_import_filename = undef; -#our $customer_import_numofthreads = $cpucount; -#our $customer_import_multithreading = 1; -#our $customer_reseller_name = 'default'; -#our $customer_billing_profile_name = 'Default Billing Profile'; -#our $customer_domain = undef; -#our $customer_contact_email_format = '%s@example.org'; -#our $subscriber_contact_email_format = '%s@example.org'; -#our $split_customers = 0; - -#our $subscriber_timezone = undef; -#our $contract_timezone = undef; - -#our $subscriber_profile_set_name = undef; -#our $subscriber_profile_name = undef; -#our $webusername_format = '%1$s'; -#our $subscriber_externalid_format = undef; - -our $force = 0; -our $dry = 0; -our $skip_errors = 0; - -my $mr = 'Trunk'; -my @supported_mr = ('Trunk'); - -our $sqlite_db_file = 'sqlite'; - -our $export_customers_multithreading = $enablemultithreading; -our $export_customers_numofthreads = $cpucount; -our $export_customers_blocksize = 1000; - -#our $cf_default_priority = 1; -#our $cf_default_timeout = 300; -#our $cft_default_ringtimeout = 20; - -#our $rollback_sql_export_filename_format = undef; -#our $rollback_sql_stmt_format = undef; - -my $file_lock :shared = undef; - -sub update_settings { - - my ($data,$configfile) = @_; - - if (defined $data) { - - my $result = 1; - my $regexp_result; - - #&$configurationinfocode("testinfomessage",$configlogger); - - $result &= _prepare_working_paths(1); - - $customer_export_filename_format = $data->{customer_export_filename} if exists $data->{customer_export_filename}; - get_export_filename($data->{customer_export_filename},$configfile); - - #$rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format}; - #get_export_filename($data->{rollback_sql_export_filename_format},$configfile); - #$rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format}; - - $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; - $csv_dir = $data->{csv_dir} if exists $data->{csv_dir}; - - #$customer_import_filename = _get_import_filename($customer_import_filename,$data,'customer_import_filename'); - #$customer_import_multithreading = $data->{customer_import_multithreading} if exists $data->{customer_import_multithreading}; - #$customer_import_numofthreads = _get_numofthreads($cpucount,$data,'customer_import_numofthreads'); - #$customer_reseller_name = $data->{customer_reseller_name} if exists $data->{customer_reseller_name}; - #$customer_billing_profile_name = $data->{customer_billing_profile_name} if exists $data->{customer_billing_profile_name}; - #$customer_domain = $data->{customer_domain} if exists $data->{customer_domain}; - #$customer_contact_email_format = $data->{customer_contact_email_format} if exists $data->{customer_contact_email_format}; - #$subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format}; - #$split_customers = $data->{split_customers} if exists $data->{split_customers}; - - #$contract_timezone = $data->{customer_timezone} if exists $data->{customer_timezone}; - #if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) { - # configurationerror($configfile,"invalid customer_timezone '$contract_timezone'"); - # $result = 0; - #} - - #$subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone}; - #if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) { - # configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'"); - # $result = 0; - #} - - #$subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name}; - #$subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name}; - #if ($subscriber_profile_set_name and not $subscriber_profile_name - # or not $subscriber_profile_set_name and $subscriber_profile_name) { - # configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required"); - # $result = 0; - #} - #$webusername_format = $data->{webusername_format} if exists $data->{webusername_format}; - #$subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format}; - - $dry = $data->{dry} if exists $data->{dry}; - $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; - - $export_customers_multithreading = $data->{export_customers_multithreading} if exists $data->{export_customers_multithreading}; - $export_customers_numofthreads = _get_numofthreads($cpucount,$data,'export_customers_numofthreads'); - $export_customers_blocksize = $data->{export_customers_blocksize} if exists $data->{export_customers_blocksize}; - - $tabular_yml = $data->{tabular_yml} if exists $data->{tabular_yml}; - $graph_yml = $data->{graph_yml} if exists $data->{graph_yml}; - $graph_fields_mode = $data->{graph_fields_mode} if exists $data->{graph_fields_mode}; - if (not $graph_fields_mode or not contains($graph_fields_mode,\@graph_fields_modes)) { - configurationerror($configfile,'graph_fields_mode must be one of ' . join(', ', @graph_fields_modes)); - $result = 0; - } - $load_yml = $data->{load_yml} if exists $data->{load_yml}; - $tabular_single_row_txn = $data->{tabular_single_row_txn} if exists $data->{tabular_single_row_txn}; - $ignore_tabular_unique = $data->{ignore_tabular_unique} if exists $data->{ignore_tabular_unique}; - - #$cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority}; - #$cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout}; - #$cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout}; - - $csv_all_expected_fields = $data->{csv_all_expected_fields} if exists $data->{csv_all_expected_fields}; - - $mr = $data->{schema_version}; - if (not defined $mr or not contains($mr,\@supported_mr)) { - configurationerror($configfile,'schema_version must be one of ' . join(', ', @supported_mr)); - $result = 0; - } - - $default_csv_config = { - eol => "\r\n", - sep_char => ';', - quote_char => '"', - escape_char => '"', - }; - $default_csv_config->{eol} = unescape($data->{csv_eol}) if exists $data->{csv_eol}; - $default_csv_config->{sep_char} = unescape($data->{csv_sep_char}) if exists $data->{csv_sep_char}; - $default_csv_config->{quote_char} = unescape($data->{csv_quote_char}) if exists $data->{csv_quote_char}; - $default_csv_config->{escape_char} = unescape($data->{csv_escape_char}) if exists $data->{csv_escape_char}; - - $csv_header_line = $data->{csv_header_line} if exists $data->{csv_header_line}; - - return $result; - } - return 0; - -} - -sub unescape { - my $input = shift; - $input =~ s/\\t/\t/g; - $input =~ s/\\n/\n/g; - $input =~ s/\\r/\r/g; - return $input; -} - -sub run_dao_method { - my $method_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift; - load_module($method_name); - no strict 'refs'; - return $method_name->(@_); -} - -sub get_dao_var { - my $var_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift; - load_module($var_name); - no strict 'refs'; - return @{$var_name} if wantarray; - return ${$var_name}; -} - -sub _prepare_working_paths { - - my ($create) = @_; - my $result = 1; - my $path_result; - - ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); - $result &= $path_result; - ($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); - $result &= $path_result; - - return $result; - -} - -sub _get_numofthreads { - my ($default_value,$data,$key) = @_; - my $numofthreads = $default_value; - $numofthreads = $data->{$key} if exists $data->{$key}; - $numofthreads = $cpucount if $numofthreads > $cpucount; - return $numofthreads; -} - -sub get_export_filename { - my ($filename_format,$configfile) = @_; - my $export_filename; - my $export_format; - if ($filename_format) { - $export_filename = sprintf($filename_format,timestampdigits(),threadid()); - unless ($export_filename =~ /^\//) { - $export_filename = $output_path . $export_filename; - } - if (-e $export_filename and (unlink $export_filename) == 0) { - filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__)); - $export_filename = undef; - } - my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".xml",".php",".pl",".db",".csv"); - if ($suffix eq '.json') { - $export_format = $NGCP::BulkProcessor::Serialization::format_json; - } elsif ($suffix eq '.yml' or $suffix eq '.yaml') { - $export_format = $NGCP::BulkProcessor::Serialization::format_yaml; - } elsif ($suffix eq '.xml') { - $export_format = $NGCP::BulkProcessor::Serialization::format_xml; - } elsif ($suffix eq '.php') { - $export_format = $NGCP::BulkProcessor::Serialization::format_php; - } elsif ($suffix eq '.pl') { - $export_format = $NGCP::BulkProcessor::Serialization::format_perl; - } elsif ($suffix eq '.db') { - $export_format = 'sqlite'; - } elsif ($suffix eq '.csv') { - $export_format = 'csv'; - } else { - configurationerror($configfile,"$filename_format: either .json/.yaml/.xml/.php/.pl or .db/.csv export file format required"); - } - } - return ($export_filename,$export_format); -} - -sub write_export_file { - - my ($data,$export_filename,$export_format) = @_; - if (defined $export_filename) { - fileerror("invalid extension for output filename $export_filename",getlogger(__PACKAGE__)) - unless contains($export_format,\@NGCP::BulkProcessor::Serialization::formats); - # "concatenated json" https://en.wikipedia.org/wiki/JSON_streaming - my $str = ''; - if (ref $data eq 'ARRAY') { - foreach my $obj (@$data) { - #$str .= "\n" if length($str); - $str .= NGCP::BulkProcessor::Serialization::serialize($obj,$export_format); - } - } else { - $str = NGCP::BulkProcessor::Serialization::serialize($data,$export_format); - } - _write_file($str,$export_filename); - } - -} - -sub write_sql_file { - - my ($data,$export_filename,$stmt_format) = @_; - if (defined $export_filename and $stmt_format) { - my $str = ''; - if (ref $data eq 'ARRAY') { - foreach my $obj (@$data) { - $str .= "\n" if length($str); - if (ref $obj eq 'ARRAY') { - $str .= sprintf($stmt_format,@$obj); - } else { - $str .= sprintf($stmt_format,$str); - } - } - } else { - $str = sprintf($stmt_format,$data); - } - $str .= "\n"; - _write_file($str,$export_filename); - } - -} - -sub _write_file { - - my ($str,$export_filename) = @_; - if (defined $export_filename) { - lock $file_lock; - open(my $fh, '>>', $export_filename) or fileerror('cannot open file ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__)); - binmode($fh); - print $fh $str; - close $fh; - } - -} - -sub update_tabular_fields { - - my ($data,$configfile) = @_; - - if (defined $data) { - - my $result = 1; - - eval { - $tabular_fields = $data; - }; - if ($@ or 'ARRAY' ne ref $tabular_fields) { - $tabular_fields //= []; - configurationerror($configfile,'invalid tabular fields',getlogger(__PACKAGE__)); - $result = 0; - } - - return $result; - } - return 0; - -} - -sub update_graph_fields { - - my ($data,$configfile) = @_; - - if (defined $data) { - - my $result = 1; - - eval { - $graph_fields = $data; - }; - if ($@ or 'ARRAY' ne ref $graph_fields) { - $graph_fields //= []; - configurationerror($configfile,'invalid graph fields',getlogger(__PACKAGE__)); - $result = 0; - } - - return $result; - } - return 0; - -} - -sub update_load_recursive { - - my ($data,$configfile) = @_; - - if (defined $data) { - - my $result = 1; - - eval { - $load_recursive = $data; - }; - if ($@ or 'HASH' ne ref $load_recursive) { - undef $load_recursive; - configurationerror($configfile,'invalid load recursive def',getlogger(__PACKAGE__)); - $result = 0; - } - - return $result; - } - return 0; - -} - -sub _get_import_filename { - my ($old_value,$data,$key) = @_; - my $import_filename = $old_value; - $import_filename = $data->{$key} if exists $data->{$key}; - if (defined $import_filename and length($import_filename) > 0) { - $import_filename = $input_path . $import_filename unless -e $import_filename; - } - return $import_filename; -} - -sub check_dry { - - if ($dry) { - scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); - return 1; - } else { - scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); - if (!$force) { - if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { - return 1; - } else { - return 0; - } - } else { - scriptinfo('force option applied',getlogger(__PACKAGE__)); - return 1; - } - } - -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg deleted file mode 100644 index 442b428..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.cfg +++ /dev/null @@ -1,61 +0,0 @@ -##general settings: -working_path = /var/sipwise -cpucount = 4 -enablemultithreading = 1 - -##gearman/service listener config: -jobservers = 127.0.0.1:4730 - -##NGCP MySQL connectivity - "accounting" db: -accounting_host = db01 -accounting_port = 3306 -accounting_databasename = accounting -accounting_username = root -accounting_password = - -##NGCP MySQL connectivity - "billing" db: -billing_host = db01 -billing_port = 3306 -billing_databasename = billing -billing_username = root -billing_password = - -##NGCP MySQL connectivity - "provisioning" db: -provisioning_host = db01 -provisioning_port = 3306 -provisioning_databasename = provisioning -provisioning_username = root -provisioning_password = - -##NGCP MySQL connectivity - "kamailio" db: -kamailio_host = db01 -kamailio_port = 3306 -kamailio_databasename = kamailio -kamailio_username = root -kamailio_password = - -##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: -xa_host = db01 -xa_port = 3306 -xa_databasename = ngcp -xa_username = root -xa_password = - -##NGCP REST-API connectivity: -ngcprestapi_uri = https://127.0.0.1:1443 -ngcprestapi_username = administrator -ngcprestapi_password = administrator -ngcprestapi_realm = api_admin_http - -##sending email: -emailenable = 0 -erroremailrecipient = -warnemailrecipient = -completionemailrecipient = rkrenn@sipwise.com -doneemailrecipient = - -##logging: -fileloglevel = INFO -#DEBUG -screenloglevel = INFO -emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg deleted file mode 100644 index 504dc89..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/config.debug.cfg +++ /dev/null @@ -1,61 +0,0 @@ -##general settings: -working_path = /home/rkrenn/temp/customer_exporter -cpucount = 4 -enablemultithreading = 1 - -##gearman/service listener config: -jobservers = 127.0.0.1:4730 - -##NGCP MySQL connectivity - "accounting" db: -accounting_host = 192.168.0.96 -accounting_port = 3306 -accounting_databasename = accounting -accounting_username = root -accounting_password = - -##NGCP MySQL connectivity - "billing" db: -billing_host = 192.168.0.96 -billing_port = 3306 -billing_databasename = billing -billing_username = root -billing_password = - -##NGCP MySQL connectivity - "provisioning" db: -provisioning_host = 192.168.0.96 -provisioning_port = 3306 -provisioning_databasename = provisioning -provisioning_username = root -provisioning_password = - -##NGCP MySQL connectivity - "kamailio" db: -kamailio_host = 192.168.0.96 -kamailio_port = 3306 -kamailio_databasename = kamailio -kamailio_username = root -kamailio_password = - -##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: -xa_host = 192.168.0.96 -xa_port = 3306 -xa_databasename = ngcp -xa_username = root -xa_password = - -##NGCP REST-API connectivity: -ngcprestapi_uri = https://127.0.0.1:1443 -ngcprestapi_username = administrator -ngcprestapi_password = administrator -ngcprestapi_realm = api_admin_http - -##sending email: -emailenable = 0 -erroremailrecipient = -warnemailrecipient = -completionemailrecipient = rkrenn@sipwise.com -doneemailrecipient = - -##logging: -fileloglevel = INFO -#DEBUG -screenloglevel = INFO -emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/graph.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/graph.yml deleted file mode 100644 index 01f088a..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/graph.yml +++ /dev/null @@ -1,5 +0,0 @@ -# graph.yml: whitelist/blacklist of *contract* fields to export to .json/.yaml/.xml/... - -- id -- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.attribute.attribute -- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.value diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load.yml deleted file mode 100644 index 8c14467..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load.yml +++ /dev/null @@ -1,44 +0,0 @@ -# load.yml: define which *contract* relations to fetch from db. - -#contracts.voip_subscribers: 1 -contracts.voip_subscribers: - include: !!perl/code | - { - my ($contract,$context) = @_; - #return 0 if $contract->{status} eq 'terminated'; - return 1; - } - - filter: !!perl/code | - { - my ($bill_subs,$context) = @_; - #_debug($context,"skipping terminated subscriber $bill_subs->{username}") if $bill_subs->{status} eq 'terminated'; - #return 0 if $bill_subs->{status} eq 'terminated'; - return 1; - } - - transform: !!perl/code | - { - my ($bill_subs,$context) = @_; - return $bill_subs; - } - -contracts.contact: 1 -contracts.voip_subscribers.primary_number: 1 -contracts.voip_subscribers.provisioning_voip_subscriber: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_dbaliases: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.allowed_ips: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.ncos: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping.destinations: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users: 1 -#contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users.voicemail_spool: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_preferences: 1 -contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_destinations: - transform: !!perl/code | - { - my ($fax_destinations,$context) = @_; - return [ map { $_->{destination} . ' (' . $_->{filetype} . ')'; } @$fax_destinations ]; - } \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl deleted file mode 100644 index 75118a4..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl +++ /dev/null @@ -1,321 +0,0 @@ -use strict; - -## no critic - -our $VERSION = "0.0"; - -use File::Basename; -use Cwd; -use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); - -use Getopt::Long qw(GetOptions); -use Fcntl qw(LOCK_EX LOCK_NB); - -use NGCP::BulkProcessor::Globals qw(); -use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( - update_settings - update_tabular_fields - update_graph_fields - $tabular_yml - $graph_yml - - update_load_recursive - get_export_filename - $customer_export_filename_format - $load_yml - - check_dry - $output_path - $defaultsettings - $defaultconfig - $dry - $skip_errors - $force - - $csv_header_line -); - -use NGCP::BulkProcessor::Logging qw( - init_log - getlogger - $attachmentlogfile - scriptinfo - cleanuplogfiles - $currentlogfile -); -use NGCP::BulkProcessor::LogError qw ( - completion - done - scriptwarn - scripterror - filewarn - fileerror -); -use NGCP::BulkProcessor::LoadConfig qw( - load_config - $SIMPLE_CONFIG_TYPE - $YAML_CONFIG_TYPE - $ANY_CONFIG_TYPE -); -use NGCP::BulkProcessor::Array qw(removeduplicates); -use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); -use NGCP::BulkProcessor::Mail qw( - cleanupmsgfiles -); - -use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); -use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); - -use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(destroy_all_dbs get_csv_db get_sqlite_db); - -use NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular qw(); - -use NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers qw( - export_customers_graph - export_customers_tabular -); -#use NGCP::BulkProcessor::Projects::ETL::Customer::ImportCustomers qw( -# import_customers_json -#); - -scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; - -my @TASK_OPTS = (); - -my $tasks = []; - -my $cleanup_task_opt = 'cleanup'; -push(@TASK_OPTS,$cleanup_task_opt); - -my $cleanup_all_task_opt = 'cleanup_all'; -push(@TASK_OPTS,$cleanup_all_task_opt); - -my $export_customers_graph_task_opt = 'export_customers_graph'; -push(@TASK_OPTS,$export_customers_graph_task_opt); - -my $export_customers_tabular_task_opt = 'export_customers_tabular'; -push(@TASK_OPTS,$export_customers_tabular_task_opt); - -#my $import_customers_json_task_opt = 'import_customers_json'; -#push(@TASK_OPTS,$import_customers_json_task_opt); - -if (init()) { - main(); - exit(0); -} else { - exit(1); -} - -sub init { - - my $configfile = $defaultconfig; - my $settingsfile = $defaultsettings; - - return 0 unless GetOptions( - "config=s" => \$configfile, - "settings=s" => \$settingsfile, - "task=s" => $tasks, - "dry" => \$dry, - "skip-errors" => \$skip_errors, - "force" => \$force, - ); - - $tasks = removeduplicates($tasks,1); - - my $result = load_config($configfile); - init_log(); - $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); - $result &= load_config($tabular_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE); - $result &= load_config($graph_yml,\&update_graph_fields,$YAML_CONFIG_TYPE); - $result &= load_config($load_yml,\&update_load_recursive,$YAML_CONFIG_TYPE); - return $result; - -} - -sub main() { - - my @messages = (); - my @attachmentfiles = (); - my $result = 1; - my $completion = 0; - - if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { - scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; - foreach my $task (@$tasks) { - - if (lc($cleanup_task_opt) eq lc($task)) { - $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); - } elsif (lc($cleanup_all_task_opt) eq lc($task)) { - $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); - - } elsif (lc($export_customers_graph_task_opt) eq lc($task)) { - $result &= export_customers_graph_task(\@messages) if taskinfo($export_customers_graph_task_opt,$result); - $completion |= 1; - } elsif (lc($export_customers_tabular_task_opt) eq lc($task)) { - $result &= export_customers_tabular_task(\@messages) if taskinfo($export_customers_tabular_task_opt,$result); - $completion |= 1; - #} elsif (lc($import_customers_json_task_opt) eq lc($task)) { - # if (taskinfo($import_customers_json_task_opt,$result,1)) { - # next unless check_dry(); - # $result &= import_customers_json_task(\@messages); - # $completion |= 1; - # } - - } else { - $result = 0; - scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); - last; - } - } - } else { - $result = 0; - scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); - } - - push(@attachmentfiles,$attachmentlogfile); - if ($completion) { - completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); - } else { - done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); - } - - return $result; -} - -sub taskinfo { - my ($task,$result) = @_; - scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); - return $result; -} - -sub cleanup_task { - my ($messages,$clean_generated) = @_; - my $result = 0; - if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { - eval { - cleanupcvsdirs(); - cleanupdbfiles(); - cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); - cleanupmsgfiles(\&fileerror,\&filewarn); - cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; - $result = 1; - }; - } - if ($@ or !$result) { - push(@$messages,'working directory cleanup INCOMPLETE'); - return 0; - } else { - push(@$messages,'working directory folders cleaned up'); - return 1; - } -} - -sub export_customers_graph_task { - - my ($messages) = @_; - my ($result,$warning_count) = (0,0); - eval { - ($result,$warning_count) = export_customers_graph(); - }; - my $err = $@; - my $stats = ": $warning_count warnings"; - eval { - #$stats .= "\n total mta subscriber records: " . - # NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows'; - #my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( - # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta - #); - #$stats .= "\n new: $added_count rows"; - #my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( - # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta - #); - #$stats .= "\n existing: $existing_count rows"; - #my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( - # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta - #); - #$stats .= "\n removed: $deleted_count rows"; - }; - if ($err or !$result) { - push(@$messages,"exporting customers (graph) INCOMPLETE$stats"); - } else { - push(@$messages,"exporting customers (graph) completed$stats"); - } - destroy_all_dbs(); - return $result; - -} - -sub export_customers_tabular_task { - - my ($messages) = @_; - my ($result,$warning_count) = (0,0); - eval { - ($result,$warning_count) = export_customers_tabular(); - }; - my $err = $@; - my $stats = ": $warning_count warnings"; - eval { - $stats .= "\n total subscriber records: " . - NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta() . ' rows'; - my $added_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta( - $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::added_delta - ); - $stats .= "\n new: $added_count rows"; - my $existing_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta( - $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::updated_delta - ); - $stats .= "\n existing: $existing_count rows"; - my $deleted_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta( - $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::deleted_delta - ); - $stats .= "\n removed: $deleted_count rows"; - my ($export_filename,$export_format) = get_export_filename($customer_export_filename_format); - if ('sqlite' eq $export_format) { - &get_sqlite_db()->copydbfile($export_filename); - } elsif ('csv' eq $export_format) { - NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::copy_table(\&get_csv_db); - &get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::gettablename(),$export_filename,not $csv_header_line); - } else { - push(@$messages,'invalid extension for output filename $export_filename'); - } - }; - if ($err or !$result) { - push(@$messages,"exporting customers (tabular) INCOMPLETE$stats"); - } else { - push(@$messages,"exporting customers (tabular) completed$stats"); - } - destroy_all_dbs(); - return $result; - -} - -#sub import_customers_json_task { -# -# my ($messages) = @_; -# my ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = (0,0,0,0,0,0,0,0); -# eval { -# ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = import_customers_json(); -# }; -# my $err = $@; -# my $stats = ": $warning_count warnings"; -# eval { -# $stats .= "\n contracts read: " . $contract_read_count; -# $stats .= "\n contracts created: " . $contract_created_count; -# $stats .= "\n contracts failed: " . $contract_failed_count; -# $stats .= "\n subscribers read: " . $subscriber_read_count; -# $stats .= "\n subscribers created: " . $subscriber_created_count; -# $stats .= "\n subscribers failed: " . $subscriber_failed_count; -# }; -# if ($err or !$result) { -# push(@$messages,"importing customers (json) INCOMPLETE$stats"); -# } else { -# push(@$messages,"importing customers (json) completed$stats"); -# } -# destroy_all_dbs(); -# return $result; -# -#} - -__DATA__ -This exists to allow the locking code at the beginning of the file to work. -DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg deleted file mode 100644 index 382c44b..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg +++ /dev/null @@ -1,74 +0,0 @@ -#dry=0 -#skip_errors=0 - -schema_version = Trunk - -export_customers_multithreading = 1 -export_customers_numofthreads = 4 -export_customers_blocksize = 1000 - -customer_export_filename=customer_%s.csv - -load_yml = load.yml -tabular_yml = tabular.yml -graph_yml = graph.yml -graph_fields_mode = whitelist - -csv_all_expected_fields = 0 - -#csv_eol = -csv_sep_char = , -csv_quote_char = " -csv_escape_char = " - -csv_header_line = 0; - - - - - - - - - - -sqlite_db_file = sqlite -csv_dir = customer -tabular_single_row_txn = 1 -ignore_tabular_unique = 0 - -#customer_import_filename=customer_20210216173615.json -#split_customers = 1 -#customer_import_multithreading = 1 -#customer_import_numofthreads = 4 -#customer_reseller_name = default -#customer_billing_profile_name = Default Billing Profile -#customer_domain = test1610072315.example.org -#customer_contact_email_format = DN0%2$s%3$s@example.org -#customer_timezone = Europe/Vienna -#subscriber_profile_set_name = subscriber_profile_1_set_65261 -#subscriber_profile_name = subscriber_profile_1_65261 -## sip username as webusername: -##webusername_format = %1$s -## webusername = cc+ac+sn: -##webusername_format = %2$s%3$s%4$s -## webusername = 0+ac+sn: -#webusername_format = 0%3$s%4$s -## sip username as external_id: -##subscriber_externalid_format = %1$s -## external_id = cc+ac+sn: -##subscriber_externalid_format = %2$s%3$s%4$s -## external_id = 0+ac+sn: -#subscriber_externalid_format = 0%3$s%4$s -## subscriber contact will be created, only if one of below is set. -#subscriber_contact_email_format = DN0%2$s%3$s@domain.org -#subscriber_timezone = Europe/Vienna - -#cf_default_priority: 1 -#cf_default_timeout: 300 -#cft_default_ringtimeout: 20 - -##write sql files for legacy db to set/unset the is_external pref of migrated subscribers: -# -#rollback_sql_export_filename_format = delete_subscribers_%s.sql -#rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular.yml deleted file mode 100644 index 7ea0796..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular.yml +++ /dev/null @@ -1,70 +0,0 @@ -# tabular.yml: define which *subscriber* columns to add tabular (.db/.csv) exports. - -- path: contract.id - transform: !!perl/code | - { - my ($id,$bill_subs) = @_; - return $id; - } - -- path: primary_number.cc -- path: primary_number.ac -- path: primary_number.sn -- path: provisioning_voip_subscriber.voicemail_users[0].attach -- path: provisioning_voip_subscriber.voicemail_users[0].delete -- path: provisioning_voip_subscriber.voicemail_users[0].email -- path: provisioning_voip_subscriber.voicemail_users[0].password -- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_clis - sep: ',' - field: 'value' -- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_ips_grp[0].allowed_ips - sep: ',' - field: 'ipnet' -- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_list - sep: ',' - field: 'value' -- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_mode[0].value -- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_list - sep: ',' - field: 'value' -- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_mode[0].value -- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_list - sep: ',' - field: 'value' -- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_mode[0].value -- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_list - sep: ',' - field: 'value' -- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_mode[0].value -- path: provisioning_voip_subscriber.voip_usr_preferences.ncos_id[0].ncos.level -- path: provisioning_voip_subscriber.voip_usr_preferences.adm_ncos_id[0].ncos.level -- path: provisioning_voip_subscriber.voip_usr_preferences.cfb[0].cf_mapping.destinations - sep: ',' - field: 'destination' -- path: provisioning_voip_subscriber.voip_usr_preferences.cfna[0].cf_mapping.destinations - sep: ',' - field: 'destination' -- path: provisioning_voip_subscriber.voip_usr_preferences.cfo[0].cf_mapping.destinations - sep: ',' - field: 'destination' -- path: provisioning_voip_subscriber.voip_usr_preferences.cfr[0].cf_mapping.destinations - sep: ',' - field: 'destination' -- path: provisioning_voip_subscriber.voip_usr_preferences.cfs[0].cf_mapping.destinations - sep: ',' - field: 'destination' -- path: provisioning_voip_subscriber.voip_usr_preferences.cft[0].cf_mapping.destinations - sep: ',' - field: 'destination' -- path: provisioning_voip_subscriber.voip_usr_preferences.cfu[0].cf_mapping.destinations - sep: ',' - field: 'destination' -- path: provisioning_voip_subscriber.voip_fax_preferences.active -- path: provisioning_voip_subscriber.voip_fax_preferences.ecm -- path: provisioning_voip_subscriber.voip_fax_preferences.name -- path: provisioning_voip_subscriber.voip_fax_preferences.t38 -- path: provisioning_voip_subscriber.voip_fax_destinations - sep: ',' -- path: provisioning_voip_subscriber.voip_usr_preferences.force_inbound_calls_to_peer[0].value -- path: provisioning_voip_subscriber.voip_usr_preferences.lnp_for_local_sub[0].value - diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm deleted file mode 100644 index 3af0c1c..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm +++ /dev/null @@ -1,326 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp; -use strict; - -## no critic - -use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw( - get_sqlite_db - destroy_all_dbs -); - -use NGCP::BulkProcessor::SqlProcessor qw( - registertableinfo - create_targettable - checktableinfo - copy_row - - insert_stmt - process_table -); - -use NGCP::BulkProcessor::SqlRecord qw(); - -require Exporter; -our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); -our @EXPORT_OK = qw( - create_table - gettablename - check_table - getinsertstatement - getupsertstatement - - @fieldnames - has_rows - - update_delta - countby_delta - - $deleted_delta - $updated_delta - $added_delta - - find_carriers_by_delta - - process_records -); - -my $tablename = 'lnp'; -my $get_db = \&get_sqlite_db; - -our @fieldnames = ( - 'carrier_name', - 'carrier_prefix', - 'number', - 'routing_number', - 'start', - 'end', - 'authoritative', - 'skip_rewrite', - 'type', - #calculated fields at the end! - #'rownum', - #'filenum', - #'filename', -); - -my $expected_fieldnames = [ - @fieldnames, - 'delta', -]; - -# table creation: -my $primarykey_fieldnames = [ 'number' ]; -my $indexes = { - #$tablename . '_number' => [ 'number(32)' ], - #$tablename . '_rownum' => [ 'rownum(11)' ], - $tablename . '_delta' => [ 'delta(7)' ], - $tablename . '_carrier_delta' => [ 'carrier_name(255)', 'carrier_prefix(32)', 'authoritative(1)', 'skip_rewrite(1)', 'delta(7)' ], -}; -#my $fixtable_statements = []; - -our $deleted_delta = 'DELETED'; -our $updated_delta = 'UPDATED'; -our $added_delta = 'ADDED'; - -sub new { - - my $class = shift; - my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, - $tablename,$expected_fieldnames,$indexes); - - copy_row($self,shift,$expected_fieldnames); - - return $self; - -} - -sub create_table { - - my ($truncate) = @_; - - my $db = &$get_db(); - - registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); - return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); - -} - -sub find_carriers_by_delta { - - my ($deltas,$load_recursive) = @_; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - my $stmt = ''; - my @params = (); - if (defined $deltas and 'HASH' eq ref $deltas) { - foreach my $in (keys %$deltas) { - my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); - $stmt .= ' AND ' if length($stmt); - $stmt .= $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; - push(@params,@values); - } - } elsif (defined $deltas and length($deltas) > 0) { - $stmt = $db->columnidentifier('delta') . ' = ?'; - push(@params,$deltas); - } - $stmt = ' WHERE ' . $stmt if length($stmt); - - $stmt = 'SELECT * FROM ' . $table . $stmt . ' GROUP BY ' - . $db->columnidentifier('carrier_name') - . ', ' . $db->columnidentifier('carrier_prefix') - . ', ' . $db->columnidentifier('authoritative') - . ', ' . $db->columnidentifier('skip_rewrite'); - - my $rows = $db->db_get_all_arrayref($stmt,@params); - - return buildrecords_fromrows($rows,$load_recursive); - -} - -sub update_delta { - - my ($number,$delta) = @_; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; - my @params = (); - push(@params,$delta); - if (defined $number) { - $stmt .= ' WHERE ' . $db->columnidentifier('number') . ' = ?'; - push(@params,$number); - } - - return $db->db_do($stmt,@params); - -} - -sub countby_delta { - - my ($deltas) = @_; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - my $stmt = ''; - my @params = (); - if (defined $deltas and 'HASH' eq ref $deltas) { - foreach my $in (keys %$deltas) { - my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); - $stmt .= ' AND ' if length($stmt); - $stmt .= $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; - push(@params,@values); - } - } elsif (defined $deltas and length($deltas) > 0) { - $stmt = $db->columnidentifier('delta') . ' = ?'; - push(@params,$deltas); - } - $stmt = ' WHERE ' . $stmt if length($stmt); - - $stmt = 'SELECT COUNT(*) FROM ' . $table . $stmt; - - return $db->db_get_value($stmt,@params); - -} - -sub has_rows { - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - my $stmt = 'SELECT COUNT(1) FROM (SELECT 1 FROM ' . $table . ' LIMIT 1) AS q'; - - return $db->db_get_value($stmt); -} - -sub buildrecords_fromrows { - - my ($rows,$load_recursive) = @_; - - my @records = (); - my $record; - - if (defined $rows and ref $rows eq 'ARRAY') { - foreach my $row (@$rows) { - $record = __PACKAGE__->new($row); - - # transformations go here ... - - push @records,$record; - } - } - - return \@records; - -} - -sub process_records { - - my %params = @_; - my ($process_code, - $static_context, - $init_process_context_code, - $uninit_process_context_code, - $multithreading, - $numofthreads, - $deltas) = @params{qw/ - process_code - static_context - init_process_context_code - uninit_process_context_code - multithreading - numofthreads - deltas - /}; - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - - my @terms = (); - if (defined $deltas and 'HASH' eq ref $deltas) { - foreach my $in (keys %$deltas) { - my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); - push(@terms,$db->columnidentifier('delta') . ' ' . $in . ' ("' . join('","',@values) . '")'); - } - } elsif (defined $deltas and length($deltas) > 0) { - push(@terms,$db->columnidentifier('delta') . ' = "' . $deltas . '"'); - } - - return process_table( - get_db => $get_db, - class => __PACKAGE__, - process_code => sub { - my ($context,$rowblock,$row_offset) = @_; - return &$process_code($context,$rowblock,$row_offset); - }, - static_context => $static_context, - init_process_context_code => $init_process_context_code, - uninit_process_context_code => $uninit_process_context_code, - destroy_reader_dbs_code => \&destroy_all_dbs, - multithreading => $multithreading, - tableprocessing_threads => $numofthreads, - ((scalar @terms) ? ('select' => 'SELECT * FROM ' . $table . ' WHERE ' . join (' AND ',@terms)) : ()), - ((scalar @terms) ? ('selectcount' => 'SELECT COUNT(1) FROM ' . $table . ' WHERE ' . join (' AND ',@terms)) : ()), - ); -} - -sub carrier_hash { - my $self = shift; - return ($self->{carrier_name} // '') . '-' . ($self->{carrier_prefix} // '') - . '-' . ($self->{authoritative} // '') . '-' . ($self->{skip_rewrite} // ''); -} - -sub getinsertstatement { - - my ($insert_ignore) = @_; - check_table(); - return insert_stmt($get_db,__PACKAGE__,$insert_ignore); - -} - -sub getupsertstatement { - - check_table(); - my $db = &$get_db(); - my $table = $db->tableidentifier($tablename); - my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . - join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')'; - my @values = (); - foreach my $fieldname (@$expected_fieldnames) { - if ('delta' eq $fieldname) { - my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . - $db->columnidentifier('number') . ' = ?'; - push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); - } else { - push(@values,'?'); - } - } - $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; - return $upsert_stmt; - -} - -sub gettablename { - - return $tablename; - -} - -sub check_table { - - return checktableinfo($get_db, - __PACKAGE__,$tablename, - $expected_fieldnames, - $indexes); - -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm deleted file mode 100644 index f36394a..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm +++ /dev/null @@ -1,66 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile; -use strict; - -## no critic - -use Encode qw(decode); - -use NGCP::BulkProcessor::Logging qw( - getlogger -); -use NGCP::BulkProcessor::LogError qw( - fileprocessingerror - fileprocessingwarn -); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( - $expand_numbers_code -); - -use NGCP::BulkProcessor::FileProcessor; - -use NGCP::BulkProcessor::Array qw(contains); - -require Exporter; -our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor); -our @EXPORT_OK = qw(); - -my $lineseparator = '\\r\\n|\\r|\\n|\\s'; #\\n\\r -my $default_encoding = 'UTF-8'; - -my $buffersize = 1000 * 1024; -my $threadqueuelength = 10; -my $default_numofthreads = 3; -#my $multithreading = 0; -my $blocksize = 100; - -sub new { - - my $class = shift; - - my $self = NGCP::BulkProcessor::FileProcessor->new(@_); - - $self->{numofthreads} = shift // $default_numofthreads; - $self->{line_separator} = $lineseparator; - $self->{field_separator} = undef; - $self->{encoding} = shift // $default_encoding; - $self->{buffersize} = $buffersize; - $self->{threadqueuelength} = $threadqueuelength; - #$self->{multithreading} = $multithreading; - $self->{blocksize} = $blocksize; - - bless($self,$class); - - #restdebug($self,__PACKAGE__ . ' file processor created',getlogger(__PACKAGE__)); - - return $self; - -} - -sub extractfields { - my ($context,$line_ref) = @_; - return $expand_numbers_code->($context,$$line_ref); - -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm deleted file mode 100644 index 5d9fb8f..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm +++ /dev/null @@ -1,190 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Lnp::Import; -use strict; - -## no critic - -use threads::shared qw(); - -#use Encode qw(); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( - $import_multithreading - - $lnp_filename - $lnp_rownum_start - $lnp_import_numofthreads - $ignore_lnp_unique - $lnp_import_single_row_txn - - $expand_numbers_code - - $skip_errors - -); -use NGCP::BulkProcessor::Logging qw ( - getlogger - processing_info - processing_debug -); -use NGCP::BulkProcessor::LogError qw( - fileprocessingwarn - fileprocessingerror -); - -use NGCP::BulkProcessor::FileProcessors::CSVFileSimple qw(); -use NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile qw(); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw( - get_sqlite_db - destroy_all_dbs -); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw(); - -use NGCP::BulkProcessor::Utils qw(threadid trim); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - load_file -); - -sub load_file { - - my $result = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::create_table(0); - - my $importer; - if (defined $expand_numbers_code) { - $importer = NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile->new($lnp_import_numofthreads); - } else { - $importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($lnp_import_numofthreads); - } - - my $upsert = _lnp_reset_delta(); - - destroy_all_dbs(); #close all db connections before forking.. - my $warning_count :shared = 0; - return ($result && $importer->process( - file => $lnp_filename, - process_code => sub { - my ($context,$rows,$row_offset) = @_; - my $rownum = $row_offset; - $context->{lnp_rows} = []; - foreach my $row (@$rows) { - $rownum++; - next if (defined $lnp_rownum_start and $rownum < $lnp_rownum_start); - next if (scalar @$row) == 0; - #$row = [ map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row ]; - my $record = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new([ - map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row - ]); - #$record->{number} = $record->{cc} . $record->{ac} . $record->{sn}; - - my %r = %$record; my @row_ext = @r{@NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::fieldnames}; - if ($context->{upsert}) { - push(@row_ext,$record->{number}); - } else { - push(@row_ext,$NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta); - } - push(@{$context->{lnp_rows}},\@row_ext); - if ($lnp_import_single_row_txn and (scalar @{$context->{lnp_rows}}) > 0) { - while (defined (my $lnp_row = shift @{$context->{lnp_rows}})) { - if ($skip_errors) { - eval { _insert_lnp_rows($context,[$lnp_row]); }; - _warn($context,$@) if $@; - } else { - _insert_lnp_rows($context,[$lnp_row]); - } - } - } - } - - if (not $lnp_import_single_row_txn and (scalar @{$context->{lnp_rows}}) > 0) { - if ($skip_errors) { - eval { _insert_lnp_rows($context,$context->{lnp_rows}); }; - _warn($context,$@) if $@; - } else { - _insert_lnp_rows($context,$context->{lnp_rows}); - } - } - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{db} = &get_sqlite_db(); - $context->{upsert} = $upsert; - $context->{error_count} = 0; - $context->{warning_count} = 0; - }, - uninit_process_context_code => sub { - my ($context)= @_; - undef $context->{db}; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - multithreading => $import_multithreading, - ),$warning_count); - -} - -sub _insert_lnp_rows { - my ($context,$lnp_rows) = @_; - $context->{db}->db_do_begin(($context->{upsert} ? - NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::getupsertstatement() - : NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::getinsertstatement($ignore_lnp_unique)), - ); - eval { - $context->{db}->db_do_rowblock($lnp_rows); - $context->{db}->db_finish(); - }; - my $err = $@; - if ($err) { - eval { - $context->{db}->db_finish(1); - }; - die($err); - } -} - -sub _lnp_reset_delta { - my $upsert = 0; - if (NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::has_rows()) { - processing_info(threadid(),'resetting delta of ' . - NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::update_delta(undef, - $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta) . - ' lnp records',getlogger(__PACKAGE__)); - $upsert |= 1; - } - return $upsert; -} - -sub _error { - - my ($context,$message) = @_; - $context->{error_count} = $context->{error_count} + 1; - fileprocessingerror($context->{filename},$message,getlogger(__PACKAGE__)); - -} - -sub _warn { - - my ($context,$message) = @_; - $context->{warning_count} = $context->{warning_count} + 1; - fileprocessingwarn($context->{filename},$message,getlogger(__PACKAGE__)); - -} - -sub _info { - - my ($context,$message,$debug) = @_; - if ($debug) { - processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); - } else { - processing_info($context->{tid},$message,getlogger(__PACKAGE__)); - } -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm deleted file mode 100644 index 1b1dab1..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm +++ /dev/null @@ -1,322 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Lnp::ProcessLnp; -use strict; - -## no critic - -use threads::shared qw(); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( - - $skip_errors - - $create_lnp_multithreading - $create_lnp_numofthreads - - $delete_lnp_multithreading - $delete_lnp_numofthreads - - $ignore_lnp_numbers_unique - $lnp_numbers_single_row_txn - - $lnp_numbers_batch_delete -); - -use NGCP::BulkProcessor::Logging qw ( - getlogger - processing_info - processing_debug -); -use NGCP::BulkProcessor::LogError qw( - rowprocessingerror - rowprocessingwarn -); - -use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); -use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw(); - -use NGCP::BulkProcessor::ConnectorPool qw( - get_xa_db -); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw( - get_sqlite_db - destroy_all_dbs - ping_all_dbs -); - -use NGCP::BulkProcessor::Utils qw(threadid); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - create_lnp_numbers - delete_lnp_numbers -); - -sub create_lnp_numbers { - - my $static_context = {}; - my $result = _create_lnp_numbers_checks($static_context); - - destroy_all_dbs(); - my $warning_count :shared = 0; - my $result = $result && NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::process_records( - static_context => $static_context, - deltas => $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta, - process_code => sub { - my ($context,$records,$row_offset) = @_; - ping_all_dbs(); - foreach my $row (@$records) { - my $lnp = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new($row); - my $lnp_number = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers->new({ %$lnp }); - my $lnp_provider = $context->{carrier_map}->{ - $lnp->carrier_hash() - }; - $lnp_number->{lnp_provider_id} = $lnp_provider->{id}; - - my %r = %$lnp_number; my @row_ext = @r{@NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::fieldnames}; - - push(@{$context->{lnp_numbers}},\@row_ext); - if ($lnp_numbers_single_row_txn and (scalar @{$context->{lnp_numbers}}) > 0) { - while (defined (my $lnp_number = shift @{$context->{lnp_numbers}})) { - if ($skip_errors) { - eval { _insert_lnp_numbers($context,[$lnp_number]); }; - _warn($context,$@) if $@; - } else { - _insert_lnp_numbers($context,[$lnp_number]); - } - } - } - } - - if (not $lnp_numbers_single_row_txn and (scalar @{$context->{lnp_numbers}}) > 0) { - if ($skip_errors) { - eval { _insert_lnp_numbers($context,$context->{lnp_numbers}); }; - _warn($context,$@) if $@; - } else { - _insert_lnp_numbers($context,$context->{lnp_numbers}); - } - } - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{db} = &get_xa_db(); - $context->{error_count} = 0; - $context->{warning_count} = 0; - $context->{lnp_numbers} = []; - }, - uninit_process_context_code => sub { - my ($context)= @_; - undef $context->{db}; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - multithreading => $create_lnp_multithreading, - numofthreads => $create_lnp_numofthreads, - ); - - return ($result,$warning_count); - -} - - -sub _create_lnp_numbers_checks { - - my $context = shift; - my $result = 1; - - $context->{carrier_map} = {}; - my $carriers = []; - eval { - $carriers = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::find_carriers_by_delta($NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta); - }; - if ($@) { - $result = 0; #even in skip-error mode.. - } else { - foreach my $carrier (@$carriers) { - my $lp = { - name => $carrier->{carrier_name}, - prefix => ($carrier->{carrier_prefix} // ''), - authoritative => ($carrier->{authoritative} // 0), - skip_rewrite => ($carrier->{skip_rewrite} // 0), - }; - my $lnp_provider = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_nameprefixauthoritativeskiprewrite( - $lp->{name}, - $lp->{prefix}, - $lp->{authoritative}, - $lp->{skip_rewrite}, - )->[0]; - if ($lnp_provider) { - processing_info(threadid(),"lnp provider '$lnp_provider->{name}' found",getlogger(__PACKAGE__)); - } else { - $lnp_provider = { %$lp }; - $lnp_provider->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::insert_row(undef,$lp); - processing_info(threadid(),"lnp provider '$lnp_provider->{name}' created",getlogger(__PACKAGE__)); - } - $context->{carrier_map}->{ - $carrier->carrier_hash() - } = $lnp_provider; - } - } - - return $result; -} - - -sub _insert_lnp_numbers { - my ($context,$lnp_numbers) = @_; - $context->{db}->db_do_begin( - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::getinsertstatement($ignore_lnp_numbers_unique), - ); - eval { - $context->{db}->db_do_rowblock($lnp_numbers); - $context->{db}->db_finish(); - }; - my $err = $@; - if ($err) { - eval { - $context->{db}->db_finish(1); - }; - die($err); - } -} - - -sub delete_lnp_numbers { - - my $static_context = {}; - my $result = 1; - - destroy_all_dbs(); - my $warning_count :shared = 0; - my $result = $result && NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::process_records( - static_context => $static_context, - deltas => $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta, - process_code => sub { - my ($context,$records,$row_offset) = @_; - ping_all_dbs(); - foreach my $row (@$records) { - my $lnp = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new($row); - push(@{$context->{numbers}},$lnp->{number}); - if (not $lnp_numbers_batch_delete and (scalar @{$context->{numbers}}) > 0) { - while (defined (my $number = shift @{$context->{numbers}})) { - if ($skip_errors) { - eval { - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},$number); - }; - } else { - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},$number); - } - } - } - } - - if ($lnp_numbers_batch_delete and (scalar @{$context->{numbers}}) > 0) { - if ($skip_errors) { - eval { - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},{ - 'IN' => $context->{numbers}, - }); - }; - } else { - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},{ - 'IN' => $context->{numbers}, - }); - } - } - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{db} = &get_xa_db(); - $context->{error_count} = 0; - $context->{warning_count} = 0; - $context->{numbers} = []; - }, - uninit_process_context_code => sub { - my ($context)= @_; - undef $context->{db}; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - multithreading => $create_lnp_multithreading, - numofthreads => $create_lnp_numofthreads, - ) && _delete_lnp_providers($static_context); - - return ($result,$warning_count); - -} - - -sub _delete_lnp_providers { - - my $context = shift; - my $result = 1; - - my $carriers = []; - eval { - $carriers = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::find_carriers_by_delta($NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta); - }; - if ($@) { - $result = 0; #even in skip-error mode.. - } else { - foreach my $carrier (@$carriers) { - my $lp = { - name => $carrier->{carrier_name}, - prefix => ($carrier->{carrier_prefix} // ''), - authoritative => ($carrier->{authoritative} // 0), - skip_rewrite => ($carrier->{skip_rewrite} // 0), - }; - foreach my $lnp_provider (@{NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_nameprefixauthoritativeskiprewrite( - $lp->{name}, - $lp->{prefix}, - $lp->{authoritative}, - $lp->{skip_rewrite}, - )}) { - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::delete_row(undef,$lnp_provider); - processing_info(threadid(),"lnp provider '$lnp_provider->{name}' removed",getlogger(__PACKAGE__)); - } - } - } - - return $result; -} - - -sub _error { - - my ($context,$message) = @_; - $context->{error_count} = $context->{error_count} + 1; - rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - -} - -sub _warn { - - my ($context,$message) = @_; - $context->{warning_count} = $context->{warning_count} + 1; - rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - -} - -sub _info { - - my ($context,$message,$debug) = @_; - if ($debug) { - processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - } else { - processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); - } - -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm deleted file mode 100644 index 318c8b1..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm +++ /dev/null @@ -1,92 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool; -use strict; - -## no critic - -use File::Basename; -use Cwd; -use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( - $sqlite_db_file -); - -use NGCP::BulkProcessor::ConnectorPool qw( - get_connectorinstancename - ping -); - -use NGCP::BulkProcessor::SqlConnectors::MySQLDB; -use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); -#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi; - -use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - - get_sqlite_db - sqlite_db_tableidentifier - - destroy_dbs - destroy_all_dbs - - ping_dbs - ping_all_dbs -); - -my $sqlite_dbs = {}; - -sub get_sqlite_db { - - my ($instance_name,$reconnect) = @_; - my $name = get_connectorinstancename($instance_name); #threadid(); #shift; - - if (not defined $sqlite_dbs->{$name}) { - $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name); - if (not defined $reconnect) { - $reconnect = 1; - } - } - if ($reconnect) { - $sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file); - } - - return $sqlite_dbs->{$name}; - -} - -sub sqlite_db_tableidentifier { - - my ($get_target_db,$tablename) = @_; - my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; - return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file)); - -} - -sub ping_dbs { - -} - -sub ping_all_dbs { - ping_dbs(); - NGCP::BulkProcessor::ConnectorPool::ping_dbs(); -} - -sub destroy_dbs { - - foreach my $name (keys %$sqlite_dbs) { - cleartableinfo($sqlite_dbs->{$name}); - undef $sqlite_dbs->{$name}; - delete $sqlite_dbs->{$name}; - } - -} - -sub destroy_all_dbs() { - destroy_dbs(); - NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm deleted file mode 100644 index 610755e..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm +++ /dev/null @@ -1,218 +0,0 @@ -package NGCP::BulkProcessor::Projects::ETL::Lnp::Settings; -use strict; - -## no critic - -use threads::shared qw(); - -use File::Basename qw(fileparse); - -use NGCP::BulkProcessor::Globals qw( - $working_path - $enablemultithreading - $cpucount - create_path -); - -use NGCP::BulkProcessor::Logging qw( - getlogger - scriptinfo - configurationinfo -); - -use NGCP::BulkProcessor::LogError qw( - fileerror - filewarn - configurationwarn - configurationerror -); - -use NGCP::BulkProcessor::LoadConfig qw( - split_tuple - parse_regexp -); -use NGCP::BulkProcessor::Utils qw(prompt); - -use NGCP::BulkProcessor::Array qw(contains); - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT_OK = qw( - update_settings - - $sqlite_db_file - - check_dry - - $input_path - - $defaultsettings - $defaultconfig - - $dry - $skip_errors - $force - - $import_multithreading - - $lnp_filename - $lnp_rownum_start - $lnp_import_numofthreads - $ignore_lnp_unique - $lnp_import_single_row_txn - - $expand_numbers_code - - $create_lnp_multithreading - $create_lnp_numofthreads - - $delete_lnp_multithreading - $delete_lnp_numofthreads - - $ignore_lnp_numbers_unique - $lnp_numbers_single_row_txn - - $lnp_numbers_batch_delete -); - -our $defaultconfig = 'config.cfg'; -our $defaultsettings = 'settings.yml'; - -our $input_path = $working_path . 'input/'; - -our $force = 0; -our $dry = 0; -our $skip_errors = 0; - -our $sqlite_db_file = 'sqlite'; - -our $import_multithreading = 1; - -our $lnp_filename = undef; -our $lnp_rownum_start = 2; -our $lnp_import_numofthreads = $cpucount; -our $ignore_lnp_unique = 0; -our $lnp_import_single_row_txn = 0; -our $expand_numbers_code = undef; - -our $create_lnp_multithreading = 1; -our $create_lnp_numofthreads = $cpucount; - -our $delete_lnp_multithreading = 1; -our $delete_lnp_numofthreads = $cpucount; - -our $ignore_lnp_numbers_unique = 0; -our $lnp_numbers_single_row_txn = 0; - -our $lnp_numbers_batch_delete = 1; - -sub update_settings { - - my ($data,$configfile) = @_; - - if (defined $data) { - - my $result = 1; - my $regexp_result; - - #&$configurationinfocode("testinfomessage",$configlogger); - - $result &= _prepare_working_paths(1); - - $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; - - $lnp_filename = _get_import_filename($lnp_filename,$data,'lnp_filename'); - unless ($lnp_filename and -e $lnp_filename) { - configurationerror($configfile,"invalid lnp filename",getlogger(__PACKAGE__)); - } - $lnp_rownum_start = $data->{lnp_rownum_start} if exists $data->{lnp_rownum_start}; - $lnp_import_single_row_txn = $data->{lnp_import_single_row_txn} if exists $data->{lnp_import_single_row_txn}; - $ignore_lnp_unique = $data->{ignore_lnp_unique} if exists $data->{ignore_lnp_unique}; - - $import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading}; - $lnp_import_numofthreads = _get_numofthreads($lnp_import_numofthreads,$data,'lnp_import_numofthreads'); - - $dry = $data->{dry} if exists $data->{dry}; - $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; - - $expand_numbers_code = $data->{expand_numbers} if exists $data->{expand_numbers}; - if (defined $expand_numbers_code and 'CODE' ne ref $expand_numbers_code) { - configurationerror($configfile,"expand_numbers coderef required",getlogger(__PACKAGE__)); - } - - $create_lnp_multithreading = $data->{create_lnp_multithreading} if exists $data->{create_lnp_multithreading}; - $create_lnp_numofthreads = _get_numofthreads($create_lnp_numofthreads,$data,'create_lnp_numofthreads'); - - $delete_lnp_multithreading = $data->{delete_lnp_multithreading} if exists $data->{delete_lnp_multithreading}; - $delete_lnp_numofthreads = _get_numofthreads($delete_lnp_numofthreads,$data,'delete_lnp_numofthreads'); - - $ignore_lnp_numbers_unique = $data->{ignore_lnp_numbers_unique} if exists $data->{ignore_lnp_numbers_unique}; - $lnp_numbers_single_row_txn = $data->{lnp_numbers_single_row_txn} if exists $data->{lnp_numbers_single_row_txn}; - - $lnp_numbers_batch_delete = $data->{lnp_numbers_batch_delete} if exists $data->{lnp_numbers_batch_delete}; - - return $result; - - } - return 0; - -} - -sub _prepare_working_paths { - - my ($create) = @_; - my $result = 1; - my $path_result; - - ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); - $result &= $path_result; - - return $result; - -} - -sub _get_numofthreads { - my ($default_value,$data,$key) = @_; - my $numofthreads = $default_value; - $numofthreads = $data->{$key} if exists $data->{$key}; - $numofthreads = $cpucount if $numofthreads > $cpucount; - return $numofthreads; -} - -sub _get_sqlite_db_file { - my ($run,$name) = @_; - return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name; -} - -sub _get_import_filename { - my ($old_value,$data,$key) = @_; - my $import_filename = $old_value; - $import_filename = $data->{$key} if exists $data->{$key}; - if (defined $import_filename and length($import_filename) > 0) { - $import_filename = $input_path . $import_filename unless -e $import_filename; - } - return $import_filename; -} - -sub check_dry { - - if ($dry) { - scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); - return 1; - } else { - scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); - if (!$force) { - if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { - return 1; - } else { - return 0; - } - } else { - scriptinfo('force option applied',getlogger(__PACKAGE__)); - return 1; - } - } - -} - -1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg deleted file mode 100644 index 7e74fbb..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg +++ /dev/null @@ -1,62 +0,0 @@ -##general settings: -working_path = /home/rkrenn/temp/lnp -cpucount = 4 -enablemultithreading = 1 - -##gearman/service listener config: -jobservers = 127.0.0.1:4730 - -##NGCP MySQL connectivity - "accounting" db: -accounting_host = 192.168.0.96 -accounting_port = 3306 -accounting_databasename = accounting -accounting_username = root -accounting_password = - -##NGCP MySQL connectivity - "billing" db: -billing_host = 192.168.0.96 -billing_port = 3306 -billing_databasename = billing -billing_username = root -billing_password = - -##NGCP MySQL connectivity - "provisioning" db: -provisioning_host = 192.168.0.96 -provisioning_port = 3306 -provisioning_databasename = provisioning -provisioning_username = root -provisioning_password = - -##NGCP MySQL connectivity - "kamailio" db: -kamailio_host = 192.168.0.96 -kamailio_port = 3306 -kamailio_databasename = kamailio -kamailio_username = root -kamailio_password = - -##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: -xa_host = 192.168.0.96 -xa_port = 3306 -xa_databasename = ngcp -xa_username = root -xa_password = - -##NGCP REST-API connectivity: -ngcprestapi_uri = https://127.0.0.1:1443 -ngcprestapi_username = administrator -ngcprestapi_password = administrator -ngcprestapi_realm = api_admin_http - -##sending email: -emailenable = 0 -erroremailrecipient = -warnemailrecipient = -completionemailrecipient = rkrenn@sipwise.com -doneemailrecipient = - -##logging: -fileloglevel = INFO -#DEBUG -screenloglevel = INFO -#INFO -emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl deleted file mode 100644 index d31d8e6..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl +++ /dev/null @@ -1,291 +0,0 @@ -use strict; - -## no critic - -use File::Basename; -use Cwd; -use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); - -use Getopt::Long qw(GetOptions); -use Fcntl qw(LOCK_EX LOCK_NB); - -use NGCP::BulkProcessor::Globals qw(); -use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( - update_settings - - check_dry - - $defaultsettings - $defaultconfig - $dry - $skip_errors - $force - -); - -use NGCP::BulkProcessor::Logging qw( - init_log - getlogger - $attachmentlogfile - scriptinfo - cleanuplogfiles - $currentlogfile -); -use NGCP::BulkProcessor::LogError qw ( - completion - done - scriptwarn - scripterror - filewarn - fileerror -); -use NGCP::BulkProcessor::LoadConfig qw( - load_config - $SIMPLE_CONFIG_TYPE - $YAML_CONFIG_TYPE - $ANY_CONFIG_TYPE -); -use NGCP::BulkProcessor::Array qw(removeduplicates); -use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); -use NGCP::BulkProcessor::Mail qw( - cleanupmsgfiles -); - -use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw(); -use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); -use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw(destroy_all_dbs); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::Import qw( - load_file -); - -use NGCP::BulkProcessor::Projects::ETL::Lnp::ProcessLnp qw( - create_lnp_numbers - delete_lnp_numbers -); - -scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet - -my @TASK_OPTS = (); - -my $tasks = []; - -my $cleanup_task_opt = 'cleanup'; -push(@TASK_OPTS,$cleanup_task_opt); - -my $load_file_task_opt = 'load_file'; -push(@TASK_OPTS,$load_file_task_opt); - -my $create_lnp_task_opt = 'create_lnp'; -push(@TASK_OPTS,$create_lnp_task_opt); - -my $delete_lnp_task_opt = 'delete_lnp'; -push(@TASK_OPTS,$delete_lnp_task_opt); - -if (init()) { - main(); - exit(0); -} else { - exit(1); -} - -sub init { - - my $configfile = $defaultconfig; - my $settingsfile = $defaultsettings; - - return 0 unless GetOptions( - "config=s" => \$configfile, - "settings=s" => \$settingsfile, - "task=s" => $tasks, - "dry" => \$dry, - "skip-errors" => \$skip_errors, - "force" => \$force, - ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); - - $tasks = removeduplicates($tasks,1); - - my $result = load_config($configfile); - init_log(); - $result &= load_config($settingsfile,\&update_settings,$YAML_CONFIG_TYPE); - return $result; - -} - -sub main() { - - my @messages = (); - my @attachmentfiles = (); - my $result = 1; - my $completion = 0; - - if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { - scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; - foreach my $task (@$tasks) { - - if (lc($cleanup_task_opt) eq lc($task)) { - $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_task_opt,$result); - - } elsif (lc($load_file_task_opt) eq lc($task)) { - $result &= load_file_task(\@messages) if taskinfo($load_file_task_opt,$result); - - } elsif (lc($create_lnp_task_opt) eq lc($task)) { - if (taskinfo($create_lnp_task_opt,$result,1)) { - next unless check_dry(); - $result &= create_lnp_task(\@messages); - $completion |= 1; - } - - } elsif (lc($delete_lnp_task_opt) eq lc($task)) { - if (taskinfo($delete_lnp_task_opt,$result,1)) { - next unless check_dry(); - $result &= delete_lnp_task(\@messages); - $completion |= 1; - } - - } else { - $result = 0; - scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); - last; - } - } - } else { - $result = 0; - scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); - } - - push(@attachmentfiles,$attachmentlogfile); - if ($completion) { - completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); - } else { - done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); - } - - return $result; -} - -sub taskinfo { - my ($task,$result) = @_; - scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); - return $result; -} - -sub cleanup_task { - my ($messages,$clean_generated) = @_; - my $result = 0; - if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { - eval { - #cleanupcvsdirs() if $clean_generated; - cleanupdbfiles() if $clean_generated; - cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); - cleanupmsgfiles(\&fileerror,\&filewarn); - #cleanupcertfiles(); - #cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; - $result = 1; - }; - } - if ($@ or !$result) { - #print $@; - push(@$messages,'working directory cleanup INCOMPLETE'); - return 0; - } else { - push(@$messages,'working directory folders cleaned up'); - return 1; - } -} - -sub load_file_task { - - my ($messages) = @_; - my ($result,$warning_count) = (0,0); - eval { - ($result,$warning_count) = load_file(); - }; - #print $@; - my $err = $@; - my $stats = ": $warning_count warnings"; - eval { - $stats .= "\n total file LNP records: " . - NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta() . ' rows'; - my $added_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta( - $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta - ); - $stats .= "\n new: $added_count rows"; - my $existing_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta( - $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::updated_delta - ); - $stats .= "\n existing: $existing_count rows"; - my $deleted_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta( - $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta - ); - $stats .= "\n removed: $deleted_count rows"; - }; - if ($err or !$result) { - push(@$messages,"loading LNP file INCOMPLETE$stats"); - } else { - push(@$messages,"loading LNP file completed$stats"); - } - destroy_all_dbs(); #every task should leave with closed connections. - return $result; - -} - -sub create_lnp_task { - - my ($messages) = @_; - my ($result,$warning_count) = (0,0); - eval { - ($result,$warning_count) = create_lnp_numbers(); - }; - #print $@; - my $err = $@; - my $stats = ": $warning_count warnings"; - eval { - $stats .= "\n total mariadb LNP providers: " . - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::countby_name() . ' rows'; - $stats .= "\n total mariadb LNP numbers: " . - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows'; - }; - if ($err or !$result) { - push(@$messages,"creating LNP numbers INCOMPLETE$stats"); - } else { - push(@$messages,"creating LNP numbers completed$stats"); - } - destroy_all_dbs(); #every task should leave with closed connections. - return 1; #$result; - -} - -sub delete_lnp_task { - - my ($messages) = @_; - my ($result,$warning_count) = (0,0); - eval { - ($result,$warning_count) = delete_lnp_numbers(); - }; - #print $@; - my $err = $@; - my $stats = ": $warning_count warnings"; - eval { - $stats .= "\n total mariadb LNP providers: " . - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::countby_name() . ' rows'; - $stats .= "\n total mariadb LNP numbers: " . - NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows'; - }; - if ($err or !$result) { - push(@$messages,"deleting LNP numbers INCOMPLETE$stats"); - } else { - push(@$messages,"deleting LNP numbers completed$stats"); - } - destroy_all_dbs(); #every task should leave with closed connections. - return 1; #$result; - -} - -__DATA__ -This exists to allow the locking code at the beginning of the file to work. -DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml deleted file mode 100644 index 5cf97c0..0000000 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml +++ /dev/null @@ -1,65 +0,0 @@ - -#dry=0 -#skip_errors=0 - -import_multithreading: 1 -#lnp_import_numofthreads: - -sqlite_db_file: sqlite - -lnp_filename: ip_telekom.txt -#012_20190201233214.txt -#ip_telekom_delete.txt -#test.csv -lnp_rownum_start: 2 -lnp_import_single_row_txn: 0 -ignore_lnp_unique: 0 - -expand_numbers: !!perl/code | - { - my ($context,$row) = @_; - if ($row =~ /^Linha\d+=([^=]*)$/i) { - my ( - $DonorID, - $HolderID, - $TypeOfNumber, - $PABXMainTelephoneNumber, - $FirstTelephoneNumber, - $LastTelephoneNumber, - $PresentNRN, - $DateTimeFrom - ) = split(/,/,$1); - - my @lnp_numbers = (); - #020,012,0,212879000,212879000,212879999,D012001,2016-08-31 14:20:18 - unless ($PresentNRN =~ /^D012[0-9]{3,3}$/) { - foreach my $number ($FirstTelephoneNumber .. $LastTelephoneNumber) { - push(@lnp_numbers,[ - $PresentNRN, #'carrier_name', - $PresentNRN, #'carrier_prefix', - $number, #'number', - undef, #'routing_number', - $DateTimeFrom, #'start', - undef, #'end', - undef, #'authoritative', - undef, #'skip_rewrite', - undef, #'type', - ]); - } - } - return \@lnp_numbers; - } - - return []; - } - -create_lnp_multithreading: 1 -#create_lnp_numofthreads: 2 - -delete_lnp_multithreading: 1 -#delete_lnp_numofthreads: 2 - -ignore_lnp_numbers_unique: 0 -lnp_numbers_single_row_txn: 0 - -lnp_numbers_batch_delete: 1 \ No newline at end of file