TT#18886 implement teletek importer #7

+ parse permanent registrations file

Change-Id: If82fbe133b2645e760fc7243ab572c823b804fa9
changes/61/15561/1
Rene Krenn 9 years ago
parent 5b04e7653a
commit 0a8d34836c

@ -43,6 +43,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber q
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw();
@ -141,6 +142,9 @@ sub check_import_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration');
$result &= $check_result; push(@$messages,$message);
return $result;
}

@ -0,0 +1,282 @@
package NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration;
use strict;
## no critic
use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw(
get_import_db
destroy_all_dbs
);
#import_db_tableidentifier
use NGCP::BulkProcessor::SqlProcessor qw(
registertableinfo
create_targettable
checktableinfo
copy_row
insert_stmt
);
#process_table
use NGCP::BulkProcessor::SqlRecord qw();
#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
create_table
gettablename
check_table
getinsertstatement
getupsertstatement
@fieldnames
findby_sipusername
countby_sipcontact
update_delta
findby_delta
countby_delta
$deleted_delta
$updated_delta
$added_delta
);
my $tablename = 'registration';
my $get_db = \&get_import_db;
#my $get_tablename = \&import_db_tableidentifier;
our @fieldnames = (
"sip_username",
"domain",
"sip_contact",
#calculated fields at the end!
'rownum',
'filename',
);
my $expected_fieldnames = [
@fieldnames,
'delta',
];
# table creation:
my $primarykey_fieldnames = [ 'sip_username' ];
my $indexes = {
$tablename . '_rownum' => [ 'rownum(11)' ],
$tablename . '_delta' => [ 'delta(7)' ],
};
#my $fixtable_statements = [];
our $deleted_delta = 'DELETED';
our $updated_delta = 'UPDATED';
our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub create_table {
my ($truncate) = @_;
my $db = &$get_db();
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
sub findby_delta {
my ($delta,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return [] unless defined $delta;
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('delta') . ' = ?'
,$delta);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_sipusername {
my ($sip_username,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
#return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('sip_username') . ' = ?'
,$sip_username);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_sipcontact {
my ($sip_contact) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
if (defined $sip_contact) {
$stmt .= ' WHERE ' .
$db->columnidentifier('sip_contact') . ' = ?';
push(@params,$sip_contact);
}
return $db->db_get_value($stmt,@params);
}
sub update_delta {
my ($sip_username,$delta) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'UPDATE ' . $table . ' SET delta = ?';
my @params = ();
push(@params,$delta);
if (defined $sip_username) {
$stmt .= ' WHERE ' .
$db->columnidentifier('sip_username') . ' = ?';
push(@params,$sip_username);
}
return $db->db_do($stmt,@params);
}
sub countby_delta {
my ($deltas) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1';
my @params = ();
if (defined $deltas and 'HASH' eq ref $deltas) {
foreach my $in (keys %$deltas) {
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $deltas and length($deltas) > 0) {
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?';
push(@params,$deltas);
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
sub getupsertstatement {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' .
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')';
my @values = ();
foreach my $fieldname (@$expected_fieldnames) {
if ('delta' eq $fieldname) {
my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' .
$db->columnidentifier('sip_username') . ' = ?';
push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')');
} else {
push(@values,'?');
}
}
$upsert_stmt .= ' VALUES (' . join(',',@values) . ')';
return $upsert_stmt;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -25,6 +25,10 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$ignore_callforward_unique
$callforward_import_single_row_txn
$registration_import_numofthreads
$ignore_registration_unique
$registration_import_single_row_txn
$skip_errors
);
use NGCP::BulkProcessor::Logging qw (
@ -59,6 +63,7 @@ our @EXPORT_OK = qw(
import_allowedcli
import_clir
import_callforward
import_registration
);
sub import_subscriber {
@ -602,14 +607,6 @@ sub _insert_clir_rows {
}
}
sub import_callforward {
my (@files) = @_;
@ -643,7 +640,7 @@ sub import_callforward {
$record->{sn} //= '';
$record->{rownum} = $rownum;
$record->{filename} = $file;
if (my $subscriber = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
$record->{sip_username} = $subscriber->{sip_username};
} elsif (my $allowedcli = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
@ -779,6 +776,161 @@ sub _insert_callforward_rows {
sub import_registration {
my (@files) = @_;
my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::create_table(0);
foreach my $file (@files) {
$result &= _import_registration_checks($file);
}
my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($registration_import_numofthreads);
my $upsert = _import_registration_reset_delta();
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
foreach my $file (@files) {
$result &= $importer->process(
file => $file,
process_code => sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @registration_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
$row = [ map { local $_ = $_; trim($_); } @$row ];
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration->new($row);
$record->{rownum} = $rownum;
$record->{filename} = $file;
if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) {
if ($skip_errors) {
_warn($context,"sip username $record->{sip_username} not found");
} else {
_error($context,"sip username $record->{sip_username} not found");
}
next;
}
# prevent db's unique constraint violation:
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::findby_sipusername($record->{sip_username})) {
if ($skip_errors) {
_warn($context,"duplicate sip username $record->{sip_username}");
} else {
_error($context,"duplicate sip username $record->{sip_username}");
}
next;
}
my %r = %$record; my @registration_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::fieldnames};
if ($context->{upsert}) {
push(@registration_row,$record->{sip_username});
} else {
push(@registration_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::added_delta);
}
push(@registration_rows,\@registration_row);
if ($registration_import_single_row_txn and (scalar @registration_rows) > 0) {
while (defined (my $registration_row = shift @registration_rows)) {
if ($skip_errors) {
eval { _insert_registration_rows($context,[$registration_row]); };
_warn($context,$@) if $@;
} else {
_insert_registration_rows($context,[$registration_row]);
}
}
}
}
if (not $registration_import_single_row_txn and (scalar @registration_rows) > 0) {
if ($skip_errors) {
eval { _insert_registration_rows($context,\@registration_rows); };
_warn($context,$@) if $@;
} else {
_insert_registration_rows($context,\@registration_rows);
}
}
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $import_multithreading
);
}
return ($result,$warning_count);
}
sub _import_registration_checks {
my ($file) = @_;
my $result = 1;
my $subscribercount = 0;
eval {
$subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_ccacsn();
};
if ($@ or $subscribercount == 0) {
fileprocessingerror($file,'please import subscribers first',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
return $result;
}
sub _import_registration_reset_delta {
my $upsert = 0;
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_sipcontact() > 0) {
processing_info(threadid(),'resetting delta of ' .
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::update_delta(undef,
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::deleted_delta) .
' registration records',getlogger(__PACKAGE__));
$upsert |= 1;
}
return $upsert;
}
sub _insert_registration_rows {
my ($context,$registration_rows) = @_;
$context->{db}->db_do_begin(
($context->{upsert} ?
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::getupsertstatement()
: NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::getinsertstatement($ignore_registration_unique)),
#NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::gettablename(),
#lock
);
eval {
$context->{db}->db_do_rowblock($registration_rows);
$context->{db}->db_finish();
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_finish(1);
};
die($err);
}
}
sub _error {

@ -76,6 +76,11 @@ our @EXPORT_OK = qw(
$ignore_callforward_unique
$callforward_import_single_row_txn
@registration_filenames
$registration_import_numofthreads
$ignore_registration_unique
$registration_import_single_row_txn
$provision_subscriber_multithreading
$provision_subscriber_numofthreads
$webpassword_length
@ -144,6 +149,11 @@ our $callforward_import_numofthreads = $cpucount;
our $ignore_callforward_unique = 0;
our $callforward_import_single_row_txn = 1;
our @registration_filenames = ();
our $registration_import_numofthreads = $cpucount;
our $ignore_registration_unique = 0;
our $registration_import_single_row_txn = 1;
our $provision_subscriber_multithreading = $enablemultithreading;
our $provision_subscriber_numofthreads = $cpucount;
our $webpassword_length = 8;
@ -212,6 +222,11 @@ sub update_settings {
$ignore_callforward_unique = $data->{ignore_callforward_unique} if exists $data->{ignore_callforward_unique};
$callforward_import_single_row_txn = $data->{callforward_import_single_row_txn} if exists $data->{callforward_import_single_row_txn};
@registration_filenames = _get_import_filenames(\@registration_filenames,$data,'registration_filenames');
$registration_import_numofthreads = _get_numofthreads($cpucount,$data,'registration_import_numofthreads');
$ignore_registration_unique = $data->{ignore_registration_unique} if exists $data->{ignore_registration_unique};
$registration_import_single_row_txn = $data->{registration_import_single_row_txn} if exists $data->{registration_import_single_row_txn};
$provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading};
$provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads');
$webpassword_length = $data->{webpassword_length} if exists $data->{webpassword_length};

@ -31,6 +31,8 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
@clir_filenames
@callforward_filenames
@registration_filenames
);
#$allowed_ips
@ -71,6 +73,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber q
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::CallForward qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
@ -94,6 +97,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Import qw(
import_allowedcli
import_clir
import_callforward
import_registration
);
use NGCP::BulkProcessor::Projects::Migration::Teletek::Provisioning qw(
@ -144,6 +148,11 @@ push(@TASK_OPTS,$import_callforward_task_opt);
my $import_truncate_callforward_task_opt = 'truncate_callforward';
push(@TASK_OPTS,$import_truncate_callforward_task_opt);
my $import_registration_task_opt = 'import_registration';
push(@TASK_OPTS,$import_registration_task_opt);
my $import_truncate_registration_task_opt = 'truncate_registration';
push(@TASK_OPTS,$import_truncate_registration_task_opt);
my $create_subscriber_task_opt = 'create_subscriber';
push(@TASK_OPTS,$create_subscriber_task_opt);
@ -220,6 +229,11 @@ sub main() {
} elsif (lc($import_truncate_callforward_task_opt) eq lc($task)) {
$result &= import_truncate_callforward_task(\@messages) if taskinfo($import_truncate_callforward_task_opt,$result);
} elsif (lc($import_registration_task_opt) eq lc($task)) {
$result &= import_registration_task(\@messages) if taskinfo($import_registration_task_opt,$result);
} elsif (lc($import_truncate_registration_task_opt) eq lc($task)) {
$result &= import_truncate_registration_task(\@messages) if taskinfo($import_truncate_registration_task_opt,$result);
} elsif (lc($create_subscriber_task_opt) eq lc($task)) {
if (taskinfo($create_subscriber_task_opt,$result,1)) {
next unless check_dry();
@ -551,6 +565,71 @@ sub import_truncate_callforward_task {
sub import_registration_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = import_registration(@registration_filenames);
};
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
$stats .= "\n total registration records: " .
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_sipcontact() . ' rows';
my $added_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::added_delta
);
$stats .= "\n new: $added_count rows";
my $existing_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::updated_delta
);
$stats .= "\n existing: $existing_count rows";
my $deleted_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::deleted_delta
);
$stats .= "\n removed: $deleted_count rows";
};
if ($err or !$result) {
push(@$messages,"importing registrations INCOMPLETE$stats");
} else {
push(@$messages,"importing registrations completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
sub import_truncate_registration_task {
my ($messages) = @_;
my $result = 0;
eval {
$result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::create_table(1);
};
my $err = $@;
my $stats = '';
eval {
$stats .= "\n total registration records: " .
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Registration::countby_sipcontact() . ' rows';
};
if ($err or !$result) {
push(@$messages,"truncating imported registrations INCOMPLETE$stats");
} else {
push(@$messages,"truncating imported registrations completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
sub create_subscriber_task {
my ($messages) = @_;

@ -34,6 +34,11 @@ callforward_import_numofthreads = 2
ignore_callforward_unique = 0
callforward_import_single_row_txn = 1
registration_filenames = /home/rkrenn/temp/teletek/export_PermanentReg_170823.csv
registration_import_numofthreads = 2
ignore_registration_unique = 0
registration_import_single_row_txn = 1
provision_subscriber_multithreading = 1
#provision_subscriber_numofthreads = 6
webpassword_length = 8

Loading…
Cancel
Save