From 7dbb015f27c8961dc1510697582e931553b51c52 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Wed, 6 Sep 2017 08:16:38 +0200 Subject: [PATCH] TT#21083 implement teletek importer #3 + allow to specify multiple subscriber files to import + allow to specify multiple allowed_cli files to import Change-Id: I80f83542f541a0d8857d477158dc043cfabb93ba --- .../Teletek/FileProcessors/CSVFile.pm | 10 + .../Projects/Migration/Teletek/Import.pm | 298 +++++++++--------- .../Projects/Migration/Teletek/Settings.pm | 28 +- .../Projects/Migration/Teletek/process.pl | 8 +- .../Projects/Migration/Teletek/settings.cfg | 5 +- 5 files changed, 188 insertions(+), 161 deletions(-) diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/FileProcessors/CSVFile.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/FileProcessors/CSVFile.pm index bda57db..9d38aa5 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/FileProcessors/CSVFile.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/FileProcessors/CSVFile.pm @@ -66,6 +66,16 @@ sub new { } +sub process { + + my $self = shift; + + $self->{line_number} = 1; + + return $self->SUPER::process(@_); + +} + sub extractlines { my ($context,$buffer_ref,$lines) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm index 5d78999..ee39146 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm @@ -51,11 +51,13 @@ our @EXPORT_OK = qw( sub import_subscriber { - my ($file) = @_; + my (@files) = @_; my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::create_table(0); - $result &= _import_subscriber_checks($file); + foreach my $file (@files) { + $result &= _import_subscriber_checks($file); + } my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($subscriber_import_numofthreads); @@ -63,108 +65,112 @@ sub import_subscriber { destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; - return ($result && $importer->process( - file => $file, - process_code => sub { - my ($context,$rows,$row_offset) = @_; - my $rownum = $row_offset; - my @subscriber_rows = (); - foreach my $row (@$rows) { - $rownum++; - next if (scalar @$row) == 0; - my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($row); - $record->{rownum} = $rownum; - my @subscriber_row; - my %r; - if ($subscriber_import_unfold_ranges and $record->{sn} =~ /\.+$/) { - #if ($record->{sn} == '2861..') { - #print "x"; - #} - my $pow = scalar (() = $record->{sn} =~ /\./g); - _warn($context,"number range $record->{sn} results in " . 10**$pow . ' numbers') if $pow > 2; - $record->{sn} =~ s/\.+$//g; - $record->{range} = 0; - %r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); - my $base_sn = $record->{sn}; - %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; - if ($context->{upsert}) { - push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); - } else { - push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); - } - push(@subscriber_rows, [@subscriber_row]); - for (my $i = 0; $i < 10**$pow; $i++) { - $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($record); - #@subscriber_row = @$row; - $record->{sn} = $base_sn . zerofill($i,$pow); - $record->{range} = 1; + foreach my $file (@files) { + $result &= $importer->process( + file => $file, + process_code => sub { + my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + my @subscriber_rows = (); + foreach my $row (@$rows) { + $rownum++; + next if (scalar @$row) == 0; + my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($row); + $record->{rownum} = $rownum; + my @subscriber_row; + my %r; + if ($subscriber_import_unfold_ranges and $record->{sn} =~ /\.+$/) { + #if ($record->{sn} == '2861..') { + #print "x"; + #} + my $pow = scalar (() = $record->{sn} =~ /\./g); + _warn($context,"number range $record->{sn} results in " . 10**$pow . ' numbers') if $pow > 2; + $record->{sn} =~ s/\.+$//g; + $record->{range} = 0; + %r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); + my $base_sn = $record->{sn}; %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; if ($context->{upsert}) { push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); } else { push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); } - push(@subscriber_rows,[@subscriber_row]); - } - #if ($base_sn == '2861') { - #print "x"; - #last; - #} - - } else { - $record->{range} = 0; - %r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); - %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; - if ($context->{upsert}) { - push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); + push(@subscriber_rows, [@subscriber_row]); + for (my $i = 0; $i < 10**$pow; $i++) { + $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($record); + #@subscriber_row = @$row; + $record->{sn} = $base_sn . zerofill($i,$pow); + $record->{range} = 1; + %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; + if ($context->{upsert}) { + push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); + } else { + push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); + } + push(@subscriber_rows,[@subscriber_row]); + } + #if ($base_sn == '2861') { + #print "x"; + #last; + #} + } else { - push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); + $record->{range} = 0; + %r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); + %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; + if ($context->{upsert}) { + push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); + } else { + push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); + } + push(@subscriber_rows,\@subscriber_row); } - push(@subscriber_rows,\@subscriber_row); } - } - if ((scalar @subscriber_rows) > 0) { - if ($subscriber_import_single_row_txn) { - foreach my $subscriber_row (@subscriber_rows) { + if ((scalar @subscriber_rows) > 0) { + if ($subscriber_import_single_row_txn) { + foreach my $subscriber_row (@subscriber_rows) { + if ($skip_errors) { + eval { _insert_subscriber_rows($context,[$subscriber_row]); }; + _warn($context,$@) if $@; + } else { + _insert_subscriber_rows($context,[$subscriber_row]); + } + } + } else { if ($skip_errors) { - eval { _insert_subscriber_rows($context,[$subscriber_row]); }; + eval { _insert_subscriber_rows($context,\@subscriber_rows); }; _warn($context,$@) if $@; } else { - _insert_subscriber_rows($context,[$subscriber_row]); + _insert_subscriber_rows($context,\@subscriber_rows); } } - } else { - if ($skip_errors) { - eval { _insert_subscriber_rows($context,\@subscriber_rows); }; - _warn($context,$@) if $@; - } else { - _insert_subscriber_rows($context,\@subscriber_rows); - } } - } -#use Data::Dumper; -#print Dumper(\@subscriber_rows); - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{db} = &get_import_db(); # keep ref count low.. - $context->{upsert} = $upsert; - $context->{error_count} = 0; - $context->{warning_count} = 0; - }, - uninit_process_context_code => sub { - my ($context)= @_; - undef $context->{db}; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - multithreading => $import_multithreading - ),$warning_count); + #use Data::Dumper; + #print Dumper(\@subscriber_rows); + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_import_db(); # keep ref count low.. + $context->{upsert} = $upsert; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $import_multithreading + ); + } + + return ($result,$warning_count); } @@ -242,11 +248,13 @@ sub _insert_subscriber_rows { sub import_allowedcli { - my ($file) = @_; + my (@files) = @_; my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::create_table(0); - $result &= _import_allowedcli_checks($file); + foreach my $file (@files) { + $result &= _import_allowedcli_checks($file); + } my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($allowedcli_import_numofthreads); @@ -254,67 +262,71 @@ sub import_allowedcli { destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; - return ($result && $importer->process( - file => $file, - process_code => sub { - my ($context,$rows,$row_offset) = @_; - my $rownum = $row_offset; - my @allowedcli_rows = (); - foreach my $row (@$rows) { - $rownum++; - next if (scalar @$row) == 0; - my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new($row); - $record->{rownum} = $rownum; - my %r = %$record; my @allowedcli_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames}; - if ($context->{upsert}) { - push(@allowedcli_row,$record->{cc},$record->{ac},$record->{sn}); - } else { - push(@allowedcli_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta); + foreach my $file (@files) { + $result &= $importer->process( + file => $file, + process_code => sub { + my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + my @allowedcli_rows = (); + foreach my $row (@$rows) { + $rownum++; + next if (scalar @$row) == 0; + my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new($row); + $record->{rownum} = $rownum; + my %r = %$record; my @allowedcli_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames}; + if ($context->{upsert}) { + push(@allowedcli_row,$record->{cc},$record->{ac},$record->{sn}); + } else { + push(@allowedcli_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta); + } + push(@allowedcli_rows,\@allowedcli_row); } - push(@allowedcli_rows,\@allowedcli_row); - } - if ((scalar @allowedcli_rows) > 0) { - if ($allowedcli_import_single_row_txn) { - foreach my $allowedcli_row (@allowedcli_rows) { + if ((scalar @allowedcli_rows) > 0) { + if ($allowedcli_import_single_row_txn) { + foreach my $allowedcli_row (@allowedcli_rows) { + if ($skip_errors) { + eval { _insert_allowedcli_rows($context,[$allowedcli_row]); }; + _warn($context,$@) if $@; + } else { + _insert_allowedcli_rows($context,[$allowedcli_row]); + } + } + } else { if ($skip_errors) { - eval { _insert_allowedcli_rows($context,[$allowedcli_row]); }; + eval { _insert_allowedcli_rows($context,\@allowedcli_rows); }; _warn($context,$@) if $@; } else { - _insert_allowedcli_rows($context,[$allowedcli_row]); + _insert_allowedcli_rows($context,\@allowedcli_rows); } } - } else { - if ($skip_errors) { - eval { _insert_allowedcli_rows($context,\@allowedcli_rows); }; - _warn($context,$@) if $@; - } else { - _insert_allowedcli_rows($context,\@allowedcli_rows); - } } - } -#use Data::Dumper; -#print Dumper(\@subscriber_rows); - return 1; - }, - init_process_context_code => sub { - my ($context)= @_; - $context->{db} = &get_import_db(); # keep ref count low.. - $context->{upsert} = $upsert; - $context->{error_count} = 0; - $context->{warning_count} = 0; - }, - uninit_process_context_code => sub { - my ($context)= @_; - undef $context->{db}; - destroy_all_dbs(); - { - lock $warning_count; - $warning_count += $context->{warning_count}; - } - }, - multithreading => $import_multithreading - ),$warning_count); + #use Data::Dumper; + #print Dumper(\@subscriber_rows); + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_import_db(); # keep ref count low.. + $context->{upsert} = $upsert; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $import_multithreading + ); + } + + return ($result,$warning_count); } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm index e5706b2..736b007 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm @@ -47,14 +47,14 @@ our @EXPORT_OK = qw( $force $import_db_file - $subscriber_filename + @subscriber_filenames $subscriber_import_numofthreads $ignore_subscriber_unique $subscriber_import_single_row_txn $subscriber_import_unfold_ranges - $allowedcli_filename + @allowedcli_filenames $allowedcli_import_numofthreads $ignore_allowedcli_unique $allowedcli_import_single_row_txn @@ -101,13 +101,13 @@ our $run_id = ''; our $import_db_file = _get_import_db_file($run_id,'import'); our $import_multithreading = $enablemultithreading; -our $subscriber_filename = undef; +our @subscriber_filenames = (); our $subscriber_import_numofthreads = $cpucount; our $ignore_subscriber_unique = 0; our $subscriber_import_single_row_txn = 1; our $subscriber_import_unfold_ranges = 1; -our $allowedcli_filename = undef; +our @allowedcli_filenames = (); our $allowedcli_import_numofthreads = $cpucount; our $ignore_allowedcli_unique = 0; our $allowedcli_import_single_row_txn = 1; @@ -158,13 +158,13 @@ sub update_settings { $import_db_file = _get_import_db_file($run_id,'import'); $import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading}; - $subscriber_filename = _get_import_filename($subscriber_filename,$data,'subscriber_filename'); + @subscriber_filenames = _get_import_filenames(\@subscriber_filenames,$data,'subscriber_filenames'); $subscriber_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_import_numofthreads'); $ignore_subscriber_unique = $data->{ignore_subscriber_unique} if exists $data->{ignore_subscriber_unique}; $subscriber_import_single_row_txn = $data->{subscriber_import_single_row_txn} if exists $data->{subscriber_import_single_row_txn}; $subscriber_import_unfold_ranges = $data->{subscriber_import_unfold_ranges} if exists $data->{subscriber_import_unfold_ranges}; - $allowedcli_filename = _get_import_filename($allowedcli_filename,$data,'allowedcli_filename'); + @allowedcli_filenames = _get_import_filenames(\@allowedcli_filenames,$data,'allowedcli_filenames'); $allowedcli_import_numofthreads = _get_numofthreads($cpucount,$data,'allowedcli_import_numofthreads'); $ignore_allowedcli_unique = $data->{ignore_allowedcli_unique} if exists $data->{ignore_allowedcli_unique}; $allowedcli_import_single_row_txn = $data->{allowedcli_import_single_row_txn} if exists $data->{allowedcli_import_single_row_txn}; @@ -249,14 +249,18 @@ sub _get_import_db_file { return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name; } -sub _get_import_filename { +sub _get_import_filenames { my ($old_value,$data,$key) = @_; - my $import_filename = $old_value; - $import_filename = $data->{$key} if exists $data->{$key}; - if (defined $import_filename and length($import_filename) > 0) { - $import_filename = $input_path . $import_filename unless -e $import_filename; + my @import_filenames = @$old_value; + @import_filenames = split_tuple($data->{$key}) if exists $data->{$key}; + my @result = (); + foreach my $import_filename (@import_filenames) { + if (defined $import_filename and length($import_filename) > 0) { + $import_filename = $input_path . $import_filename unless -e $import_filename; + push(@result,$import_filename); + } } - return $import_filename; + return @result; } sub check_dry { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl index 800e3f0..0a8e7c9 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl @@ -20,10 +20,10 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $skip_errors $force $run_id - $subscriber_filename + @subscriber_filenames - $allowedcli_filename + @allowedcli_filenames ); #$allowed_ips @@ -313,7 +313,7 @@ sub import_subscriber_task { my ($messages) = @_; my ($result,$warning_count) = (0,0); eval { - ($result,$warning_count) = import_subscriber($subscriber_filename); + ($result,$warning_count) = import_subscriber(@subscriber_filenames); }; my $err = $@; my $stats = ($skip_errors ? ": $warning_count warnings" : ''); @@ -375,7 +375,7 @@ sub import_allowedcli_task { my ($messages) = @_; my ($result,$warning_count) = (0,0); eval { - ($result,$warning_count) = import_allowedcli($allowedcli_filename); + ($result,$warning_count) = import_allowedcli(@allowedcli_filenames); }; my $err = $@; my $stats = ($skip_errors ? ": $warning_count warnings" : ''); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg index 61a6f49..a4864fa 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg @@ -7,7 +7,8 @@ import_multithreading = 0 #subscriber_filename = /home/rkrenn/test/teletek/teletek_format_test1.txt #subscriber_filename = /home/rkrenn/test/teletek/teletek_format_test2.txt #subscriber_filename = /home/rkrenn/test/teletek/export_kundinfo_leica2.csv -subscriber_filename = /home/rkrenn/temp/teletek/export_nummer_sip3.csv +#subscriber_filename = /home/rkrenn/temp/teletek/export_nummer_sip3.csv +subscriber_filenames = /home/rkrenn/temp/teletek/export_kundinfo_leica.170823.csv,/home/rkrenn/temp/teletek/export_nummer_sip3.170904.csv subscriber_import_numofthreads = 2 ignore_subscriber_unique = 1 subscriber_import_single_row_txn = 1 @@ -16,7 +17,7 @@ subscriber_import_unfold_ranges = 1 #allowedcli_filename = /home/rkrenn/test/teletek/export_multiple_DID_Leica.csv #allowedcli_filename = /home/rkrenn/temp/teletek/export_screeningOnly_170824.csv -allowedcli_filename = /home/rkrenn/temp/teletek/export_MultipleDID_170823.csv +allowedcli_filenames = /home/rkrenn/temp/teletek/export_MultipleDID_170823.csv allowedcli_import_numofthreads = 2 ignore_allowedcli_unique = 1 allowedcli_import_single_row_txn = 1