diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm index 9bb5e72c..7cd27cd6 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm @@ -43,6 +43,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber q use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw(); @@ -141,6 +142,9 @@ sub check_import_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward'); $result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration'); + $result &= $check_result; push(@$messages,$message); + return $result; } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Registration.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Registration.pm new file mode 100644 index 00000000..5cacc3e0 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Registration.pm @@ -0,0 +1,282 @@ +package NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw( + get_import_db + destroy_all_dbs +); +#import_db_tableidentifier + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + + insert_stmt + + +); +#process_table +use NGCP::BulkProcessor::SqlRecord qw(); + +#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + getupsertstatement + + @fieldnames + + findby_sipusername + countby_sipcontact + + update_delta + findby_delta + countby_delta + + $deleted_delta + $updated_delta + $added_delta + +); + +my $tablename = 'registration'; +my $get_db = \&get_import_db; +#my $get_tablename = \&import_db_tableidentifier; + +our @fieldnames = ( + "sip_username", + "domain", + "sip_contact", + + #calculated fields at the end! + 'rownum', + 'filename', +); + +my $expected_fieldnames = [ + @fieldnames, + 'delta', +]; + +# table creation: +my $primarykey_fieldnames = [ 'sip_username' ]; +my $indexes = { + $tablename . '_rownum' => [ 'rownum(11)' ], + $tablename . '_delta' => [ 'delta(7)' ], +}; +#my $fixtable_statements = []; + +our $deleted_delta = 'DELETED'; +our $updated_delta = 'UPDATED'; +our $added_delta = 'ADDED'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); + +} + +sub findby_delta { + + my ($delta,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless defined $delta; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('delta') . ' = ?' + ,$delta); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_sipusername { + + my ($sip_username,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + #return [] unless (defined $cc or defined $ac or defined $sn); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('sip_username') . ' = ?' + ,$sip_username); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub countby_sipcontact { + + my ($sip_contact) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + if (defined $sip_contact) { + $stmt .= ' WHERE ' . + $db->columnidentifier('sip_contact') . ' = ?'; + push(@params,$sip_contact); + } + + return $db->db_get_value($stmt,@params); + +} + +sub update_delta { + + my ($sip_username,$delta) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; + my @params = (); + push(@params,$delta); + if (defined $sip_username) { + $stmt .= ' WHERE ' . + $db->columnidentifier('sip_username') . ' = ?'; + push(@params,$sip_username); + } + + return $db->db_do($stmt,@params); + +} + + +sub countby_delta { + + my ($deltas) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1'; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + + return $db->db_get_value($stmt,@params); + +} + + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub getupsertstatement { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . + join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')'; + my @values = (); + foreach my $fieldname (@$expected_fieldnames) { + if ('delta' eq $fieldname) { + my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . + $db->columnidentifier('sip_username') . ' = ?'; + push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); + } else { + push(@values,'?'); + } + } + $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; + return $upsert_stmt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm index 802196eb..94972d90 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm @@ -25,6 +25,10 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $ignore_callforward_unique $callforward_import_single_row_txn + $registration_import_numofthreads + $ignore_registration_unique + $registration_import_single_row_txn + $skip_errors ); use NGCP::BulkProcessor::Logging qw ( @@ -59,6 +63,7 @@ our @EXPORT_OK = qw( import_allowedcli import_clir import_callforward + import_registration ); sub import_subscriber { @@ -602,14 +607,6 @@ sub _insert_clir_rows { } } - - - - - - - - sub import_callforward { my (@files) = @_; @@ -643,7 +640,7 @@ sub import_callforward { $record->{sn} //= ''; $record->{rownum} = $rownum; $record->{filename} = $file; - + if (my $subscriber = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) { $record->{sip_username} = $subscriber->{sip_username}; } elsif (my $allowedcli = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) { @@ -779,6 +776,161 @@ sub _insert_callforward_rows { +sub import_registration { + + my (@files) = @_; + + my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::create_table(0); + + foreach my $file (@files) { + $result &= _import_registration_checks($file); + } + + my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($registration_import_numofthreads); + + my $upsert = _import_registration_reset_delta(); + + destroy_all_dbs(); #close all db connections before forking.. + my $warning_count :shared = 0; + foreach my $file (@files) { + $result &= $importer->process( + file => $file, + process_code => sub { + my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + my @registration_rows = (); + foreach my $row (@$rows) { + $rownum++; + next if (scalar @$row) == 0; + $row = [ map { local $_ = $_; trim($_); } @$row ]; + my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration->new($row); + $record->{rownum} = $rownum; + $record->{filename} = $file; + + if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) { + if ($skip_errors) { + _warn($context,"sip username $record->{sip_username} not found"); + } else { + _error($context,"sip username $record->{sip_username} not found"); + } + next; + } + # prevent db's unique constraint violation: + if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::findby_sipusername($record->{sip_username})) { + if ($skip_errors) { + _warn($context,"duplicate sip username $record->{sip_username}"); + } else { + _error($context,"duplicate sip username $record->{sip_username}"); + } + next; + } + + my %r = %$record; my @registration_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::fieldnames}; + if ($context->{upsert}) { + push(@registration_row,$record->{sip_username}); + } else { + push(@registration_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::added_delta); + } + push(@registration_rows,\@registration_row); + if ($registration_import_single_row_txn and (scalar @registration_rows) > 0) { + while (defined (my $registration_row = shift @registration_rows)) { + if ($skip_errors) { + eval { _insert_registration_rows($context,[$registration_row]); }; + _warn($context,$@) if $@; + } else { + _insert_registration_rows($context,[$registration_row]); + } + } + } + } + + if (not $registration_import_single_row_txn and (scalar @registration_rows) > 0) { + if ($skip_errors) { + eval { _insert_registration_rows($context,\@registration_rows); }; + _warn($context,$@) if $@; + } else { + _insert_registration_rows($context,\@registration_rows); + } + } + #use Data::Dumper; + #print Dumper(\@subscriber_rows); + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_import_db(); # keep ref count low.. + $context->{upsert} = $upsert; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $import_multithreading + ); + } + + return ($result,$warning_count); + +} + +sub _import_registration_checks { + my ($file) = @_; + my $result = 1; + my $subscribercount = 0; + eval { + $subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_ccacsn(); + }; + if ($@ or $subscribercount == 0) { + fileprocessingerror($file,'please import subscribers first',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } + + return $result; +} + +sub _import_registration_reset_delta { + my $upsert = 0; + if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_sipcontact() > 0) { + processing_info(threadid(),'resetting delta of ' . + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::update_delta(undef, + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::deleted_delta) . + ' registration records',getlogger(__PACKAGE__)); + $upsert |= 1; + } + return $upsert; +} + +sub _insert_registration_rows { + my ($context,$registration_rows) = @_; + $context->{db}->db_do_begin( + ($context->{upsert} ? + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::getupsertstatement() + : NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::getinsertstatement($ignore_registration_unique)), + #NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::gettablename(), + #lock + ); + eval { + $context->{db}->db_do_rowblock($registration_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } +} + + + sub _error { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm index a056be9d..38476bab 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm @@ -76,6 +76,11 @@ our @EXPORT_OK = qw( $ignore_callforward_unique $callforward_import_single_row_txn + @registration_filenames + $registration_import_numofthreads + $ignore_registration_unique + $registration_import_single_row_txn + $provision_subscriber_multithreading $provision_subscriber_numofthreads $webpassword_length @@ -144,6 +149,11 @@ our $callforward_import_numofthreads = $cpucount; our $ignore_callforward_unique = 0; our $callforward_import_single_row_txn = 1; +our @registration_filenames = (); +our $registration_import_numofthreads = $cpucount; +our $ignore_registration_unique = 0; +our $registration_import_single_row_txn = 1; + our $provision_subscriber_multithreading = $enablemultithreading; our $provision_subscriber_numofthreads = $cpucount; our $webpassword_length = 8; @@ -212,6 +222,11 @@ sub update_settings { $ignore_callforward_unique = $data->{ignore_callforward_unique} if exists $data->{ignore_callforward_unique}; $callforward_import_single_row_txn = $data->{callforward_import_single_row_txn} if exists $data->{callforward_import_single_row_txn}; + @registration_filenames = _get_import_filenames(\@registration_filenames,$data,'registration_filenames'); + $registration_import_numofthreads = _get_numofthreads($cpucount,$data,'registration_import_numofthreads'); + $ignore_registration_unique = $data->{ignore_registration_unique} if exists $data->{ignore_registration_unique}; + $registration_import_single_row_txn = $data->{registration_import_single_row_txn} if exists $data->{registration_import_single_row_txn}; + $provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading}; $provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads'); $webpassword_length = $data->{webpassword_length} if exists $data->{webpassword_length}; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl index b534399e..dac5b009 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl @@ -31,6 +31,8 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( @clir_filenames @callforward_filenames + + @registration_filenames ); #$allowed_ips @@ -71,6 +73,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber q use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); @@ -94,6 +97,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Import qw( import_allowedcli import_clir import_callforward + import_registration ); use NGCP::BulkProcessor::Projects::Migration::Teletek::Provisioning qw( @@ -144,6 +148,11 @@ push(@TASK_OPTS,$import_callforward_task_opt); my $import_truncate_callforward_task_opt = 'truncate_callforward'; push(@TASK_OPTS,$import_truncate_callforward_task_opt); +my $import_registration_task_opt = 'import_registration'; +push(@TASK_OPTS,$import_registration_task_opt); +my $import_truncate_registration_task_opt = 'truncate_registration'; +push(@TASK_OPTS,$import_truncate_registration_task_opt); + my $create_subscriber_task_opt = 'create_subscriber'; push(@TASK_OPTS,$create_subscriber_task_opt); @@ -220,6 +229,11 @@ sub main() { } elsif (lc($import_truncate_callforward_task_opt) eq lc($task)) { $result &= import_truncate_callforward_task(\@messages) if taskinfo($import_truncate_callforward_task_opt,$result); + } elsif (lc($import_registration_task_opt) eq lc($task)) { + $result &= import_registration_task(\@messages) if taskinfo($import_registration_task_opt,$result); + } elsif (lc($import_truncate_registration_task_opt) eq lc($task)) { + $result &= import_truncate_registration_task(\@messages) if taskinfo($import_truncate_registration_task_opt,$result); + } elsif (lc($create_subscriber_task_opt) eq lc($task)) { if (taskinfo($create_subscriber_task_opt,$result,1)) { next unless check_dry(); @@ -551,6 +565,71 @@ sub import_truncate_callforward_task { + +sub import_registration_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = import_registration(@registration_filenames); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total registration records: " . + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_sipcontact() . ' rows'; + my $added_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::added_delta + ); + $stats .= "\n new: $added_count rows"; + my $existing_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::updated_delta + ); + $stats .= "\n existing: $existing_count rows"; + my $deleted_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::deleted_delta + ); + $stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"importing registrations INCOMPLETE$stats"); + } else { + push(@$messages,"importing registrations completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + + +sub import_truncate_registration_task { + + my ($messages) = @_; + my $result = 0; + eval { + $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::create_table(1); + }; + my $err = $@; + my $stats = ''; + eval { + $stats .= "\n total registration records: " . + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_sipcontact() . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"truncating imported registrations INCOMPLETE$stats"); + } else { + push(@$messages,"truncating imported registrations completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + + + + + + sub create_subscriber_task { my ($messages) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg index debb7ecb..1e793586 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg @@ -34,6 +34,11 @@ callforward_import_numofthreads = 2 ignore_callforward_unique = 0 callforward_import_single_row_txn = 1 +registration_filenames = /home/rkrenn/temp/teletek/export_PermanentReg_170823.csv +registration_import_numofthreads = 2 +ignore_registration_unique = 0 +registration_import_single_row_txn = 1 + provision_subscriber_multithreading = 1 #provision_subscriber_numofthreads = 6 webpassword_length = 8