TT#21083 implement teletek importer #3

+ allow to specify multiple subscriber files to import
+ allow to specify multiple allowed_cli files to import

Change-Id: I80f83542f541a0d8857d477158dc043cfabb93ba
changes/27/15227/1
Rene Krenn 8 years ago
parent 53f32134cd
commit 7dbb015f27

@ -66,6 +66,16 @@ sub new {
}
sub process {
my $self = shift;
$self->{line_number} = 1;
return $self->SUPER::process(@_);
}
sub extractlines {
my ($context,$buffer_ref,$lines) = @_;

@ -51,11 +51,13 @@ our @EXPORT_OK = qw(
sub import_subscriber {
my ($file) = @_;
my (@files) = @_;
my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::create_table(0);
$result &= _import_subscriber_checks($file);
foreach my $file (@files) {
$result &= _import_subscriber_checks($file);
}
my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($subscriber_import_numofthreads);
@ -63,108 +65,112 @@ sub import_subscriber {
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
return ($result && $importer->process(
file => $file,
process_code => sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @subscriber_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($row);
$record->{rownum} = $rownum;
my @subscriber_row;
my %r;
if ($subscriber_import_unfold_ranges and $record->{sn} =~ /\.+$/) {
#if ($record->{sn} == '2861..') {
#print "x";
#}
my $pow = scalar (() = $record->{sn} =~ /\./g);
_warn($context,"number range $record->{sn} results in " . 10**$pow . ' numbers') if $pow > 2;
$record->{sn} =~ s/\.+$//g;
$record->{range} = 0;
%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
my $base_sn = $record->{sn};
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows, [@subscriber_row]);
for (my $i = 0; $i < 10**$pow; $i++) {
$record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($record);
#@subscriber_row = @$row;
$record->{sn} = $base_sn . zerofill($i,$pow);
$record->{range} = 1;
foreach my $file (@files) {
$result &= $importer->process(
file => $file,
process_code => sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @subscriber_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($row);
$record->{rownum} = $rownum;
my @subscriber_row;
my %r;
if ($subscriber_import_unfold_ranges and $record->{sn} =~ /\.+$/) {
#if ($record->{sn} == '2861..') {
#print "x";
#}
my $pow = scalar (() = $record->{sn} =~ /\./g);
_warn($context,"number range $record->{sn} results in " . 10**$pow . ' numbers') if $pow > 2;
$record->{sn} =~ s/\.+$//g;
$record->{range} = 0;
%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
my $base_sn = $record->{sn};
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows,[@subscriber_row]);
}
#if ($base_sn == '2861') {
#print "x";
#last;
#}
} else {
$record->{range} = 0;
%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
push(@subscriber_rows, [@subscriber_row]);
for (my $i = 0; $i < 10**$pow; $i++) {
$record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($record);
#@subscriber_row = @$row;
$record->{sn} = $base_sn . zerofill($i,$pow);
$record->{range} = 1;
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows,[@subscriber_row]);
}
#if ($base_sn == '2861') {
#print "x";
#last;
#}
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
$record->{range} = 0;
%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows,\@subscriber_row);
}
push(@subscriber_rows,\@subscriber_row);
}
}
if ((scalar @subscriber_rows) > 0) {
if ($subscriber_import_single_row_txn) {
foreach my $subscriber_row (@subscriber_rows) {
if ((scalar @subscriber_rows) > 0) {
if ($subscriber_import_single_row_txn) {
foreach my $subscriber_row (@subscriber_rows) {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,[$subscriber_row]); };
_warn($context,$@) if $@;
} else {
_insert_subscriber_rows($context,[$subscriber_row]);
}
}
} else {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,[$subscriber_row]); };
eval { _insert_subscriber_rows($context,\@subscriber_rows); };
_warn($context,$@) if $@;
} else {
_insert_subscriber_rows($context,[$subscriber_row]);
_insert_subscriber_rows($context,\@subscriber_rows);
}
}
} else {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,\@subscriber_rows); };
_warn($context,$@) if $@;
} else {
_insert_subscriber_rows($context,\@subscriber_rows);
}
}
}
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $import_multithreading
),$warning_count);
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $import_multithreading
);
}
return ($result,$warning_count);
}
@ -242,11 +248,13 @@ sub _insert_subscriber_rows {
sub import_allowedcli {
my ($file) = @_;
my (@files) = @_;
my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::create_table(0);
$result &= _import_allowedcli_checks($file);
foreach my $file (@files) {
$result &= _import_allowedcli_checks($file);
}
my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($allowedcli_import_numofthreads);
@ -254,67 +262,71 @@ sub import_allowedcli {
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
return ($result && $importer->process(
file => $file,
process_code => sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @allowedcli_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new($row);
$record->{rownum} = $rownum;
my %r = %$record; my @allowedcli_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames};
if ($context->{upsert}) {
push(@allowedcli_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@allowedcli_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta);
foreach my $file (@files) {
$result &= $importer->process(
file => $file,
process_code => sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @allowedcli_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new($row);
$record->{rownum} = $rownum;
my %r = %$record; my @allowedcli_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames};
if ($context->{upsert}) {
push(@allowedcli_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@allowedcli_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta);
}
push(@allowedcli_rows,\@allowedcli_row);
}
push(@allowedcli_rows,\@allowedcli_row);
}
if ((scalar @allowedcli_rows) > 0) {
if ($allowedcli_import_single_row_txn) {
foreach my $allowedcli_row (@allowedcli_rows) {
if ((scalar @allowedcli_rows) > 0) {
if ($allowedcli_import_single_row_txn) {
foreach my $allowedcli_row (@allowedcli_rows) {
if ($skip_errors) {
eval { _insert_allowedcli_rows($context,[$allowedcli_row]); };
_warn($context,$@) if $@;
} else {
_insert_allowedcli_rows($context,[$allowedcli_row]);
}
}
} else {
if ($skip_errors) {
eval { _insert_allowedcli_rows($context,[$allowedcli_row]); };
eval { _insert_allowedcli_rows($context,\@allowedcli_rows); };
_warn($context,$@) if $@;
} else {
_insert_allowedcli_rows($context,[$allowedcli_row]);
_insert_allowedcli_rows($context,\@allowedcli_rows);
}
}
} else {
if ($skip_errors) {
eval { _insert_allowedcli_rows($context,\@allowedcli_rows); };
_warn($context,$@) if $@;
} else {
_insert_allowedcli_rows($context,\@allowedcli_rows);
}
}
}
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $import_multithreading
),$warning_count);
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $import_multithreading
);
}
return ($result,$warning_count);
}

@ -47,14 +47,14 @@ our @EXPORT_OK = qw(
$force
$import_db_file
$subscriber_filename
@subscriber_filenames
$subscriber_import_numofthreads
$ignore_subscriber_unique
$subscriber_import_single_row_txn
$subscriber_import_unfold_ranges
$allowedcli_filename
@allowedcli_filenames
$allowedcli_import_numofthreads
$ignore_allowedcli_unique
$allowedcli_import_single_row_txn
@ -101,13 +101,13 @@ our $run_id = '';
our $import_db_file = _get_import_db_file($run_id,'import');
our $import_multithreading = $enablemultithreading;
our $subscriber_filename = undef;
our @subscriber_filenames = ();
our $subscriber_import_numofthreads = $cpucount;
our $ignore_subscriber_unique = 0;
our $subscriber_import_single_row_txn = 1;
our $subscriber_import_unfold_ranges = 1;
our $allowedcli_filename = undef;
our @allowedcli_filenames = ();
our $allowedcli_import_numofthreads = $cpucount;
our $ignore_allowedcli_unique = 0;
our $allowedcli_import_single_row_txn = 1;
@ -158,13 +158,13 @@ sub update_settings {
$import_db_file = _get_import_db_file($run_id,'import');
$import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading};
$subscriber_filename = _get_import_filename($subscriber_filename,$data,'subscriber_filename');
@subscriber_filenames = _get_import_filenames(\@subscriber_filenames,$data,'subscriber_filenames');
$subscriber_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_import_numofthreads');
$ignore_subscriber_unique = $data->{ignore_subscriber_unique} if exists $data->{ignore_subscriber_unique};
$subscriber_import_single_row_txn = $data->{subscriber_import_single_row_txn} if exists $data->{subscriber_import_single_row_txn};
$subscriber_import_unfold_ranges = $data->{subscriber_import_unfold_ranges} if exists $data->{subscriber_import_unfold_ranges};
$allowedcli_filename = _get_import_filename($allowedcli_filename,$data,'allowedcli_filename');
@allowedcli_filenames = _get_import_filenames(\@allowedcli_filenames,$data,'allowedcli_filenames');
$allowedcli_import_numofthreads = _get_numofthreads($cpucount,$data,'allowedcli_import_numofthreads');
$ignore_allowedcli_unique = $data->{ignore_allowedcli_unique} if exists $data->{ignore_allowedcli_unique};
$allowedcli_import_single_row_txn = $data->{allowedcli_import_single_row_txn} if exists $data->{allowedcli_import_single_row_txn};
@ -249,14 +249,18 @@ sub _get_import_db_file {
return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name;
}
sub _get_import_filename {
sub _get_import_filenames {
my ($old_value,$data,$key) = @_;
my $import_filename = $old_value;
$import_filename = $data->{$key} if exists $data->{$key};
if (defined $import_filename and length($import_filename) > 0) {
$import_filename = $input_path . $import_filename unless -e $import_filename;
my @import_filenames = @$old_value;
@import_filenames = split_tuple($data->{$key}) if exists $data->{$key};
my @result = ();
foreach my $import_filename (@import_filenames) {
if (defined $import_filename and length($import_filename) > 0) {
$import_filename = $input_path . $import_filename unless -e $import_filename;
push(@result,$import_filename);
}
}
return $import_filename;
return @result;
}
sub check_dry {

@ -20,10 +20,10 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$skip_errors
$force
$run_id
$subscriber_filename
@subscriber_filenames
$allowedcli_filename
@allowedcli_filenames
);
#$allowed_ips
@ -313,7 +313,7 @@ sub import_subscriber_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = import_subscriber($subscriber_filename);
($result,$warning_count) = import_subscriber(@subscriber_filenames);
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');
@ -375,7 +375,7 @@ sub import_allowedcli_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = import_allowedcli($allowedcli_filename);
($result,$warning_count) = import_allowedcli(@allowedcli_filenames);
};
my $err = $@;
my $stats = ($skip_errors ? ": $warning_count warnings" : '');

@ -7,7 +7,8 @@ import_multithreading = 0
#subscriber_filename = /home/rkrenn/test/teletek/teletek_format_test1.txt
#subscriber_filename = /home/rkrenn/test/teletek/teletek_format_test2.txt
#subscriber_filename = /home/rkrenn/test/teletek/export_kundinfo_leica2.csv
subscriber_filename = /home/rkrenn/temp/teletek/export_nummer_sip3.csv
#subscriber_filename = /home/rkrenn/temp/teletek/export_nummer_sip3.csv
subscriber_filenames = /home/rkrenn/temp/teletek/export_kundinfo_leica.170823.csv,/home/rkrenn/temp/teletek/export_nummer_sip3.170904.csv
subscriber_import_numofthreads = 2
ignore_subscriber_unique = 1
subscriber_import_single_row_txn = 1
@ -16,7 +17,7 @@ subscriber_import_unfold_ranges = 1
#allowedcli_filename = /home/rkrenn/test/teletek/export_multiple_DID_Leica.csv
#allowedcli_filename = /home/rkrenn/temp/teletek/export_screeningOnly_170824.csv
allowedcli_filename = /home/rkrenn/temp/teletek/export_MultipleDID_170823.csv
allowedcli_filenames = /home/rkrenn/temp/teletek/export_MultipleDID_170823.csv
allowedcli_import_numofthreads = 2
ignore_allowedcli_unique = 1
allowedcli_import_single_row_txn = 1

Loading…
Cancel
Save