From a50af1ca1f4b65d04d6c5c07029c0bb5702ec267 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Fri, 14 Dec 2018 17:06:53 +0100 Subject: [PATCH] TT#49065 CCS subscriber provisioning Change-Id: Ib80102130ad943d89253dc5682614080cb4b2bee --- .../Dao/Trunk/billing/products.pm | 2 + .../Trunk/provisioning/voip_preferences.pm | 9 + lib/NGCP/BulkProcessor/FileProcessor.pm | 8 +- .../FileProcessors/XslxFileSimple.pm | 160 ++++ .../Projects/Migration/UPCAT/Check.pm | 20 +- .../UPCAT/Dao/import/CcsSubscriber.pm | 359 +++++++++ .../{Subscriber.pm => MtaSubscriber.pm} | 6 +- .../Projects/Migration/UPCAT/Import.pm | 235 ++++-- .../Projects/Migration/UPCAT/Preferences.pm | 2 +- .../Projects/Migration/UPCAT/Provisioning.pm | 683 +++++++++++++++--- .../Projects/Migration/UPCAT/Settings.pm | 188 +++-- .../Migration/UPCAT/barring_profiles.yml | 4 +- .../Projects/Migration/UPCAT/process.pl | 201 +++++- .../Projects/Migration/UPCAT/settings.cfg | 48 +- .../Migration/UPCAT/settings.debug.cfg | 49 +- 15 files changed, 1676 insertions(+), 298 deletions(-) create mode 100644 lib/NGCP/BulkProcessor/FileProcessors/XslxFileSimple.pm create mode 100755 lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/CcsSubscriber.pm rename lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/{Subscriber.pm => MtaSubscriber.pm} (99%) diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm index ad98fe7..19a2716 100755 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/products.pm @@ -21,6 +21,7 @@ our @EXPORT_OK = qw( findby_resellerid_handle $SIP_ACCOUNT_HANDLE + $PBX_ACCOUNT_HANDLE ); my $tablename = 'products'; @@ -41,6 +42,7 @@ my $expected_fieldnames = [ my $indexes = {}; our $SIP_ACCOUNT_HANDLE = 'SIP_ACCOUNT'; +our $PBX_ACCOUNT_HANDLE = 'PBX_ACCOUNT'; sub new { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm index 089f852..3f0d69e 100755 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm @@ -44,6 +44,11 @@ our @EXPORT_OK = qw( @CF_ATTRIBUTES $RINGTIMEOUT_ATTRIBUTE + + $EXTENDED_DIALING_MODE_ATTRIBUTE + $E164_TO_RURI_ATTRIBUTE + $SERIAL_FORKING_BY_Q_VALUE_ATTRIBUTE + ); #$FORCE_OUTBOUND_CALLS_TO_PEER @@ -100,6 +105,10 @@ our @CF_ATTRIBUTES = qw(cfu cft cfna cfb); #skip sms for now our $RINGTIMEOUT_ATTRIBUTE = 'ringtimeout'; +our $EXTENDED_DIALING_MODE_ATTRIBUTE = 'extended_dialing_mode'; +our $E164_TO_RURI_ATTRIBUTE = 'e164_to_ruri'; +our $SERIAL_FORKING_BY_Q_VALUE_ATTRIBUTE = 'serial_forking_by_q_value'; + sub new { my $class = shift; diff --git a/lib/NGCP/BulkProcessor/FileProcessor.pm b/lib/NGCP/BulkProcessor/FileProcessor.pm index b7c2b23..36a7947 100755 --- a/lib/NGCP/BulkProcessor/FileProcessor.pm +++ b/lib/NGCP/BulkProcessor/FileProcessor.pm @@ -33,7 +33,7 @@ use NGCP::BulkProcessor::Utils qw(threadid); require Exporter; our @ISA = qw(Exporter); -our @EXPORT_OK = qw(); +our @EXPORT_OK = qw(create_process_context); my $thread_sleep_secs = 0.1; @@ -135,7 +135,7 @@ sub process { for (my $i = 0; $i < $self->{numofthreads}; $i++) { filethreadingdebug('starting processor thread ' . ($i + 1) . ' of ' . $self->{numofthreads},getlogger(__PACKAGE__)); my $processor = threads->create(\&_process, - _create_process_context($static_context, + create_process_context($static_context, { queue => $queue, errorstates => \%errorstates, readertid => $reader->tid(), @@ -168,7 +168,7 @@ sub process { } else { - my $context = _create_process_context($static_context,{ instance => $self, + my $context = create_process_context($static_context,{ instance => $self, filename => $file, tid => $tid, }); @@ -524,7 +524,7 @@ sub _get_stop_consumer_thread { } -sub _create_process_context { +sub create_process_context { my $context = {}; foreach my $ctx (@_) { diff --git a/lib/NGCP/BulkProcessor/FileProcessors/XslxFileSimple.pm b/lib/NGCP/BulkProcessor/FileProcessors/XslxFileSimple.pm new file mode 100644 index 0000000..6420485 --- /dev/null +++ b/lib/NGCP/BulkProcessor/FileProcessors/XslxFileSimple.pm @@ -0,0 +1,160 @@ +package NGCP::BulkProcessor::FileProcessors::XslxFileSimple; +use strict; + +## no critic + +use Excel::Reader::XLSX; qw(); + +use NGCP::BulkProcessor::Logging qw( + getlogger + fileprocessingstarted + fileprocessingdone + lines_read + processing_lines +); + +use NGCP::BulkProcessor::LogError qw( + fileprocessingfailed + fileerror +); + +use NGCP::BulkProcessor::Utils qw(threadid); + +use NGCP::BulkProcessor::FileProcessor qw(create_process_context); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor); +our @EXPORT_OK = qw(); + +my $default_sheet_name = undef; #'~'; +my $blocksize = 100; + +sub new { + + my $class = shift; + + my $self = NGCP::BulkProcessor::FileProcessor->new(@_); + + #$self->{numofthreads} = shift // $default_numofthreads; + $self->{custom_formats} = shift; + $self->{sheet_name} = shift // $default_sheet_name; + $self->{header_row} = shift // 0; + $self->{blocksize} = $blocksize; + + bless($self,$class); + + return $self; + +} + +sub process { + + my $self = shift; + + my %params = @_; + my ($file, + $process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code) = @params{qw/ + file + process_code + static_context + init_process_context_code + uninit_process_context_code + /}; + + fileprocessingstarted($file,getlogger(__PACKAGE__)); + my $result = 0; + my $tid = threadid(); + my $context = create_process_context($static_context,{ instance => $self, + filename => $file, + tid => $tid, + }); + eval { + my $reader = Excel::Reader::XLSX->new(); + my $workbook = $reader->read_file($file); + #my $workbook = Spreadsheet::Reader::ExcelXML->new($file); + # file => $file, + # #group_return_type => 'value', + # count_from_zero => 0, + # values_only => 1, + # empty_is_end => 1, + # group_return_type => ('HASH' eq ref $self->{custom_formats} ? 'value' : 'xml_value'), + # from_the_edge => 0, + # empty_return_type => 'undef_string', + # spaces_are_empty => 1, + # merge_data => 0, + # column_formats => 0, + #); + if (defined $init_process_context_code and 'CODE' eq ref $init_process_context_code) { + &$init_process_context_code($context); + } + if (not defined $workbook) { + fileerror('processing file - error reading file ' . $file . ': ' . $reader->error(),getlogger(__PACKAGE__)); + } else { + my $sheet; + if ($self->{sheet_name}) { + $sheet = $workbook->worksheet($self->{sheet_name}); + #xls2csvinfo('converting the ' . $sheet->name() . ' worksheet',getlogger(__PACKAGE__)); + } else { + $sheet = $workbook->worksheet(0); + #if (@{$workbook->worksheets()} > 1) { + # xls2csvinfo('multiple worksheets found, converting ' . $sheet->name(),getlogger(__PACKAGE__)); + #} + } + if (not defined $sheet) { + #fileerror('processing file - error reading file ' . $file . ': ' . $workbook->error(),getlogger(__PACKAGE__)); + fileerror('invalid spreadsheet',getlogger(__PACKAGE__)); + } else { + $result = 1; + #_info($context,"worksheet '" . $worksheet->get_name() . "' opened"); + + #$worksheet->set_custom_formats($self->{custom_formats}) if 'HASH' eq ref $self->{custom_formats}; + #$worksheet->set_custom_formats({ + # 2 =>'yyyy-mm-dd', + #}); + #$worksheet->set_headers($self->{header_row}) if defined $self->{header_row}; + #if ($worksheet->header_row_set()) { + # $worksheet->go_to_or_past_row($worksheet->get_excel_position($worksheet->get_last_header_row())); + #} + + my $i = 0; + processing_lines($tid,$i,$self->{blocksize},undef,getlogger(__PACKAGE__)); + #my $value; + my @rows = (); + while ($result) { + #$value = $worksheet->fetchrow_arrayref; + my $row = $sheet->next_row(); + last unless $row; #if (not $value or 'EOF' eq $value); + my @vals = $row->values(); + #$i++; + #next if not ref $value; + push(@rows,\@vals); + if ((scalar @rows) >= $self->{blocksize}) { + $result &= &$process_code($context,\@rows,$i); + $i += scalar @rows; + processing_lines($tid,$i,$self->{blocksize},undef,getlogger(__PACKAGE__)); + @rows = (); + } + } + $result &= &$process_code($context,\@rows,$i); + } + } + }; + $result &= 0 if $@; + eval { + if (defined $uninit_process_context_code and 'CODE' eq ref $uninit_process_context_code) { + &$uninit_process_context_code($context); + } + }; + if ($result) { + fileprocessingdone($file,getlogger(__PACKAGE__)); + } else { + fileprocessingfailed($file,getlogger(__PACKAGE__)); + } + return $result; + +} + +1; \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Check.pm b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Check.pm index 3d59972..35e071b 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Check.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Check.pm @@ -29,12 +29,13 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw(); #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw(); -#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw(); use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw(); -#use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); +use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); -use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw(); @@ -116,7 +117,10 @@ sub check_import_db_tables { my $message_prefix = 'import db tables - '; - ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber'); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber'); $result &= $check_result; push(@$messages,$message); return $result; @@ -163,8 +167,8 @@ sub check_provisioning_db_tables { #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations'); #$result &= $check_result; push(@$messages,$message); - #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources'); - #$result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources'); + $result &= $check_result; push(@$messages,$message); return $result; @@ -183,8 +187,8 @@ sub check_kamailio_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users'); $result &= $check_result; push(@$messages,$message); - #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::location'); - #$result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::location'); + $result &= $check_result; push(@$messages,$message); return $result; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/CcsSubscriber.pm b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/CcsSubscriber.pm new file mode 100755 index 0000000..7146138 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/CcsSubscriber.pm @@ -0,0 +1,359 @@ +package NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw( + get_import_db + destroy_all_dbs +); +#import_db_tableidentifier + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + + insert_stmt + + process_table +); +use NGCP::BulkProcessor::SqlRecord qw(); + +#use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + getupsertstatement + + findby_service_number + countby_service_number + + findby_switch_number + + update_delta + findby_delta + countby_delta + + $deleted_delta + $updated_delta + $added_delta + + process_records + + @fieldnames +); + +#findby_ccacsn +#countby_ccacsn + +#findby_domain_sipusername +#findby_domain_webusername +#list_domain_billingprofilename_resellernames +#findby_sipusername +#list_barring_resellernames + +my $tablename = 'ccs_subscriber'; +my $get_db = \&get_import_db; + +our @fieldnames = ( + "service_number", + "switch_number", + "icm", + "routing_type", + "customer", + "target_number", + "comment", + + 'rownum', +); +my $expected_fieldnames = [ + @fieldnames, + 'delta', +]; + +# table creation: +my $primarykey_fieldnames = [ 'service_number' ]; +my $indexes = { + + $tablename . '_switch_number' => [ 'switch_number(12)' ], + + $tablename . '_rownum' => [ 'rownum(11)' ], + $tablename . '_delta' => [ 'delta(7)' ],}; +#my $fixtable_statements = []; + +our $deleted_delta = 'DELETED'; +our $updated_delta = 'UPDATED'; +our $added_delta = 'ADDED'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); + +} + +sub findby_delta { + + my ($delta,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless defined $delta; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('delta') . ' = ?' + ,$delta); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_service_number { + + my ($service_number,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return undef unless (defined $service_number); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('service_number') . ' = ?' + ,$service_number); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub findby_switch_number { + + my ($switch_number,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless (defined $switch_number); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('switch_number') . ' = ?' + ,$switch_number); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub update_delta { + + my ($service_number,$delta) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; + my @params = (); + push(@params,$delta); + if (defined $service_number) { + $stmt .= ' WHERE ' . + $db->columnidentifier('service_number') . ' = ?'; + push(@params,$service_number); + } + + return $db->db_do($stmt,@params); + +} + +sub countby_service_number { + + my ($service_number) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + if (defined $service_number) { + $stmt .= ' WHERE ' . + $db->columnidentifier('service_number') . ' = ?'; + push(@params,$service_number); + } + + return $db->db_get_value($stmt,@params); + +} + +sub countby_delta { + + my ($deltas) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1'; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + + return $db->db_get_value($stmt,@params); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub process_records { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + /}; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my @cols = map { $db->columnidentifier($_); } qw/switch_number/; + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,$rowblock,$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_all_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + #'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols), + 'select' => $db->paginate_sort_query('SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),undef,undef,[{ + column => 'rownum', + numeric => 1, + dir => 1, + }]), + 'selectcount' => 'SELECT COUNT(*) FROM (SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols) . ') AS g', + ); +} + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub getupsertstatement { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . + join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')'; + my @values = (); + foreach my $fieldname (@$expected_fieldnames) { + if ('delta' eq $fieldname) { + my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . + $db->columnidentifier('service_number') . ' = ?'; + push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); + } else { + push(@values,'?'); + } + } + $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; + return $upsert_stmt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/Subscriber.pm b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/MtaSubscriber.pm similarity index 99% rename from lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/Subscriber.pm rename to lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/MtaSubscriber.pm index 9740ae0..ad03f87 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/Subscriber.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Dao/import/MtaSubscriber.pm @@ -1,4 +1,4 @@ -package NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber; +package NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber; use strict; ## no critic @@ -62,7 +62,7 @@ our @EXPORT_OK = qw( # findby_sipusername # list_barring_resellernames -my $tablename = 'subscriber'; +my $tablename = 'mta_subscriber'; my $get_db = \&get_import_db; #my $get_tablename = \&import_db_tableidentifier; @@ -174,7 +174,7 @@ sub findby_ccacsn { my $db = &$get_db(); my $table = $db->tableidentifier($tablename); - return [] unless (defined $cc or defined $ac or defined $sn); + return undef unless (defined $cc or defined $ac or defined $sn); my $rows = $db->db_get_all_arrayref( 'SELECT * FROM ' . diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Import.pm b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Import.pm index c064e4c..4d17770 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Import.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Import.pm @@ -5,24 +5,32 @@ use strict; use threads::shared qw(); +#use Encode qw(); + use NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings qw( - $provision_subscriber_rownum_start + $provision_mta_subscriber_rownum_start $import_multithreading - $subscriber_import_numofthreads - $ignore_subscriber_unique - $subscriber_import_single_row_txn + $mta_subscriber_import_numofthreads + $ignore_mta_subscriber_unique + $mta_subscriber_import_single_row_txn $skip_errors - $default_domain - $default_reseller_name - $default_billing_profile_name - $default_barring + $mta_default_domain + $mta_default_reseller_name + $mta_default_billing_profile_name + $mta_default_barring $cc_ac_map $default_cc $cc_len_min $cc_len_max $ac_len + + $ignore_ccs_subscriber_unique + $provision_ccs_subscriber_rownum_start + $ccs_subscriber_import_single_row_txn + + split_number ); use NGCP::BulkProcessor::Logging qw ( getlogger @@ -36,13 +44,14 @@ use NGCP::BulkProcessor::LogError qw( #use NGCP::BulkProcessor::Projects::Migration::UPCAT::FileProcessors::CSVFile qw(); use NGCP::BulkProcessor::FileProcessors::CSVFileSimple qw(); +use NGCP::BulkProcessor::FileProcessors::XslxFileSimple qw(); use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw( get_import_db destroy_all_dbs ); -use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber qw(); use NGCP::BulkProcessor::Array qw(removeduplicates); use NGCP::BulkProcessor::Utils qw(threadid zerofill trim); @@ -51,24 +60,24 @@ use NGCP::BulkProcessor::Table qw(get_rowhash); require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( - import_subscriber - + import_mta_subscriber + import_ccs_subscriber ); -sub import_subscriber { +sub import_mta_subscriber { my (@files) = @_; - my $result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::create_table(0); + my $result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::create_table(0); foreach my $file (@files) { - $result &= _import_subscriber_checks($file); + $result &= _import_mta_subscriber_checks($file); } #my $importer = NGCP::BulkProcessor::Projects::Migration::UPCAT::FileProcessors::CSVFile->new($subscriber_import_numofthreads); - my $importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($subscriber_import_numofthreads); + my $importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($mta_subscriber_import_numofthreads); - my $upsert = _import_subscriber_reset_delta(); + my $upsert = _import_mta_subscriber_reset_delta(); destroy_all_dbs(); #close all db connections before forking.. my $warning_count :shared = 0; @@ -83,18 +92,18 @@ sub import_subscriber { my @subscriber_rows = (); foreach my $row (@$rows) { $rownum++; - next if (defined $provision_subscriber_rownum_start and $rownum < $provision_subscriber_rownum_start); + next if (defined $provision_mta_subscriber_rownum_start and $rownum < $provision_mta_subscriber_rownum_start); next if (scalar @$row) == 0; $row = [ map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row ]; - my $record = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber->new($row); - $record->{reseller_name} = $default_reseller_name; + my $record = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber->new($row); + $record->{reseller_name} = $mta_default_reseller_name; ($record->{sip_username},$record->{domain}) = split('@',$record->{_txt_sw_username},2); - $record->{domain} //= $default_domain; - $record->{billing_profile_name} = $default_billing_profile_name; - ($record->{cc},$record->{ac},$record->{sn}) = _split_dn($record->{_dn}); + $record->{domain} //= $mta_default_domain; + $record->{billing_profile_name} = $mta_default_billing_profile_name; + ($record->{cc},$record->{ac},$record->{sn}) = split_number($record->{_dn}); $record->{web_username} = undef; $record->{web_password} = undef; - $record->{barring} = $default_barring; + $record->{barring} = $mta_default_barring; #$record->{allowed_ips} #"channels", #"voicemail", @@ -103,35 +112,35 @@ sub import_subscriber { $record->{filenum} = $filenum; $record->{filename} = $file; - my %r = %$record; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::fieldnames}; + my %r = %$record; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::fieldnames}; if ($context->{upsert}) { push(@row_ext,$record->{cc},$record->{ac},$record->{sn}); } else { - push(@row_ext,$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::added_delta); + push(@row_ext,$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta); } push(@subscriber_rows,\@row_ext); # if &{$context->{check_number_code}}($context,$record); #my %r = %$record; #$record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::contact_fieldnames}]); #next unless _unfold_number_ranges($context,$record,\@subscriber_rows); - if ($subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { + if ($mta_subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { while (defined (my $subscriber_row = shift @subscriber_rows)) { if ($skip_errors) { - eval { _insert_subscriber_rows($context,[$subscriber_row]); }; + eval { _insert_mta_subscriber_rows($context,[$subscriber_row]); }; _warn($context,$@) if $@; } else { - _insert_subscriber_rows($context,[$subscriber_row]); + _insert_mta_subscriber_rows($context,[$subscriber_row]); } } } } - if (not $subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { + if (not $mta_subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { if ($skip_errors) { - eval { _insert_subscriber_rows($context,\@subscriber_rows); }; + eval { _insert_mta_subscriber_rows($context,\@subscriber_rows); }; _warn($context,$@) if $@; } else { - _insert_subscriber_rows($context,\@subscriber_rows); + _insert_mta_subscriber_rows($context,\@subscriber_rows); } } #use Data::Dumper; @@ -184,31 +193,31 @@ sub import_subscriber { } -sub _import_subscriber_checks { +sub _import_mta_subscriber_checks { my ($file) = @_; my $result = 1; return $result; } -sub _import_subscriber_reset_delta { +sub _import_mta_subscriber_reset_delta { my $upsert = 0; - if (NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_ccacsn() > 0) { + if (NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() > 0) { processing_info(threadid(),'resetting delta of ' . - NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::update_delta(undef,undef,undef, - $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::deleted_delta) . - ' subscriber records',getlogger(__PACKAGE__)); + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::update_delta(undef,undef,undef, + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta) . + ' mta subscriber records',getlogger(__PACKAGE__)); $upsert |= 1; } return $upsert; } -sub _insert_subscriber_rows { +sub _insert_mta_subscriber_rows { my ($context,$subscriber_rows) = @_; $context->{db}->db_do_begin( ($context->{upsert} ? - NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::getupsertstatement() - : NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::getinsertstatement($ignore_subscriber_unique)), + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::getupsertstatement() + : NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::getinsertstatement($ignore_mta_subscriber_unique)), #NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::gettablename(), #lock ); @@ -225,42 +234,132 @@ sub _insert_subscriber_rows { } } -sub _split_dn { - my ($dn) = @_; - my ($cc,$ac,$sn) = ('','',$dn); - if ($cc_ac_map) { - if ($default_cc) { - $cc = $default_cc; - $dn =~ s/^0//; - $sn = $dn; - } else { - foreach my $cc_length ($cc_len_min .. $cc_len_max) { - my ($_cc,$_dn) = (substr($dn,0,$cc_length), substr($dn,$cc_length)); - if (exists $cc_ac_map->{$_cc}) { - $cc = $_cc; - $sn = $_dn; - $dn = $_dn; - last; + +sub import_ccs_subscriber { + + my ($file) = @_; + # create tables: + my $result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::create_table(0); + + # checks, e.g. other table must be present: + # ..none.. + + # prepare parse: + my $importer = NGCP::BulkProcessor::FileProcessors::XslxFileSimple->new(); #$user_password_import_numofthreads); + + my $upsert = _import_ccs_subscriber_reset_delta(); + + # launch: + destroy_all_dbs(); #close all db connections before forking.. + my $warning_count :shared = 0; + return ($result && $importer->process( + file => $file, + process_code => sub { + my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + my @subscriber_rows = (); + foreach my $row (@$rows) { + $rownum++; + next if (defined $provision_ccs_subscriber_rownum_start and $rownum < $provision_ccs_subscriber_rownum_start); + next if (scalar @$row) == 0 or (scalar @$row) == 1; + $row = [ map { local $_ = $_; trim($_); $_; } @$row ]; #Encode::encode('utf8',Encode::decode('cp1252',$_)); + my $record = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber->new($row); + $record->{rownum} = $rownum; + my %r = %$record; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::fieldnames}; + if ($context->{upsert}) { + push(@row_ext,$record->{service_number}); + } else { + push(@row_ext,$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::added_delta); } + push(@subscriber_rows,\@row_ext); # if &{$context->{check_number_code}}($context,$record); + + #my %r = %$record; + #$record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::contact_fieldnames}]); + #next unless _unfold_number_ranges($context,$record,\@subscriber_rows); + if ($ccs_subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { + while (defined (my $subscriber_row = shift @subscriber_rows)) { + if ($skip_errors) { + eval { _insert_ccs_subscriber_rows($context,[$subscriber_row]); }; + _warn($context,$@) if $@; + } else { + _insert_ccs_subscriber_rows($context,[$subscriber_row]); + } + } + } + } - } - if (exists $cc_ac_map->{$cc}) { - my $ac_map = $cc_ac_map->{$cc}; - foreach my $ac_length ($ac_len->{$cc}->{min} .. $ac_len->{$cc}->{max}) { - my ($_ac,$_sn) = (substr($dn,0,$ac_length), substr($dn,$ac_length)); - if (exists $ac_map->{$_ac}) { - $ac = $_ac; - $sn = $_sn; - #$dn = ''; - last; + + if (not $ccs_subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { + if ($skip_errors) { + eval { _insert_ccs_subscriber_rows($context,\@subscriber_rows); }; + _warn($context,$@) if $@; + } else { + _insert_ccs_subscriber_rows($context,\@subscriber_rows); } } - } + + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_import_db(); # keep ref count low.. + $context->{upsert} = $upsert; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $import_multithreading + ),$warning_count); + +} + +sub _import_ccs_subscriber_reset_delta { + my $upsert = 0; + if (NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::countby_service_number() > 0) { + processing_info(threadid(),'resetting delta of ' . + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::update_delta(undef, + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::deleted_delta) . + ' ccs subscriber records',getlogger(__PACKAGE__)); + $upsert |= 1; } - return ($cc,$ac,$sn); + return $upsert; +} +sub _insert_ccs_subscriber_rows { + my ($context,$subscriber_rows) = @_; + $context->{db}->db_do_begin( + ($context->{upsert} ? + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::getupsertstatement() + : NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::getinsertstatement($ignore_ccs_subscriber_unique)), + #NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::gettablename(), + #lock + ); + eval { + $context->{db}->db_do_rowblock($subscriber_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } } + + + + + sub _error { my ($context,$message) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Preferences.pm b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Preferences.pm index 393a4e7..fa0644f 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Preferences.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Preferences.pm @@ -29,7 +29,7 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); -use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw(); +#use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw(); diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Provisioning.pm b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Provisioning.pm index aab30aa..8cb7319 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Provisioning.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Provisioning.pm @@ -10,18 +10,32 @@ use String::MkPasswd qw(); use JSON -support_by_pp, -no_export; use Tie::IxHash; +use Encode qw(); + use NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings qw( $dry $skip_errors $report_filename - $provision_subscriber_multithreading - $provision_subscriber_numofthreads - $webpassword_length - $webusername_length - $sippassword_length + $provision_mta_subscriber_multithreading + $provision_mta_subscriber_numofthreads + $mta_webpassword_length + $mta_webusername_length + $mta_sippassword_length $barring_profiles + + $provision_ccs_subscriber_multithreading + $provision_ccs_subscriber_numofthreads + + split_number + + $ccs_reseller_name + $ccs_billing_profile_name + $ccs_domain + $ccs_sippassword_length + + @css_trusted_source_ips ); use NGCP::BulkProcessor::Logging qw ( @@ -35,7 +49,8 @@ use NGCP::BulkProcessor::LogError qw( fileerror ); -use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::products qw(); @@ -60,10 +75,10 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw(); #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw(); #use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw(); -#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw(); +use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw(); use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw(); -#use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); +use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Subscribers qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw(); @@ -93,8 +108,8 @@ use NGCP::BulkProcessor::RandomString qw(createtmpstring); require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( - provision_subscribers - + provision_mta_subscribers + provision_ccs_subscribers ); my $split_ipnets_pattern = join('|',( @@ -108,15 +123,17 @@ my $file_lock :shared = undef; my $default_barring = 'default'; -sub provision_subscribers { +my $ccs_contact_identifier_field = 'gpp9'; + +sub provision_mta_subscribers { my $static_context = { now => timestamp(), _rowcount => undef }; - my $result = _provision_subscribers_checks($static_context); + my $result = _provision_mta_subscribers_checks($static_context); destroy_all_dbs(); my $warning_count :shared = 0; my %nonunique_contacts :shared = (); - return ($result && NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::process_records( + return ($result && NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::process_records( static_context => $static_context, process_code => sub { my ($context,$records,$row_offset) = @_; @@ -125,8 +142,8 @@ sub provision_subscribers { my @report_data = (); foreach my $domain_sipusername (@$records) { $context->{_rowcount} += 1; - next unless _provision_susbcriber($context, - NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::findby_domain_sipusername(@$domain_sipusername)); + next unless _provision_mta_susbcriber($context, + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::findby_domain_sipusername(@$domain_sipusername)); push(@report_data,_get_report_obj($context)); } #cleanup_aig_sequence_ids($context); @@ -163,8 +180,8 @@ sub provision_subscribers { } }, load_recursive => 0, - multithreading => $provision_subscriber_multithreading, - numofthreads => $provision_subscriber_numofthreads, + multithreading => $provision_mta_subscriber_multithreading, + numofthreads => $provision_mta_subscriber_numofthreads, ),$warning_count,\%nonunique_contacts); } @@ -202,10 +219,10 @@ sub _get_report_obj { return \%dump; } -sub _provision_susbcriber { +sub _provision_mta_susbcriber { my ($context,$subscriber_group) = @_; - return 0 unless _provision_susbcriber_init_context($context,$subscriber_group); + return 0 unless _provision_mta_susbcriber_init_context($context,$subscriber_group); eval { lock $db_lock; @@ -221,14 +238,14 @@ sub _provision_susbcriber { if ((scalar @$existing_billing_voip_subscribers) == 0) { - _update_contact($context); + _update_mta_contact($context); _update_contract($context); #{ # lock $db_lock; #concurrent writes to voip_numbers causes deadlocks _update_subscriber($context); _create_aliases($context); #} - _update_preferences($context); + _update_mta_preferences($context); #_set_registrations($context); #_set_callforwards($context); #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it @@ -261,25 +278,25 @@ sub _provision_susbcriber { } -sub _provision_subscribers_checks { +sub _provision_mta_subscribers_checks { my ($context) = @_; my $result = 1; my $subscribercount = 0; eval { - $subscribercount = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_ccacsn(); + $subscribercount = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn(); }; if ($@ or $subscribercount == 0) { - rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__)); + rowprocessingerror(threadid(),'please import mta subscribers first',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. } else { - processing_info(threadid(),"$subscribercount subscriber found",getlogger(__PACKAGE__)); + processing_info(threadid(),"$subscribercount mta subscriber found",getlogger(__PACKAGE__)); } my $domain_billingprofilename_resellernames = []; eval { - $domain_billingprofilename_resellernames = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::list_domain_billingprofilename_resellernames(); + $domain_billingprofilename_resellernames = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::list_domain_billingprofilename_resellernames(); }; if ($@ or (scalar @$domain_billingprofilename_resellernames) == 0) { rowprocessingerror(threadid(),"no domains/billing profile names/reseller names",getlogger(__PACKAGE__)); @@ -380,66 +397,11 @@ sub _provision_subscribers_checks { processing_info(threadid(),"$NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE product found",getlogger(__PACKAGE__)); } - $context->{attributes} = {}; - - eval { - $context->{attributes}->{allowed_clis} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_CLIS_ATTRIBUTE); - }; - if ($@ or not defined $context->{attributes}->{allowed_clis}) { - rowprocessingerror(threadid(),'cannot find allowed_clis attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } else { - processing_info(threadid(),"allowed_clis attribute found",getlogger(__PACKAGE__)); - } - - eval { - $context->{attributes}->{cli} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CLI_ATTRIBUTE); - }; - if ($@ or not defined $context->{attributes}->{cli}) { - rowprocessingerror(threadid(),'cannot find cli attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } else { - processing_info(threadid(),"cli attribute found",getlogger(__PACKAGE__)); - } - - eval { - $context->{attributes}->{ac} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::AC_ATTRIBUTE); - }; - if ($@ or not defined $context->{attributes}->{ac}) { - rowprocessingerror(threadid(),'cannot find ac attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } else { - processing_info(threadid(),"ac attribute found",getlogger(__PACKAGE__)); - } - - eval { - $context->{attributes}->{cc} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CC_ATTRIBUTE); - }; - if ($@ or not defined $context->{attributes}->{cc}) { - rowprocessingerror(threadid(),'cannot find cc attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } else { - processing_info(threadid(),"cc attribute found",getlogger(__PACKAGE__)); - } - - eval { - $context->{attributes}->{account_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ACCOUNT_ID_ATTRIBUTE); - }; - if ($@ or not defined $context->{attributes}->{account_id}) { - rowprocessingerror(threadid(),'cannot find account_id attribute',getlogger(__PACKAGE__)); - $result = 0; #even in skip-error mode.. - } else { - processing_info(threadid(),"account_id attribute found",getlogger(__PACKAGE__)); - } + $result = _provision_subscribers_base_prefs_checks($context,$result); my $barring_resellernames = []; eval { - $barring_resellernames = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::list_barring_resellernames(); + $barring_resellernames = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::list_barring_resellernames(); }; if ($@) { rowprocessingerror(threadid(),'error retrieving barrings',getlogger(__PACKAGE__)); @@ -530,7 +492,7 @@ sub _check_ncos_level { return $result; } -sub _update_contact { +sub _update_mta_contact { my ($context) = @_; @@ -697,7 +659,7 @@ sub _update_subscriber { } -sub _update_preferences { +sub _update_mta_preferences { my ($context) = @_; @@ -830,7 +792,7 @@ sub _create_aliases { return $result; } -sub _provision_susbcriber_init_context { +sub _provision_mta_susbcriber_init_context { my ($context,$subscriber_group) = @_; @@ -944,7 +906,7 @@ sub _provision_susbcriber_init_context { } unless (defined $context->{prov_subscriber}->{password} and length($context->{prov_subscriber}->{password}) > 0) { - my $generated = _generate_sippassword(); + my $generated = _generate_sippassword($mta_sippassword_length); $context->{prov_subscriber}->{password} = $generated; _info($context,"empty sip_password, using generated '$generated'",1); } @@ -964,14 +926,14 @@ sub _provision_susbcriber_init_context { @{NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::findby_domain_webusername( $first->{domain},$webusername)}; if ((scalar keys %webusername_dupes) > 1) { - my $generated = _generate_webusername(); #$first->{sip_username}; + my $generated = _generate_webusername($mta_webusername_length); #$first->{sip_username}; _info($context,"duplicate web_username '$webusername', using generated '$generated'",1); $context->{prov_subscriber}->{webusername} = $generated; } #$context->{prov_subscriber}->{webpassword} = $first->{web_password}; if (not (defined $context->{prov_subscriber}->{webpassword} and length($context->{prov_subscriber}->{webpassword}) > 0)) { - my $generated = _generate_webpassword(); + my $generated = _generate_webpassword($mta_webpassword_length); _info($context,"empty web_password for web_username '$webusername', using generated '$generated'",1); $context->{prov_subscriber}->{webpassword} = $generated; #} elsif (defined $first->{web_password} and length($first->{web_password}) < 8) { @@ -1046,6 +1008,7 @@ sub _provision_susbcriber_init_context { sub _generate_webpassword { + my $webpassword_length = shift; return String::MkPasswd::mkpasswd( -length => $webpassword_length, -minnum => 1, -minlower => 1, -minupper => 1, -minspecial => 1, @@ -1054,10 +1017,12 @@ sub _generate_webpassword { } sub _generate_sippassword { + my $sippassword_length = shift; return createtmpstring($sippassword_length); } sub _generate_webusername { + my $webusername_length = shift; return createtmpstring($webusername_length); } @@ -1069,6 +1034,546 @@ sub _apply_reseller_mapping { return $reseller_name; } + + + +sub provision_ccs_subscribers { + + my $static_context = { now => timestamp(), _rowcount => undef }; + my $result = _provision_ccs_subscribers_checks($static_context); + + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::process_records( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + ping_all_dbs(); + $context->{_rowcount} = $row_offset; + my @report_data = (); + foreach my $switch_number (@$records) { + $context->{_rowcount} += 1; + next unless _provision_ccs_susbcriber($context, + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::findby_switch_number(@$switch_number)); + push(@report_data,_get_report_obj($context)); + } + #cleanup_aig_sequence_ids($context); + if (defined $report_filename) { + lock $file_lock; + open(my $fh, '>>', $report_filename) or fileerror('cannot open file ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__)); + binmode($fh); + print $fh JSON::to_json(\@report_data,{ allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, }); + close $fh; + } + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + load_recursive => 0, + multithreading => $provision_ccs_subscriber_multithreading, + numofthreads => $provision_ccs_subscriber_numofthreads, + ),$warning_count); + +} + +sub _provision_ccs_susbcriber { + my ($context,$subscriber_group) = @_; + + return 0 unless _provision_ccs_susbcriber_init_context($context,$subscriber_group); + + eval { + lock $db_lock; + $context->{db}->db_begin(); + #_warn($context,'AutoCommit is on') if $context->{db}->{drh}->{AutoCommit}; + + my $existing_billing_voip_subscribers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_domainid_username_states( + $context->{db}, + $context->{domain}->{id}, + $context->{prov_subscriber}->{username}, + { 'NOT IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::TERMINATED_STATE} + ); + + if ((scalar @$existing_billing_voip_subscribers) == 0) { + + _update_ccs_contact($context); + _update_contract($context); + #{ + # lock $db_lock; #concurrent writes to voip_numbers causes deadlocks + _update_subscriber($context); + _create_aliases($context); + #} + _update_ccs_preferences($context); + _set_registrations($context); + ##_set_callforwards($context); + ##todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it + ##in this main provisioning loop, or align it in separate run-modes, according to the files given. + + } else { + _warn($context,(scalar @$existing_billing_voip_subscribers) . ' existing billing subscribers found, skipping'); + } + + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($skip_errors) { + _warn($context, $err); + } else { + _error($context, $err); + } + } + + return 1; + +} + +sub _provision_ccs_subscribers_checks { + my ($context) = @_; + + my $result = 1; + + my $subscribercount = 0; + eval { + $subscribercount = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::countby_service_number(); + }; + if ($@ or $subscribercount == 0) { + rowprocessingerror(threadid(),'please import ccs subscribers first',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"$subscribercount ccs subscribers found",getlogger(__PACKAGE__)); + } + + eval { + $context->{reseller} = NGCP::BulkProcessor::Dao::Trunk::billing::resellers::findby_name($ccs_reseller_name); + }; + if ($@ or not defined $context->{reseller}) { + rowprocessingerror(threadid(),"cannot find reseller '$ccs_reseller_name'",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"reseller '$ccs_reseller_name' found",getlogger(__PACKAGE__)); + eval { + $context->{domain} = NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_domain($ccs_domain); + if (defined $context->{domain} + and NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers::countby_domainid_resellerid($context->{domain}->{id},$context->{reseller}->{id}) == 0) { + undef $context->{domain}; + } + }; + if ($@ or not defined $context->{domain}) { + rowprocessingerror(threadid(),"cannot find billing domain '$ccs_domain' (of reseller '$ccs_reseller_name')",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + + processing_info(threadid(),"billing domain '$ccs_domain' (of reseller '$ccs_reseller_name') found",getlogger(__PACKAGE__)); + eval { + $context->{domain}->{prov_domain} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_domain($ccs_domain); + }; + if ($@ or not defined $context->{domain}->{prov_domain}) { + rowprocessingerror(threadid(),"cannot find provisioning domain '$ccs_domain'",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"provisioning domain '$ccs_domain' found",getlogger(__PACKAGE__)); + } + + } + + eval { + my $billing_profiles = NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles::findby_resellerid_name_handle($context->{reseller}->{id},$ccs_billing_profile_name); + $context->{billing_profile} = $billing_profiles->[0] if (scalar @$billing_profiles) == 1; + }; + if ($@ or not defined $context->{billing_profile}) { + rowprocessingerror(threadid(),"cannot find billing profile '$ccs_billing_profile_name' (of reseller '$ccs_reseller_name')",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"billing profile '$ccs_billing_profile_name' (of reseller '$ccs_reseller_name') found",getlogger(__PACKAGE__)); + } + + } + + eval { + $context->{pbx_account_product} = NGCP::BulkProcessor::Dao::Trunk::billing::products::findby_resellerid_handle(undef, + $NGCP::BulkProcessor::Dao::Trunk::billing::products::PBX_ACCOUNT_HANDLE)->[0]; + }; + if ($@ or not defined $context->{pbx_account_product}) { + rowprocessingerror(threadid(),"cannot find $NGCP::BulkProcessor::Dao::Trunk::billing::products::PBX_ACCOUNT_HANDLE product",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"$NGCP::BulkProcessor::Dao::Trunk::billing::products::PBX_ACCOUNT_HANDLE product found",getlogger(__PACKAGE__)); + } + + $result = _provision_subscribers_base_prefs_checks($context,$result); + + #extended_dialing_mode, e164_to_ruri and serial_forking_by_q_value + + eval { + $context->{attributes}->{extended_dialing_mode} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::EXTENDED_DIALING_MODE_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{extended_dialing_mode}) { + rowprocessingerror(threadid(),'cannot find extended_dialing_mode attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"extended_dialing_mode attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{e164_to_ruri} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::E164_TO_RURI_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{e164_to_ruri}) { + rowprocessingerror(threadid(),'cannot find e164_to_ruri attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"e164_to_ruri attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{serial_forking_by_q_value} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::SERIAL_FORKING_BY_Q_VALUE_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{serial_forking_by_q_value}) { + rowprocessingerror(threadid(),'cannot find serial_forking_by_q_value attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"serial_forking_by_q_value attribute found",getlogger(__PACKAGE__)); + } + + return $result; +} + +sub _provision_subscribers_base_prefs_checks { + + my ($context,$result) = @_; + + $context->{attributes} = {}; + + eval { + $context->{attributes}->{allowed_clis} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_CLIS_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{allowed_clis}) { + rowprocessingerror(threadid(),'cannot find allowed_clis attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"allowed_clis attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{cli} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CLI_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{cli}) { + rowprocessingerror(threadid(),'cannot find cli attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"cli attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{ac} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::AC_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{ac}) { + rowprocessingerror(threadid(),'cannot find ac attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"ac attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{cc} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CC_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{cc}) { + rowprocessingerror(threadid(),'cannot find cc attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"cc attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{account_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ACCOUNT_ID_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{account_id}) { + rowprocessingerror(threadid(),'cannot find account_id attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"account_id attribute found",getlogger(__PACKAGE__)); + } + + return $result; + +} + +sub _provision_ccs_susbcriber_init_context { + + my ($context,$subscriber_group) = @_; + + my $result = 1; + + $context->{log_info} = []; + $context->{log_warning} = []; + $context->{log_error} = []; + + my $first = $subscriber_group->[0]; + + unless (defined $first->{switch_number} and length($first->{switch_number}) > 0) { + _warn($context,'empty switch_number ignored'); + $result = 0; + } + + $context->{numbers} = {}; + my $primary_number = {}; + ($primary_number->{cc},$primary_number->{ac},$primary_number->{sn}) = split_number($first->{switch_number}); + $primary_number->{number} = $primary_number->{cc} . $primary_number->{ac} . $primary_number->{sn}; + $context->{numbers}->{primary} = $primary_number; + my @service_numbers = (); + foreach my $subscriber (@$subscriber_group) { + $subscriber->{comment} = Encode::decode('utf8',$subscriber->{comment}) if defined $subscriber->{comment}; #mark as utf-8 + #if ($subscriber->{comment} =~ /gek/i) { + # print "blah"; + #} + if (defined $subscriber->{comment} and $subscriber->{comment} =~ /k(\x{00dc}|\x{00fc})ndig/i) { + #if (defined $subscriber->{comment} and $subscriber->{comment} =~ /k..ndig/i) { + _warn($context,"$subscriber->{customer} $subscriber->{service_number} $subscriber->{comment}, skipping"); + next; + } + my $alias = {}; + ($alias->{cc},$alias->{ac},$alias->{sn}) = split_number($subscriber->{service_number}); + $alias->{number} = $alias->{cc} . $alias->{ac} . $alias->{sn}; + push(@service_numbers,$alias); + } + $context->{numbers}->{other} = \@service_numbers; + #should we skip if there are no service numbers? + if ((scalar @service_numbers) == 0) { + $result = 0; + } + + #$context->{domain} = ; + #$context->{reseller} = ; + #$context->{billing_profile} = ; + + $first->{sip_username} = $primary_number->{number}; + $first->{sip_password} = _generate_sippassword($ccs_sippassword_length); + $first->{web_username} = undef; + $first->{web_password} = undef; + + $context->{prov_subscriber} = {}; + $context->{prov_subscriber}->{username} = $first->{sip_username}; + $context->{prov_subscriber}->{password} = $first->{sip_password}; + $context->{prov_subscriber}->{webusername} = $first->{web_username}; + $context->{prov_subscriber}->{webpassword} = $first->{web_password}; + #my $webusername = $first->{web_username}; + $context->{prov_subscriber}->{is_pbx_pilot} = '1'; + + $context->{prov_subscriber}->{uuid} = create_uuid(); + $context->{prov_subscriber}->{domain_id} = $context->{domain}->{prov_domain}->{id}; + + $context->{bill_subscriber} = {}; + $context->{bill_subscriber}->{username} = $first->{sip_username}; + $context->{bill_subscriber}->{domain_id} = $context->{domain}->{id}; + $context->{bill_subscriber}->{uuid} = $context->{prov_subscriber}->{uuid}; + + $context->{contract} = { + #external_id => $first->{customer}, + create_timestamp => $context->{now}, + product_id => $context->{pbx_account_product}->{id}, + contact => { + reseller_id => $context->{reseller}->{id}, + + #firstname => $subscriber->{first_name}, + #lastname => $subscriber->{last_name}, + #compregnum => $subscriber->{company_registration_number}, + company => $first->{customer}, + #street => $subscriber->{street}, + #postcode => $subscriber->{postal_code}, + #city => $subscriber->{city_name}, + ##country => $context->{contract}->{contact}->{country}, + #phonenumber => $subscriber->{phone_number}, + #email => $subscriber->{email}, + #vatnum => $subscriber->{vat_number}, + ##$contact_hash_field => $subscriber->{contact_hash}, + + gpp0 => $first->{comment}, + + $ccs_contact_identifier_field => ($first->{customer} eq '???' ? $first->{customer} . '_' . $first->{switch_number} : $first->{customer}), + }, + }; + + $context->{voip_numbers} = {}; + $context->{voip_numbers}->{primary} = undef; + $context->{voip_numbers}->{other} = []; + $context->{aliases} = {}; + $context->{aliases}->{primary} = undef; + $context->{aliases}->{other} = []; + + $context->{voicemail_user} = {}; + $context->{voicemail_user}->{customer_id} = $context->{prov_subscriber}->{uuid}; + $context->{voicemail_user}->{mailbox} = $context->{numbers}->{primary}->{number}; + $context->{voicemail_user}->{password} = sprintf("%04d", int(rand 10000)); + + my @registrations = (); + my @trusted_sources = (); + my $q = 1.0; + foreach my $ip (@css_trusted_source_ips) { + push(@registrations,{ + q => $q, + username => $first->{sip_username}, + domain => $context->{domain}->{domain}, + contact => 'sip:' . $first->{sip_username} . '@' . $ip . ':5060', + ruid => NGCP::BulkProcessor::Dao::Trunk::kamailio::location::next_ruid(), + }); + $q = $q / 2.0; + push(@trusted_sources,{ + src_ip => $ip, + protocol => $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources::PROTOCOL_ANY, + #from_pattern => 'sip:.+' . quotemeta($context->{domain}->{domain}), + }); + } + $context->{registrations} = \@registrations; + $context->{trusted_sources} = \@trusted_sources; + + #$context->{preferences} = {}; + + #$context->{preferences}->{gpp} = [ + # $first->{"_len"}, + # $first->{"_cpe_mta_mac_address"}, + # $first->{"_cpe_model"}, + # $first->{"_cpe_vendor"}, + #]; + + return $result; + +} + +sub _update_ccs_contact { + + my ($context) = @_; + + my $existing_contacts = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_reselleridfields( + $context->{reseller}->{id}, + { $ccs_contact_identifier_field => $context->{contract}->{contact}->{$ccs_contact_identifier_field} }, + ); + if ((scalar @$existing_contacts) == 0) { + $context->{contract}->{contact}->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::contacts::insert_row($context->{db}, + $context->{contract}->{contact}, + ); + $context->{contract}->{contact_id} = $context->{contract}->{contact}->{id}; + _info($context,"contact id $context->{contract}->{contact}->{id} created",1); + } else { + my $existing_contact = $existing_contacts->[0]; + if ((scalar @$existing_contacts) > 1) { + _warn($context,(scalar @$existing_contacts) . " existing contacts found, using first contact id $existing_contact->{id}"); + } else { + _info($context,"existing contact id $existing_contact->{id} found",1); + } + $context->{contract}->{contact}->{id} = $existing_contact->{id}; + $context->{contract}->{contact_id} = $context->{contract}->{contact}->{id}; + + #my $existing_contracts = NGCP::BulkProcessor::Dao::Trunk::billing::contracts::findby_contactid($existing_contact->{id}); + #if ((scalar @$existing_contracts) > 0) { + # my $existing_contract = $existing_contracts->[0]; + # if ((scalar @$existing_contracts) > 1) { + # _warn($context,(scalar @$existing_contracts) . " existing contracts found, using first contact id $existing_contract->{id}"); + # } else { + # _info($context,"existing contract id $existing_contact->{id} found",1); + # } + # $context->{contract}->{id} = $existing_contract->{id}; + # $context->{bill_subscriber}->{contract_id} = $context->{contract}->{id}; + # $context->{prov_subscriber}->{account_id} = $context->{contract}->{id}; + #} else { + # _warn($context,"no existing contract of contact id $existing_contact->{id} found, will be created"); + #} + } + $context->{contract}->{contact_id} = $context->{contract}->{contact}->{id}; + + return 1; + +} + +sub _update_ccs_preferences { + + my ($context) = @_; + + my $result = 1; + + $context->{preferences}->{extended_dialing_mode} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{extended_dialing_mode}, + 'extended_send_dialed'), value => 'extended_send_dialed' }; + + $context->{preferences}->{e164_to_ruri} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{e164_to_ruri}, + '1'), value => '1' }; + + $context->{preferences}->{serial_forking_by_q_value} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{serial_forking_by_q_value}, + '1'), value => '1' }; + + #if (defined $context->{preferences}->{gpp}) { + # my $gpp_idx = 0; + # foreach my $gpp_val (@{$context->{preferences}->{gpp}}) { + # my $gpp_attr = 'gpp' . $gpp_idx; + # $context->{preferences}->{$gpp_attr} = { id => set_subscriber_preference($context, + # $context->{prov_subscriber}->{id}, + # $context->{attributes}->{$gpp_attr}, + # $gpp_attr), value => $gpp_attr }; + # _info($context,"$gpp_attr preference set to $gpp_val",1); + # $gpp_idx++; + # } + #} + + return $result; + +} + +sub _set_registrations { + + my ($context) = @_; + my $result = 1; + foreach my $registration (@{$context->{registrations}}) { + #print "blah"; + $registration->{id} = NGCP::BulkProcessor::Dao::Trunk::kamailio::location::insert_row($context->{db}, + %$registration); + _info($context,"permanent registration $registration->{contact} added",1); + } + foreach my $trusted_source (@{$context->{trusted_sources}}) { + #print "blah"; + $trusted_source->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources::insert_row($context->{db},{ + %$trusted_source, + subscriber_id => $context->{prov_subscriber}->{id}, + uuid => $context->{prov_subscriber}->{uuid}, + }); + _info($context,"trusted source $trusted_source->{protocol} $trusted_source->{src_ip} from $trusted_source->{from_pattern} added",1); + } + return $result; + +} + sub _error { my ($context,$message) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Settings.pm index 8626a78..3e1e93b 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/Settings.pm @@ -27,7 +27,7 @@ use NGCP::BulkProcessor::LoadConfig qw( split_tuple parse_regexp ); -use NGCP::BulkProcessor::Utils qw(prompt timestampdigits); +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits check_ipnet); #format_number check_ipnet require Exporter; @@ -36,6 +36,7 @@ our @EXPORT_OK = qw( update_settings update_cc_ac_map update_barring_profiles + split_number check_dry $input_path @@ -52,22 +53,22 @@ our @EXPORT_OK = qw( $force $import_db_file - @subscriber_filenames - $subscriber_import_numofthreads - $ignore_subscriber_unique - $subscriber_import_single_row_txn + @mta_subscriber_filenames + $mta_subscriber_import_numofthreads + $ignore_mta_subscriber_unique + $mta_subscriber_import_single_row_txn - $provision_subscriber_rownum_start - $provision_subscriber_multithreading - $provision_subscriber_numofthreads - $webpassword_length - $webusername_length - $sippassword_length + $provision_mta_subscriber_rownum_start + $provision_mta_subscriber_multithreading + $provision_mta_subscriber_numofthreads + $mta_webpassword_length + $mta_webusername_length + $mta_sippassword_length - $default_domain - $default_reseller_name - $default_billing_profile_name - $default_barring + $mta_default_domain + $mta_default_reseller_name + $mta_default_billing_profile_name + $mta_default_barring $cc_ac_map_yml $cc_ac_map @@ -78,6 +79,19 @@ our @EXPORT_OK = qw( $barring_profiles_yml $barring_profiles + + $ccs_subscriber_filename + $ignore_ccs_subscriber_unique + $provision_ccs_subscriber_rownum_start + $ccs_subscriber_import_single_row_txn + $provision_ccs_subscriber_multithreading + $provision_ccs_subscriber_numofthreads + $ccs_reseller_name + $ccs_billing_profile_name + $ccs_domain + $ccs_sippassword_length + + @css_trusted_source_ips ); #$default_channels_map @@ -95,22 +109,22 @@ our $run_id = ''; our $import_db_file = _get_import_db_file($run_id,'import'); our $import_multithreading = 0; #$enablemultithreading; -our @subscriber_filenames = (); -our $subscriber_import_numofthreads = $cpucount; -our $ignore_subscriber_unique = 0; -our $subscriber_import_single_row_txn = 1; +our @mta_subscriber_filenames = (); +our $mta_subscriber_import_numofthreads = $cpucount; +our $ignore_mta_subscriber_unique = 0; +our $mta_subscriber_import_single_row_txn = 1; -our $provision_subscriber_rownum_start = 0; #all lines -our $provision_subscriber_multithreading = $enablemultithreading; -our $provision_subscriber_numofthreads = $cpucount; -our $webpassword_length = 8; -our $webusername_length = 8; -our $sippassword_length = 16; +our $provision_mta_subscriber_rownum_start = 0; #all lines +our $provision_mta_subscriber_multithreading = $enablemultithreading; +our $provision_mta_subscriber_numofthreads = $cpucount; +our $mta_webpassword_length = 8; +our $mta_webusername_length = 8; +our $mta_sippassword_length = 16; -our $default_domain = undef; -our $default_reseller_name = 'default'; -our $default_billing_profile_name = 'Default Billing Profile'; -our $default_barring = undef; +our $mta_default_domain = undef; +our $mta_default_reseller_name = 'default'; +our $mta_default_billing_profile_name = 'Default Billing Profile'; +our $mta_default_barring = undef; our $cc_ac_map_yml = 'cc_ac.yml'; our $cc_ac_map = {}; @@ -122,6 +136,20 @@ our $ac_len = {}; our $barring_profiles_yml = undef; our $barring_profiles = {}; +our $ccs_subscriber_filename = undef; +our $ignore_ccs_subscriber_unique = 0; +our $provision_ccs_subscriber_rownum_start = 0; +our $ccs_subscriber_import_single_row_txn = 1; +our $provision_ccs_subscriber_multithreading = $enablemultithreading; +our $provision_ccs_subscriber_numofthreads = $cpucount; + +our $ccs_reseller_name = 'default'; +our $ccs_billing_profile_name = 'Default Billing Profile'; +our $ccs_domain = undef; +our $ccs_sippassword_length = 16; + +our @css_trusted_source_ips = (); + sub update_settings { my ($data,$configfile) = @_; @@ -152,40 +180,64 @@ sub update_settings { # configurationerror($configfile,"import_multithreading must be disabled to preserve record order",getlogger(__PACKAGE__)); #} - @subscriber_filenames = _get_import_filenames(\@subscriber_filenames,$data,'subscriber_filenames'); - $subscriber_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_import_numofthreads'); - $ignore_subscriber_unique = $data->{ignore_subscriber_unique} if exists $data->{ignore_subscriber_unique}; - $subscriber_import_single_row_txn = $data->{subscriber_import_single_row_txn} if exists $data->{subscriber_import_single_row_txn}; - - $provision_subscriber_rownum_start = $data->{provision_subscriber_rownum_start} if exists $data->{provision_subscriber_rownum_start}; - $provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading}; - $provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads'); - $webpassword_length = $data->{webpassword_length} if exists $data->{webpassword_length}; - if (not defined $webpassword_length or $webpassword_length <= 7) { - configurationerror($configfile,'webpassword_length greater than 7 required',getlogger(__PACKAGE__)); + @mta_subscriber_filenames = _get_import_filenames(\@mta_subscriber_filenames,$data,'mta_subscriber_filenames'); + $mta_subscriber_import_numofthreads = _get_numofthreads($cpucount,$data,'mta_subscriber_import_numofthreads'); + $ignore_mta_subscriber_unique = $data->{ignore_mta_subscriber_unique} if exists $data->{ignore_mta_subscriber_unique}; + $mta_subscriber_import_single_row_txn = $data->{mta_subscriber_import_single_row_txn} if exists $data->{mta_subscriber_import_single_row_txn}; + + $provision_mta_subscriber_rownum_start = $data->{provision_mta_subscriber_rownum_start} if exists $data->{provision_mta_subscriber_rownum_start}; + $provision_mta_subscriber_multithreading = $data->{provision_mta_subscriber_multithreading} if exists $data->{provision_mta_subscriber_multithreading}; + $provision_mta_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_mta_subscriber_numofthreads'); + $mta_webpassword_length = $data->{mta_webpassword_length} if exists $data->{mta_webpassword_length}; + if (not defined $mta_webpassword_length or $mta_webpassword_length <= 7) { + configurationerror($configfile,'mta_webpassword_length greater than 7 required',getlogger(__PACKAGE__)); $result = 0; } - $webusername_length = $data->{webusername_length} if exists $data->{webusername_length}; - if (not defined $webusername_length or $webusername_length <= 7) { - configurationerror($configfile,'webusername_length greater than 7 required',getlogger(__PACKAGE__)); + $mta_webusername_length = $data->{mta_webusername_length} if exists $data->{mta_webusername_length}; + if (not defined $mta_webusername_length or $mta_webusername_length <= 7) { + configurationerror($configfile,'mta_webusername_length greater than 7 required',getlogger(__PACKAGE__)); $result = 0; } - $sippassword_length = $data->{sippassword_length} if exists $data->{sippassword_length}; - if (not defined $sippassword_length or $sippassword_length <= 7) { - configurationerror($configfile,'sippassword_length greater than 7 required',getlogger(__PACKAGE__)); + $mta_sippassword_length = $data->{mta_sippassword_length} if exists $data->{mta_sippassword_length}; + if (not defined $mta_sippassword_length or $mta_sippassword_length <= 7) { + configurationerror($configfile,'mta_sippassword_length greater than 7 required',getlogger(__PACKAGE__)); $result = 0; } #$default_channels = $data->{default_channels} if exists $data->{default_channels}; - $default_domain = $data->{default_domain} if exists $data->{default_domain}; - $default_reseller_name = $data->{default_reseller_name} if exists $data->{default_reseller_name}; - $default_billing_profile_name = $data->{default_billing_profile_name} if exists $data->{default_billing_profile_name}; - $default_barring = $data->{default_barring} if exists $data->{default_barring}; + $mta_default_domain = $data->{mta_default_domain} if exists $data->{mta_default_domain}; + $mta_default_reseller_name = $data->{mta_default_reseller_name} if exists $data->{mta_default_reseller_name}; + $mta_default_billing_profile_name = $data->{mta_default_billing_profile_name} if exists $data->{mta_default_billing_profile_name}; + $mta_default_barring = $data->{mta_default_barring} if exists $data->{mta_default_barring}; + $cc_ac_map_yml = $data->{cc_ac_map_yml} if exists $data->{cc_ac_map_yml}; $default_cc = $data->{default_cc} if exists $data->{default_cc}; $barring_profiles_yml = $data->{barring_profiles_yml} if exists $data->{barring_profiles_yml}; + ($ccs_subscriber_filename) = _get_import_filenames([ $ccs_subscriber_filename ],$data,'ccs_subscriber_filename'); + $ignore_ccs_subscriber_unique = $data->{ignore_ccs_subscriber_unique} if exists $data->{ignore_ccs_subscriber_unique}; + $provision_ccs_subscriber_rownum_start = $data->{provision_ccs_subscriber_rownum_start} if exists $data->{provision_ccs_subscriber_rownum_start}; + $ccs_subscriber_import_single_row_txn = $data->{ccs_subscriber_import_single_row_txn} if exists $data->{ccs_subscriber_import_single_row_txn}; + $provision_ccs_subscriber_multithreading = $data->{provision_ccs_subscriber_multithreading} if exists $data->{provision_ccs_subscriber_multithreading}; + $provision_ccs_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_ccs_subscriber_numofthreads'); + $ccs_reseller_name = $data->{ccs_reseller_name} if exists $data->{ccs_reseller_name}; + $ccs_billing_profile_name = $data->{ccs_billing_profile_name} if exists $data->{ccs_billing_profile_name}; + $ccs_domain = $data->{ccs_domain} if exists $data->{ccs_domain}; + $ccs_sippassword_length = $data->{ccs_sippassword_length} if exists $data->{ccs_sippassword_length}; + if (not defined $ccs_sippassword_length or $ccs_sippassword_length <= 7) { + configurationerror($configfile,'ccs_sippassword_length greater than 7 required',getlogger(__PACKAGE__)); + $result = 0; + } + + @css_trusted_source_ips = split_tuple($data->{css_trusted_source_ips}) if exists $data->{css_trusted_source_ips}; + foreach my $ip (@css_trusted_source_ips) { + unless (check_ipnet($ip)) { + configurationerror($configfile,"invalid ip address '$ip'",getlogger(__PACKAGE__)); + $result = 0; + } + } + return $result; } @@ -236,6 +288,42 @@ sub update_cc_ac_map { } +sub split_number { + my ($dn) = @_; + my ($cc,$ac,$sn) = ('','',$dn); + if ($cc_ac_map) { + if ($default_cc) { + $cc = $default_cc; + $dn =~ s/^0//; + $sn = $dn; + } else { + foreach my $cc_length ($cc_len_min .. $cc_len_max) { + my ($_cc,$_dn) = (substr($dn,0,$cc_length), substr($dn,$cc_length)); + if (exists $cc_ac_map->{$_cc}) { + $cc = $_cc; + $sn = $_dn; + $dn = $_dn; + last; + } + } + } + if (exists $cc_ac_map->{$cc}) { + my $ac_map = $cc_ac_map->{$cc}; + foreach my $ac_length ($ac_len->{$cc}->{min} .. $ac_len->{$cc}->{max}) { + my ($_ac,$_sn) = (substr($dn,0,$ac_length), substr($dn,$ac_length)); + if (exists $ac_map->{$_ac}) { + $ac = $_ac; + $sn = $_sn; + #$dn = ''; + last; + } + } + } + } + return ($cc,$ac,$sn); + +} + sub update_barring_profiles { my ($data,$configfile) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/barring_profiles.yml b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/barring_profiles.yml index 31631dd..8a157cc 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/barring_profiles.yml +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/barring_profiles.yml @@ -1,2 +1,2 @@ -default: - default: 'NCOS 0' +UPC: + default: '0' diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/process.pl b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/process.pl index 132d18f..2fdc9bc 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/process.pl @@ -24,11 +24,13 @@ use NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings qw( $force $run_id - @subscriber_filenames + @mta_subscriber_filenames $cc_ac_map_yml $barring_profiles_yml + $ccs_subscriber_filename + ); #$allowed_ips @@ -65,7 +67,7 @@ use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles); use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw(destroy_all_dbs); -use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); @@ -84,11 +86,13 @@ use NGCP::BulkProcessor::Projects::Migration::UPCAT::Check qw( ); use NGCP::BulkProcessor::Projects::Migration::UPCAT::Import qw( - import_subscriber + import_mta_subscriber + import_ccs_subscriber ); use NGCP::BulkProcessor::Projects::Migration::UPCAT::Provisioning qw( - provision_subscribers + provision_mta_subscribers + provision_ccs_subscribers ); scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet @@ -105,13 +109,22 @@ push(@TASK_OPTS,$cleanup_task_opt); my $cleanup_all_task_opt = 'cleanup_all'; push(@TASK_OPTS,$cleanup_all_task_opt); -my $import_subscriber_task_opt = 'import_subscriber'; -push(@TASK_OPTS,$import_subscriber_task_opt); -my $import_truncate_subscriber_task_opt = 'truncate_subscriber'; -push(@TASK_OPTS,$import_truncate_subscriber_task_opt); +my $import_mta_subscriber_task_opt = 'import_mta_subscriber'; +push(@TASK_OPTS,$import_mta_subscriber_task_opt); +my $truncate_mta_subscriber_task_opt = 'truncate_mta_subscriber'; +push(@TASK_OPTS,$truncate_mta_subscriber_task_opt); + +my $create_mta_subscriber_task_opt = 'create_mta_subscriber'; +push(@TASK_OPTS,$create_mta_subscriber_task_opt); + +my $import_ccs_subscriber_task_opt = 'import_ccs_subscriber'; +push(@TASK_OPTS,$import_ccs_subscriber_task_opt); +my $truncate_ccs_subscriber_task_opt = 'truncate_ccs_subscriber'; +push(@TASK_OPTS,$truncate_ccs_subscriber_task_opt); + +my $create_ccs_subscriber_task_opt = 'create_ccs_subscriber'; +push(@TASK_OPTS,$create_ccs_subscriber_task_opt); -my $create_subscriber_task_opt = 'create_subscriber'; -push(@TASK_OPTS,$create_subscriber_task_opt); if (init()) { main(); @@ -165,18 +178,29 @@ sub main() { } elsif (lc($cleanup_all_task_opt) eq lc($task)) { $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); - } elsif (lc($import_subscriber_task_opt) eq lc($task)) { - $result &= import_subscriber_task(\@messages) if taskinfo($import_subscriber_task_opt,$result); - } elsif (lc($import_truncate_subscriber_task_opt) eq lc($task)) { - $result &= import_truncate_subscriber_task(\@messages) if taskinfo($import_truncate_subscriber_task_opt,$result); + } elsif (lc($import_mta_subscriber_task_opt) eq lc($task)) { + $result &= import_mta_subscriber_task(\@messages) if taskinfo($import_mta_subscriber_task_opt,$result); + } elsif (lc($truncate_mta_subscriber_task_opt) eq lc($task)) { + $result &= truncate_mta_subscriber_task(\@messages) if taskinfo($truncate_mta_subscriber_task_opt,$result); - } elsif (lc($create_subscriber_task_opt) eq lc($task)) { - if (taskinfo($create_subscriber_task_opt,$result,1)) { + } elsif (lc($create_mta_subscriber_task_opt) eq lc($task)) { + if (taskinfo($create_mta_subscriber_task_opt,$result,1)) { next unless check_dry(); - $result &= create_subscriber_task(\@messages); + $result &= create_mta_subscriber_task(\@messages); $completion |= 1; } + } elsif (lc($import_ccs_subscriber_task_opt) eq lc($task)) { + $result &= import_ccs_subscriber_task(\@messages) if taskinfo($import_ccs_subscriber_task_opt,$result); + } elsif (lc($truncate_ccs_subscriber_task_opt) eq lc($task)) { + $result &= truncate_ccs_subscriber_task(\@messages) if taskinfo($truncate_ccs_subscriber_task_opt,$result); + + } elsif (lc($create_ccs_subscriber_task_opt) eq lc($task)) { + if (taskinfo($create_ccs_subscriber_task_opt,$result,1)) { + next unless check_dry(); + $result &= create_ccs_subscriber_task(\@messages); + $completion |= 1; + } } else { $result = 0; @@ -260,58 +284,58 @@ sub cleanup_task { } } -sub import_subscriber_task { +sub import_mta_subscriber_task { my ($messages) = @_; my ($result,$warning_count) = (0,0); eval { - ($result,$warning_count) = import_subscriber(@subscriber_filenames); + ($result,$warning_count) = import_mta_subscriber(@mta_subscriber_filenames); }; my $err = $@; my $stats = ": $warning_count warnings"; eval { - $stats .= "\n total subscriber records: " . - NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_ccacsn() . ' rows'; - my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_delta( - $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::added_delta + $stats .= "\n total mta subscriber records: " . + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows'; + my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta ); $stats .= "\n new: $added_count rows"; - my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_delta( - $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::updated_delta + my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta ); $stats .= "\n existing: $existing_count rows"; - my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_delta( - $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::deleted_delta + my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta ); $stats .= "\n removed: $deleted_count rows"; }; if ($err or !$result) { - push(@$messages,"importing subscribers INCOMPLETE$stats"); + push(@$messages,"importing mta subscribers INCOMPLETE$stats"); } else { - push(@$messages,"importing subscribers completed$stats"); + push(@$messages,"importing mta subscribers completed$stats"); } destroy_all_dbs(); #every task should leave with closed connections. return $result; } -sub import_truncate_subscriber_task { +sub truncate_mta_subscriber_task { my ($messages) = @_; my $result = 0; eval { - $result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::create_table(1); + $result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::create_table(1); }; my $err = $@; my $stats = ''; eval { - $stats .= "\n total subscriber records: " . - NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_dn() . ' rows'; + $stats .= "\n total mta subscriber records: " . + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows'; }; if ($err or !$result) { - push(@$messages,"truncating imported subscribers INCOMPLETE$stats"); + push(@$messages,"truncating imported mta subscribers INCOMPLETE$stats"); } else { - push(@$messages,"truncating imported subscribers completed$stats"); + push(@$messages,"truncating imported mta subscribers completed$stats"); } destroy_all_dbs(); #every task should leave with closed connections. return $result; @@ -320,12 +344,12 @@ sub import_truncate_subscriber_task { -sub create_subscriber_task { +sub create_mta_subscriber_task { my ($messages) = @_; my ($result,$warning_count,$nonunique_contacts) = (0,0,{}); eval { - ($result,$warning_count,$nonunique_contacts) = provision_subscribers(); + ($result,$warning_count,$nonunique_contacts) = provision_mta_subscribers(); }; my $err = $@; my $stats = ": $warning_count warnings"; @@ -353,9 +377,9 @@ sub create_subscriber_task { if (scalar keys %$nonunique_contacts) > 0; }; if ($err or !$result) { - push(@$messages,"create subscribers INCOMPLETE$stats"); + push(@$messages,"create mta subscribers INCOMPLETE$stats"); } else { - push(@$messages,"create subscribers completed$stats"); + push(@$messages,"create mta subscribers completed$stats"); #if (not $dry) { # push(@$messages,"YOU MIGHT WANT TO RESTART KAMAILIO FOR PERMANENT REGISTRATIONS TO COME INTO EFFECT"); #} @@ -365,6 +389,105 @@ sub create_subscriber_task { } +sub import_ccs_subscriber_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = import_ccs_subscriber($ccs_subscriber_filename); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total ccs subscriber records: " . + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::countby_service_number() . ' rows'; + my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::added_delta + ); + $stats .= "\n new: $added_count rows"; + my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::updated_delta + ); + $stats .= "\n existing: $existing_count rows"; + my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::deleted_delta + ); + $stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"importing ccs subscribers INCOMPLETE$stats"); + } else { + push(@$messages,"importing ccs subscribers completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + +sub truncate_ccs_subscriber_task { + + my ($messages) = @_; + my $result = 0; + eval { + $result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::create_table(1); + }; + my $err = $@; + my $stats = ''; + eval { + $stats .= "\n total ccs subscriber records: " . + NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::CcsSubscriber::countby_service_number() . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"truncating imported ccs subscribers INCOMPLETE$stats"); + } else { + push(@$messages,"truncating imported ccs subscribers completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + +sub create_ccs_subscriber_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = provision_ccs_subscribers(); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total contracts: " . + NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,undef) . ' rows'; + $stats .= "\n total subscribers: " . + NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(undef,undef) . ' rows'; + + $stats .= "\n total aliases: " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,undef) . ' rows'; + $stats .= "\n primary aliases: " . + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,1) . ' rows'; + + #$stats .= "\n call forwards: " . + # NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,undef) . ' rows'; + + #$stats .= "\n registrations: " . + # NGCP::BulkProcessor::Dao::Trunk::kamailio::location::countby_usernamedomain(undef,undef) . ' rows'; + + #$stats .= "\n trusted sources: " . + # NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources::countby_subscriberid(undef) . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"create ccs subscribers INCOMPLETE$stats"); + } else { + push(@$messages,"create ccs subscribers completed$stats"); + if (not $dry) { + push(@$messages,"YOU MIGHT WANT TO RESTART KAMAILIO FOR PERMANENT REGISTRATIONS TO COME INTO EFFECT"); + } + } + destroy_all_dbs(); + return $result; + +} #END { # # this should not be required explicitly, but prevents Log4Perl's diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.cfg index d30fb01..e3f1398 100755 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.cfg @@ -4,25 +4,39 @@ import_multithreading = 0 #subscriber_filenames = /home/rkrenn/temp/upcat/export1.csv,/home/rkrenn/temp/upcat/export2.csv -subscriber_filenames = /home/rkrenn/temp/upcat/ExportTestMigration.csv -subscriber_import_numofthreads = 2 -ignore_subscriber_unique = 0 -subscriber_import_single_row_txn = 1 +mta_subscriber_filenames = /home/rkrenn/temp/upcat/ExportTestMigration.csv +mta_subscriber_import_numofthreads = 2 +ignore_mta_subscriber_unique = 0 +mta_subscriber_import_single_row_txn = 1 + +provision_mta_subscriber_rownum_start = 2 +provision_mta_subscriber_multithreading = 1 +provision_mta_subscriber_numofthreads = 2 +mta_webpassword_length = 8 +mta_webusername_length = 8 +mta_sippassword_length = 16 + +mta_default_domain = d20.upc.at +mta_default_reseller_name = UPC +mta_default_billing_profile_name = Default Billing UPC +mta_default_barring = + +cc_ac_map_yml = cc_ac.yml +default_cc = 43 +barring_profiles_yml = barring_profiles.yml -provision_subscriber_rownum_start = 2 -provision_subscriber_multithreading = 1 -provision_subscriber_numofthreads = 2 -webpassword_length = 8 -webusername_length = 8 -sippassword_length = 16 report_filename = provision.txt #report_filename = provision_%s.json #default_channels = 1 -default_domain = d20.upc.at -default_reseller_name = default -default_billing_profile_name = Default Billing Profile -default_barring = -cc_ac_map_yml = cc_ac.yml -default_cc = 43 -barring_profiles_yml = barring_profiles.yml \ No newline at end of file +ccs_subscriber_filename = /home/rkrenn/temp/upcat/CCS_ICM_Nummern.xlsx +ignore_ccs_subscriber_unique = 0 +provision_ccs_subscriber_rownum_start = 2 +ccs_subscriber_import_single_row_txn = 1 +provision_ccs_subscriber_multithreading = 1 +provision_ccs_subscriber_numofthreads = 2 +ccs_reseller_name = UPC +ccs_billing_profile_name = Default Billing UPC +ccs_domain = in-ivr.upc.at +ccs_sippassword_length = 16 +css_trusted_source_ips = 80.110.2.228,80.110.2.229 \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.debug.cfg index d30fb01..36f1073 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.debug.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/UPCAT/settings.debug.cfg @@ -4,25 +4,40 @@ import_multithreading = 0 #subscriber_filenames = /home/rkrenn/temp/upcat/export1.csv,/home/rkrenn/temp/upcat/export2.csv -subscriber_filenames = /home/rkrenn/temp/upcat/ExportTestMigration.csv -subscriber_import_numofthreads = 2 -ignore_subscriber_unique = 0 -subscriber_import_single_row_txn = 1 +mta_subscriber_filenames = /home/rkrenn/temp/upcat/ExportTestMigration.csv +mta_subscriber_import_numofthreads = 2 +ignore_mta_subscriber_unique = 0 +mta_subscriber_import_single_row_txn = 1 + +provision_mta_subscriber_rownum_start = 2 +provision_mta_subscriber_multithreading = 1 +provision_mta_subscriber_numofthreads = 2 +mta_webpassword_length = 8 +mta_webusername_length = 8 +mta_sippassword_length = 16 + +mta_default_domain = d20.upc.at +mta_default_reseller_name = UPC +mta_default_billing_profile_name = Default Billing UPC +mta_default_barring = + +cc_ac_map_yml = cc_ac.yml +default_cc = 43 +barring_profiles_yml = barring_profiles.yml -provision_subscriber_rownum_start = 2 -provision_subscriber_multithreading = 1 -provision_subscriber_numofthreads = 2 -webpassword_length = 8 -webusername_length = 8 -sippassword_length = 16 report_filename = provision.txt #report_filename = provision_%s.json #default_channels = 1 -default_domain = d20.upc.at -default_reseller_name = default -default_billing_profile_name = Default Billing Profile -default_barring = -cc_ac_map_yml = cc_ac.yml -default_cc = 43 -barring_profiles_yml = barring_profiles.yml \ No newline at end of file +ccs_subscriber_filename = /home/rkrenn/temp/upcat/CCS_ICM_Nummern.xlsx +ignore_ccs_subscriber_unique = 0 +provision_ccs_subscriber_rownum_start = 2 +ccs_subscriber_import_single_row_txn = 1 +provision_ccs_subscriber_multithreading = 1 +provision_ccs_subscriber_numofthreads = 2 +ccs_reseller_name = UPC +ccs_billing_profile_name = Default Billing UPC +ccs_domain = d20.upc.at +#in-ivr.upc.at +ccs_sippassword_length = 16 +css_trusted_source_ips = 80.110.2.228,80.110.2.229 \ No newline at end of file