diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm index 613a30b..dfcb5c1 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_dbaliases.pm @@ -71,9 +71,12 @@ sub findby_subscriberid_username { my $table = $db->tableidentifier($tablename); my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . - $db->columnidentifier('subscriber_id') . ' = ?' . - ' AND ' . $db->columnidentifier('username') . ' = ?'; - my @params = ($subscriber_id,$username); + $db->columnidentifier('subscriber_id') . ' = ?'; + my @params = ($subscriber_id); + if (defined $username) { + $stmt .= ' AND ' . $db->columnidentifier('username') . ' = ?'; + push(@params,$username); + } my $rows = $xa_db->db_get_all_arrayref($stmt,@params); return buildrecords_fromrows($rows,$load_recursive); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm index 05c12d4..6b63256 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/provisioning/voip_preferences.pm @@ -37,6 +37,7 @@ our @EXPORT_OK = qw( $ALLOWED_IPS_GRP_ATTRIBUTE $CONCURRENT_MAX_TOTAL_ATTRIBUTE + $CONCURRENT_MAX_PER_ACCOUNT ); #$FORCE_OUTBOUND_CALLS_TO_PEER @@ -84,6 +85,8 @@ our $FORCE_INBOUND_CALLS_TO_PEER = 'force_inbound_calls_to_peer'; our $ALLOWED_IPS_GRP_ATTRIBUTE = 'allowed_ips_grp'; our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total'; +our $CONCURRENT_MAX_PER_ACCOUNT_ATTRIBUTE = 'concurrent_max_per_account'; +our $CLIR_ATTRIBUTE = 'clir'; sub new { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm index 404e171..a4f2c95 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Check.pm @@ -40,6 +40,8 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw(); @@ -97,8 +99,8 @@ sub check_billing_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers'); $result &= $check_result; push(@$messages,$message); - #($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels'); - #$result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels'); + $result &= $check_result; push(@$messages,$message); ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers'); $result &= $check_result; push(@$messages,$message); @@ -129,6 +131,12 @@ sub check_import_db_tables { ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber'); $result &= $check_result; push(@$messages,$message); + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli'); + $result &= $check_result; push(@$messages,$message); + + ($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir'); + $result &= $check_result; push(@$messages,$message); + return $result; } diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm index aa9c7cb..58cfbc4 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/AllowedCli.pm @@ -67,6 +67,7 @@ our @fieldnames = ( #calculated fields at the end! 'rownum', + 'filename', ); my $expected_fieldnames = [ diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Clir.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Clir.pm new file mode 100644 index 0000000..ff17df9 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Clir.pm @@ -0,0 +1,287 @@ +package NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw( + get_import_db + destroy_all_dbs +); +#import_db_tableidentifier + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + + insert_stmt + + +); +#process_table +use NGCP::BulkProcessor::SqlRecord qw(); + +#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + getupsertstatement + + @fieldnames + + findby_sipusername + countby_clir + + update_delta + findby_delta + countby_delta + + $deleted_delta + $updated_delta + $added_delta + +); +#@fieldnames +#@contact_fieldnames +#process_records +#findby_ccacsn +#countby_ccacsn +#list_domain_billingprofilename_resellernames + +my $tablename = 'clir'; +my $get_db = \&get_import_db; +#my $get_tablename = \&import_db_tableidentifier; + +our @fieldnames = ( + "sip_username", + "clir", + + #calculated fields at the end! + 'rownum', + 'filename', +); + +my $expected_fieldnames = [ + @fieldnames, + 'delta', +]; + +# table creation: +my $primarykey_fieldnames = [ 'sip_username' ]; +my $indexes = { + $tablename . '_rownum' => [ 'rownum(11)' ], + $tablename . '_delta' => [ 'delta(7)' ], +}; +#my $fixtable_statements = []; + +our $deleted_delta = 'DELETED'; +our $updated_delta = 'UPDATED'; +our $added_delta = 'ADDED'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); + +} + +sub findby_delta { + + my ($delta,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless defined $delta; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('delta') . ' = ?' + ,$delta); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_sipusername { + + my ($sip_username,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + #return [] unless (defined $cc or defined $ac or defined $sn); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('sip_username') . ' = ?' + ,$sip_username); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub countby_clir { + + my ($clir) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @params = (); + if (defined $clir) { + $stmt .= ' WHERE ' . + $db->columnidentifier('clir') . ' = ?'; + push(@params,$clir); + } + + return $db->db_get_value($stmt,@params); + +} + +sub update_delta { + + my ($sip_username,$delta) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; + my @params = (); + push(@params,$delta); + if (defined $sip_username) { + $stmt .= ' WHERE ' . + $db->columnidentifier('sip_username') . ' = ?'; + push(@params,$sip_username); + } + + return $db->db_do($stmt,@params); + +} + + +sub countby_delta { + + my ($deltas) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1'; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + + return $db->db_get_value($stmt,@params); + +} + + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub getupsertstatement { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . + join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')'; + my @values = (); + foreach my $fieldname (@$expected_fieldnames) { + if ('delta' eq $fieldname) { + my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . + $db->columnidentifier('sip_username') . ' = ?'; + push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); + } else { + push(@values,'?'); + } + } + $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; + return $upsert_stmt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm index e9f43b9..d430e39 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Dao/import/Subscriber.pm @@ -39,6 +39,7 @@ our @EXPORT_OK = qw( findby_domain_webusername list_domain_billingprofilename_resellernames findby_sipusername + list_barring_resellernames update_delta findby_delta @@ -92,6 +93,7 @@ our @fieldnames = ( 'rownum', 'range', 'contact_hash', + 'filename', ); my $expected_fieldnames = [ @fieldnames, @@ -331,6 +333,20 @@ sub list_domain_billingprofilename_resellernames { } +sub list_barring_resellernames { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my @cols = map { $db->columnidentifier($_); } qw/barrings reseller_name/; + my $stmt = 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols); + my @params = (); + + return $db->db_get_all_arrayref($stmt,@params); + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm index 558b95f..41aff3a 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Import.pm @@ -15,6 +15,11 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $allowedcli_import_numofthreads $ignore_allowedcli_unique $allowedcli_import_single_row_txn + $allowedcli_import_unfold_ranges + + $clir_import_numofthreads + $ignore_clir_unique + $clir_import_single_row_txn $skip_errors ); @@ -37,9 +42,10 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw( use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw(); use NGCP::BulkProcessor::Array qw(removeduplicates); -use NGCP::BulkProcessor::Utils qw(threadid zerofill); +use NGCP::BulkProcessor::Utils qw(threadid zerofill trim); use NGCP::BulkProcessor::Table qw(get_rowhash); require Exporter; @@ -47,6 +53,7 @@ our @ISA = qw(Exporter); our @EXPORT_OK = qw( import_subscriber import_allowedcli + import_clir ); sub import_subscriber { @@ -75,61 +82,21 @@ sub import_subscriber { foreach my $row (@$rows) { $rownum++; next if (scalar @$row) == 0; + $row = [ map { local $_ = $_; trim($_); } @$row ]; my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($row); + $record->{cc} //= ''; + $record->{ac} //= ''; + $record->{sn} //= ''; + $record->{cc} = trim($record->{cc}); + $record->{ac} = trim($record->{ac}); + $record->{sn} = trim($record->{sn}); $record->{rownum} = $rownum; - my @subscriber_row; - my %r; - if ($subscriber_import_unfold_ranges and $record->{sn} =~ /\.+$/) { - #if ($record->{sn} == '2861..') { - #print "x"; - #} - my $pow = scalar (() = $record->{sn} =~ /\./g); - _warn($context,"number range $record->{sn} results in " . 10**$pow . ' numbers') if $pow > 2; - $record->{sn} =~ s/\.+$//g; - $record->{range} = 0; - %r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); - my $base_sn = $record->{sn}; - %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; - if ($context->{upsert}) { - push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); - } else { - push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); - } - push(@subscriber_rows, [@subscriber_row]); - for (my $i = 0; $i < 10**$pow; $i++) { - $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($record); - #@subscriber_row = @$row; - $record->{sn} = $base_sn . zerofill($i,$pow); - $record->{range} = 1; - %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; - if ($context->{upsert}) { - push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); - } else { - push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); - } - push(@subscriber_rows,[@subscriber_row]); - } - #if ($base_sn == '2861') { - #print "x"; - #last; - #} - - } else { - $record->{range} = 0; - %r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); - %r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames}; - if ($context->{upsert}) { - push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn}); - } else { - push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta); - } - push(@subscriber_rows,\@subscriber_row); - } - } - - if ((scalar @subscriber_rows) > 0) { - if ($subscriber_import_single_row_txn) { - foreach my $subscriber_row (@subscriber_rows) { + $record->{filename} = $file; + my %r = %$record; + $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); + next unless _unfold_number_ranges($context,$record,\@subscriber_rows); + if ($subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { + while (defined (my $subscriber_row = shift @subscriber_rows)) { if ($skip_errors) { eval { _insert_subscriber_rows($context,[$subscriber_row]); }; _warn($context,$@) if $@; @@ -137,23 +104,46 @@ sub import_subscriber { _insert_subscriber_rows($context,[$subscriber_row]); } } + } + } + + if (not $subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) { + if ($skip_errors) { + eval { _insert_subscriber_rows($context,\@subscriber_rows); }; + _warn($context,$@) if $@; } else { - if ($skip_errors) { - eval { _insert_subscriber_rows($context,\@subscriber_rows); }; - _warn($context,$@) if $@; - } else { - _insert_subscriber_rows($context,\@subscriber_rows); - } + _insert_subscriber_rows($context,\@subscriber_rows); } } - #use Data::Dumper; - #print Dumper(\@subscriber_rows); + #use Data::Dumper; + #print Dumper(\@subscriber_rows); return 1; }, init_process_context_code => sub { my ($context)= @_; $context->{db} = &get_import_db(); # keep ref count low.. $context->{upsert} = $upsert; + $context->{unfold_ranges} = $subscriber_import_unfold_ranges; + $context->{fieldnames} = \@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames; + $context->{added_delta} = $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta; + $context->{create_new_record_code} = sub { + return NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new(shift); + }; + $context->{check_number_code} = sub { + my ($context,$record) = @_; + my $result = 1; + my $number = $record->{cc} . $record->{ac} . $record->{sn}; + # prevent db's unique constraint violation: + if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) { + if ($skip_errors) { + _warn($context,"$record->{sip_username}: duplicate number $number"); + } else { + _error($context,"$record->{sip_username}: duplicate number $number"); + } + $result = 0; + } + return $result; + }; $context->{error_count} = 0; $context->{warning_count} = 0; }, @@ -177,22 +167,7 @@ sub import_subscriber { sub _import_subscriber_checks { my ($file) = @_; my $result = 1; - #my $optioncount = 0; - #eval { - # $optioncount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOption::countby_subscribernumber_option(); - #}; - #if ($@ or $optioncount == 0) { - # fileprocessingerror($file,'please import subscriber features first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} - #my $userpasswordcount = 0; - #eval { - # $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::countby_fqdn(); - #}; - #if ($@ or $userpasswordcount == 0) { - # fileprocessingerror($file,'please import user passwords first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} + return $result; } @@ -230,21 +205,6 @@ sub _insert_subscriber_rows { } } -#sub _insert_subscriber_row { -# my ($context,$subscriber_row) = @_; -# $context->{db}->db_do( -# ($context->{upsert} ? -# NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::getupsertstatement() -# : NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::getinsertstatement($ignore_subscriber_unique)), -# @$subscriber_row -# ); -#} - - - - - - sub import_allowedcli { @@ -272,40 +232,30 @@ sub import_allowedcli { foreach my $row (@$rows) { $rownum++; next if (scalar @$row) == 0; + $row = [ map { local $_ = $_; trim($_); } @$row ]; my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new($row); - - if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) { - my $number = ($record->{cc} // '') . ($record->{ac} // '') . ($record->{sn} // ''); - if ($skip_errors) { - _warn($context,"duplicate number: $number"); - } else { - _error($context,"duplicate number: $number"); - } - next; - } + $record->{cc} //= ''; + $record->{ac} //= ''; + $record->{sn} //= ''; + $record->{cc} = trim($record->{cc}); + $record->{ac} = trim($record->{ac}); + $record->{sn} = trim($record->{sn}); + $record->{rownum} = $rownum; + $record->{filename} = $file; if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) { + my $number = $record->{cc} . $record->{ac} . $record->{sn}; if ($skip_errors) { - _warn($context,"sip username $record->{sip_username} not found"); + _warn($context,"$number: sip username $record->{sip_username} not found"); } else { - _error($context,"sip username $record->{sip_username} not found"); + _error($context,"$number: sip username $record->{sip_username} not found"); } next; } - $record->{rownum} = $rownum; - my %r = %$record; my @allowedcli_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames}; - if ($context->{upsert}) { - push(@allowedcli_row,$record->{cc},$record->{ac},$record->{sn}); - } else { - push(@allowedcli_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta); - } - push(@allowedcli_rows,\@allowedcli_row); - } - - if ((scalar @allowedcli_rows) > 0) { - if ($allowedcli_import_single_row_txn) { - foreach my $allowedcli_row (@allowedcli_rows) { + next unless _unfold_number_ranges($context,$record,\@allowedcli_rows); + if ($allowedcli_import_single_row_txn and (scalar @allowedcli_rows) > 0) { + while (defined (my $allowedcli_row = shift @allowedcli_rows)) { if ($skip_errors) { eval { _insert_allowedcli_rows($context,[$allowedcli_row]); }; _warn($context,$@) if $@; @@ -313,23 +263,54 @@ sub import_allowedcli { _insert_allowedcli_rows($context,[$allowedcli_row]); } } + } + } + + if (not $allowedcli_import_single_row_txn and (scalar @allowedcli_rows) > 0) { + if ($skip_errors) { + eval { _insert_allowedcli_rows($context,\@allowedcli_rows); }; + _warn($context,$@) if $@; } else { - if ($skip_errors) { - eval { _insert_allowedcli_rows($context,\@allowedcli_rows); }; - _warn($context,$@) if $@; - } else { - _insert_allowedcli_rows($context,\@allowedcli_rows); - } + _insert_allowedcli_rows($context,\@allowedcli_rows); } } - #use Data::Dumper; - #print Dumper(\@subscriber_rows); + #use Data::Dumper; + #print Dumper(\@subscriber_rows); return 1; }, init_process_context_code => sub { my ($context)= @_; $context->{db} = &get_import_db(); # keep ref count low.. $context->{upsert} = $upsert; + $context->{unfold_ranges} = $allowedcli_import_unfold_ranges; + $context->{fieldnames} = \@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames; + $context->{added_delta} = $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta; + $context->{create_new_record_code} = sub { + return NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new(shift); + }; + $context->{check_number_code} = sub { + my ($context,$record) = @_; + my $result = 1; + my $number = $record->{cc} . $record->{ac} . $record->{sn}; + if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) { + if ($skip_errors) { + _warn($context,"$record->{sip_username}: duplicate number $number"); + } else { + _error($context,"$record->{sip_username}: duplicate number $number"); + } + $result = 0; + } + # prevent db's unique constraint violation: + if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) { + if ($skip_errors) { + _warn($context,"$record->{sip_username}: duplicate number $number"); + } else { + _error($context,"$record->{sip_username}: duplicate number $number"); + } + $result = 0; + } + return $result; + }; $context->{error_count} = 0; $context->{warning_count} = 0; }, @@ -361,14 +342,7 @@ sub _import_allowedcli_checks { fileprocessingerror($file,'please import subscribers first',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. } - #my $userpasswordcount = 0; - #eval { - # $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::countby_fqdn(); - #}; - #if ($@ or $userpasswordcount == 0) { - # fileprocessingerror($file,'please import user passwords first',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} + return $result; } @@ -406,9 +380,228 @@ sub _insert_allowedcli_rows { } } +sub _unfold_number_ranges { + + my ($context,$record,$rows) = @_; + + sub create_new_record_code{} + + my $result = 0; + my @fieldnames = @{$context->{fieldnames}}; + my $cc_ac_ok = ($record->{cc} =~ /^\d*$/ and $record->{ac} =~ /^\d*$/); + my @row; + my %r; + if ($context->{unfold_ranges} and $cc_ac_ok and $record->{sn} =~ /\.+$/) { + #if ($record->{sn} == '2861..') { + #print "x"; + #} + my $pow = scalar (() = $record->{sn} =~ /\./g); + _info($context,"expanding number range '$record->{sn}' to " . 10**$pow . ' numbers'); + _warn($context,"expanding number range '$record->{sn}' results in " . 10**$pow . ' numbers') if $pow > 2; + $record->{sn} =~ s/\.+$//g; + $record->{range} = 0; + #%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); + my $base_sn = $record->{sn}; + %r = %$record; @row = @r{@fieldnames}; + if ($context->{upsert}) { + push(@row,$record->{cc},$record->{ac},$record->{sn}); + } else { + push(@row,$context->{added_delta}); + } + push(@$rows, [@row]) if &{$context->{check_number_code}}($context,$record); + for (my $i = 0; $i < 10**$pow; $i++) { + $record = &{$context->{create_new_record_code}}($record); + #@subscriber_row = @$row; + $record->{sn} = $base_sn . zerofill($i,$pow); + $record->{range} = 1; + %r = %$record; @row = @r{@fieldnames}; + if ($context->{upsert}) { + push(@row,$record->{cc},$record->{ac},$record->{sn}); + } else { + push(@row,$context->{added_delta}); + } + push(@$rows,[@row]) if &{$context->{check_number_code}}($context,$record); + } + #if ($base_sn == '2861') { + #print "x"; + #last; + #} + $result = 1; + } elsif ($cc_ac_ok and $record->{sn} =~ /^\d*$/) { + $record->{range} = 0; + #%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]); + %r = %$record; @row = @r{@fieldnames}; + if ($context->{upsert}) { + push(@row,$record->{cc},$record->{ac},$record->{sn}); + } else { + push(@row,$context->{added_delta}); + } + push(@$rows,\@row) if &{$context->{check_number_code}}($context,$record); + $result = 1; + } else { + my $number = $record->{cc} . $record->{ac} . $record->{sn}; + if ($skip_errors) { + _warn($context,"invalid number: $number"); + } else { + _error($context,"invalid number: $number"); + } + $result = 0; + } + return $result; +} + +sub import_clir { + + my (@files) = @_; + + my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::create_table(0); + + foreach my $file (@files) { + $result &= _import_clir_checks($file); + } + + my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($clir_import_numofthreads); + + my $upsert = _import_clir_reset_delta(); + + destroy_all_dbs(); #close all db connections before forking.. + my $warning_count :shared = 0; + foreach my $file (@files) { + $result &= $importer->process( + file => $file, + process_code => sub { + my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + my @clir_rows = (); + foreach my $row (@$rows) { + $rownum++; + next if (scalar @$row) == 0; + $row = [ map { local $_ = $_; trim($_); } @$row ]; + my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir->new($row); + $record->{rownum} = $rownum; + $record->{filename} = $file; + + if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) { + if ($skip_errors) { + _warn($context,"sip username $record->{sip_username} not found"); + } else { + _error($context,"sip username $record->{sip_username} not found"); + } + next; + } + # prevent db's unique constraint violation: + if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::findby_sipusername($record->{sip_username})) { + if ($skip_errors) { + _warn($context,"duplicate sip username $record->{sip_username}"); + } else { + _error($context,"duplicate sip username $record->{sip_username}"); + } + next; + } + + my %r = %$record; my @clir_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::fieldnames}; + if ($context->{upsert}) { + push(@clir_row,$record->{sip_username}); + } else { + push(@clir_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::added_delta); + } + push(@clir_rows,\@clir_row); + if ($clir_import_single_row_txn and (scalar @clir_rows) > 0) { + while (defined (my $clir_row = shift @clir_rows)) { + if ($skip_errors) { + eval { _insert_clir_rows($context,[$clir_row]); }; + _warn($context,$@) if $@; + } else { + _insert_clir_rows($context,[$clir_row]); + } + } + } + } + + if (not $clir_import_single_row_txn and (scalar @clir_rows) > 0) { + if ($skip_errors) { + eval { _insert_clir_rows($context,\@clir_rows); }; + _warn($context,$@) if $@; + } else { + _insert_clir_rows($context,\@clir_rows); + } + } + #use Data::Dumper; + #print Dumper(\@subscriber_rows); + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_import_db(); # keep ref count low.. + $context->{upsert} = $upsert; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $import_multithreading + ); + } + + return ($result,$warning_count); + +} + +sub _import_clir_checks { + my ($file) = @_; + my $result = 1; + my $subscribercount = 0; + eval { + $subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_ccacsn(); + }; + if ($@ or $subscribercount == 0) { + fileprocessingerror($file,'please import subscribers first',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } + return $result; +} +sub _import_clir_reset_delta { + my $upsert = 0; + if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir() > 0) { + processing_info(threadid(),'resetting delta of ' . + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::update_delta(undef, + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::deleted_delta) . + ' clir records',getlogger(__PACKAGE__)); + $upsert |= 1; + } + return $upsert; +} +sub _insert_clir_rows { + my ($context,$clir_rows) = @_; + $context->{db}->db_do_begin( + ($context->{upsert} ? + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::getupsertstatement() + : NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::getinsertstatement($ignore_clir_unique)), + #NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::gettablename(), + #lock + ); + eval { + $context->{db}->db_do_rowblock($clir_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } +} sub _error { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm index 195a5ce..27ec874 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Preferences.pm @@ -3,6 +3,8 @@ use strict; ## no critic +no strict 'refs'; + use threads::shared qw(); #use List::Util qw(); @@ -13,24 +15,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( ); -#$batch - #$domain_name - #$reseller_id - #$subsciber_username_prefix - #$set_barring_profiles_multithreading - #$set_barring_profiles_numofthreads - #$barring_profiles - # - #$set_peer_auth_multithreading - #$set_peer_auth_numofthreads - #$peer_auth_realm - # - #$set_allowed_ips_multithreading - #$set_allowed_ips_numofthreads - #$allowed_ips - # - #$set_preference_bulk_multithreading - #$set_preference_bulk_numofthreads use NGCP::BulkProcessor::Logging qw ( getlogger @@ -69,544 +53,157 @@ require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( - clear_preferences - delete_preference - set_preference - get_preference + clear_subscriber_preferences + delete_subscriber_preference + set_subscriber_preference + get_subscriber_preference + + set_allowed_ips_preferences + cleanup_aig_sequence_ids ); -# set_barring_profiles -# set_barring_profiles_batch - -# set_peer_auth -# set_peer_auth_batch - -# set_allowed_ips -# set_allowed_ips_batch - -# set_preference_bulk -# set_preference_bulk_batch - - - -# $INIT_PEER_AUTH_MODE -# $SWITCHOVER_PEER_AUTH_MODE -# $CLEAR_PEER_AUTH_MODE - -#our $INIT_PEER_AUTH_MODE = 'init'; -#our $SWITCHOVER_PEER_AUTH_MODE = 'switchover'; -#our $CLEAR_PEER_AUTH_MODE = 'clear'; - -#sub set_barring_profiles { -# -# my $static_context = {}; -# my $result = _set_barring_profiles_checks($static_context); -# -# destroy_all_dbs(); -# my $warning_count :shared = 0; -# return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::process_records( -# static_context => $static_context, -# process_code => sub { -# my ($context,$records,$row_offset) = @_; -# my $rownum = $row_offset; -# foreach my $imported_subscriber (@$records) { -# $rownum++; -# next unless _reset_set_barring_profile_context($context,$imported_subscriber,$rownum); -# _set_barring_profile($context); -# } -# -# #return 0; -# return 1; -# }, -# init_process_context_code => sub { -# my ($context)= @_; -# $context->{db} = &get_xa_db(); -# $context->{error_count} = 0; -# $context->{warning_count} = 0; -# # below is not mandatory.. -# _check_insert_tables(); -# }, -# uninit_process_context_code => sub { -# my ($context)= @_; -# undef $context->{db}; -# destroy_all_dbs(); -# { -# lock $warning_count; -# $warning_count += $context->{warning_count}; -# } -# }, -# load_recursive => 0, -# multithreading => $set_barring_profiles_multithreading, -# numofthreads => $set_barring_profiles_numofthreads, -# ),$warning_count); -#} -# -# -# -#sub _check_insert_tables { -# -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::check_table(); -# -#} -# -#sub _set_subscriber_preference { -# my ($context,$set_code) = @_; -# -# eval { -# $context->{db}->db_begin(); -# #rowprocessingwarn($context->{tid},'AutoCommit is on' ,getlogger(__PACKAGE__)) if $context->{db}->{drh}->{AutoCommit}; -# -# my $existing_billing_voip_subscribers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_domainid_username_states($context->{db}, -# $context->{billing_domain}->{id},$context->{username},{ 'NOT IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::TERMINATED_STATE}); -# if ((scalar @$existing_billing_voip_subscribers) == 0) { -# -# if ($context->{subscriberdelta} eq -# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) { -# _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, and no active subscriber found',1); -# } else { -# _warn($context,"($context->{rownum}) no active subscriber found for susbcriber " . $context->{cli}); -# } -# } elsif ((scalar @$existing_billing_voip_subscribers) == 1) { -# $context->{billing_voip_subscriber} = $existing_billing_voip_subscribers->[0]; -# $context->{provisioning_voip_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid( -# $context->{db},$context->{billing_voip_subscriber}->{uuid}); -# if (defined $context->{provisioning_voip_subscriber}) { -# if ($context->{subscriberdelta} eq -# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) { -# -# _warn($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, but active subscriber found'); -# -# } else { -# if (defined $set_code and 'CODE' eq ref $set_code) { -# &$set_code($context); -# } -# } -# } else { -# if ($skip_errors) { -# _warn($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli}); -# } else { -# _error($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli}); -# } -# } -# } else { -# rowprocessingwarn($context->{tid},"($context->{rownum}) " . 'multiple (' . (scalar @$existing_billing_voip_subscribers) . ') existing billing subscribers with username ' . $context->{username} . ' found, skipping' ,getlogger(__PACKAGE__)); -# } -# -# if ($dry) { -# $context->{db}->db_rollback(0); -# } else { -# $context->{db}->db_commit(); -# } -# -# }; -# my $err = $@; -# if ($err) { -# eval { -# $context->{db}->db_rollback(1); -# }; -# if ($skip_errors) { -# _warn($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err); -# } else { -# _error($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err); -# } -# } -# -#} -# -#sub _set_barring_profile { -# my ($context) = @_; -# _set_subscriber_preference($context,\&_set_adm_ncos); -#} -# -#sub _checks { -# -# my ($context) = @_; -# -# my $result = 1; -# #my $optioncount = 0; -# #eval { -# # $optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::FeatureOption::countby_subscribernumber_option(); -# #}; -# #if ($@ or $optioncount == 0) { -# # rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__)); -# # $result = 0; #even in skip-error mode.. -# #} -# my $userpasswordcount = 0; -# eval { -# $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::countby_fqdn(); -# }; -# if ($@ or $userpasswordcount == 0) { -# rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# my $subscribercount = 0; -# my $subscriber_barring_profiles = []; -# eval { -# $subscribercount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::countby_subscribernumber(); -# $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles(); -# }; -# if ($@ or $subscribercount == 0) { -# rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# -# if ($batch) { -# my $batch_size = 0; -# eval { -# $batch_size = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::countby_delta({ 'NOT IN' => -# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta}); -# }; -# if ($@ or $batch_size == 0) { -# rowprocessingerror(threadid(),'please import a batch first',getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# } -# -# eval { -# $context->{billing_domain} = NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_domain($domain_name); -# if (defined $context->{billing_domain} -# and NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers::countby_domainid_resellerid($context->{billing_domain}->{id},$reseller_id) == 0) { -# undef $context->{billing_domain}; -# } -# }; -# if ($@ or not defined $context->{billing_domain}) { -# rowprocessingerror(threadid(),'cannot find billing domain',getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# -# return $result; -# -#} -# -#sub _set_barring_profiles_checks { -# my ($context) = @_; -# -# my $result = _checks($context); -# -# my $subscriber_barring_profiles = []; -# eval { -# $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles(); -# }; -# if ($@ or (scalar @$subscriber_barring_profiles) == 0) { -# rowprocessingerror(threadid(),'subscribers have no barring profiles',getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# -# $context->{ncos_level_map} = {}; -# foreach my $barring_profile (@$subscriber_barring_profiles) { -# if (not exists $barring_profiles->{$barring_profile}) { -# rowprocessingerror(threadid(),"mapping for barring profile '" . $barring_profile . "' missing",getlogger(__PACKAGE__)); -# #$result = 0; #even in skip-error mode.. -# } else { -# my $level = $barring_profiles->{$barring_profile}; -# if (not defined $level or length($level) == 0) { -# $context->{ncos_level_map}->{$barring_profile} = undef; -# } else { -# eval { -# $context->{ncos_level_map}->{$barring_profile} = NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels::findby_resellerid_level( -# $reseller_id,$level); -# }; -# if ($@ or not defined $context->{ncos_level_map}->{$barring_profile}) { -# rowprocessingerror(threadid(),"cannot find ncos level '$level'",getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# } -# } -# } -# -# eval { -# $context->{adm_ncos_id_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( -# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ADM_NCOS_ID_ATTRIBUTE); -# }; -# if ($@ or not defined $context->{adm_ncos_id_attribute}) { -# rowprocessingerror(threadid(),'cannot find adm_ncos_id attribute',getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# -# return $result; -#} -# -#sub _set_adm_ncos { -# -# my ($context) = @_; -# -# $context->{adm_ncos_id_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id}, -# $context->{adm_ncos_id_attribute},defined $context->{ncos_level} ? $context->{ncos_level}->{id} : undef); -# -# _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': ncos level ' . -# (defined $context->{ncos_level} ? "'" . $context->{ncos_level}->{level} . "' set" : 'cleared') . -# " for barring profile '" . $context->{barring_profile} . "'",1); -# -#} -# -#sub _reset_context { -# -# my ($context,$imported_subscriber,$rownum) = @_; -# -# my $result = 1; -# -# $context->{rownum} = $rownum; -# -# $context->{cli} = $imported_subscriber->subscribernumber(); -# $context->{e164} = {}; -# $context->{e164}->{cc} = substr($context->{cli},0,3); -# $context->{e164}->{ac} = ''; -# $context->{e164}->{sn} = substr($context->{cli},3); -# -# $context->{subscriberdelta} = $imported_subscriber->{delta}; -# -# my $userpassword = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::findby_fqdn($context->{cli}); -# if (defined $userpassword) { -# $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username}; -# $context->{password} = $userpassword->{password}; -# $context->{userpassworddelta} = $userpassword->{delta}; -# } else { -# # once full username+passwords is available: -# delete $context->{username}; -# delete $context->{password}; -# delete $context->{userpassworddelta}; -# if ($context->{subscriberdelta} eq -# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) { -# -# } else { -# $result &= 0; -# -# # for now, as username+passwords are incomplete: -# #$context->{username} = $context->{e164}->{sn}; -# #$context->{password} = $context->{username}; -# #$context->{userpassworddelta} = $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::updated_delta; -# -# if ($skip_errors) { -# # for now, as username+passwords are incomplete: -# _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); -# } else { -# _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli}); -# } -# } -# } -# -# delete $context->{billing_voip_subscriber}; -# delete $context->{provisioning_voip_subscriber}; -# -# return $result; -# -#} -# -#sub _reset_set_barring_profile_context { -# -# my ($context,$imported_subscriber,$rownum) = @_; -# -# my $result = _reset_context($context,$imported_subscriber,$rownum); -# -# $context->{barring_profile} = $imported_subscriber->{barring_profile}; -# $context->{ncos_level} = $context->{ncos_level_map}->{$context->{barring_profile}}; -# -# delete $context->{adm_ncos_id_preference_id}; -# -# return $result; -# -#} -# -# -# -# -# -#sub set_allowed_ips { -# -# my $static_context = {}; -# my $result = _set_allowed_ips_checks($static_context); -# -# destroy_all_dbs(); -# my $warning_count :shared = 0; -# return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::process_records( -# static_context => $static_context, -# process_code => sub { -# my ($context,$records,$row_offset) = @_; -# my $rownum = $row_offset; -# foreach my $imported_subscriber (@$records) { -# $rownum++; -# next unless _reset_set_allowed_ips_context($context,$imported_subscriber,$rownum); -# _set_allowed_ips($context); -# } -# cleanup_aig_sequence_ids($context); -# #return 0; -# return 1; -# }, -# init_process_context_code => sub { -# my ($context)= @_; -# $context->{db} = &get_xa_db(); -# $context->{error_count} = 0; -# $context->{warning_count} = 0; -# # below is not mandatory.. -# _check_insert_tables(); -# }, -# uninit_process_context_code => sub { -# my ($context)= @_; -# undef $context->{db}; -# destroy_all_dbs(); -# { -# lock $warning_count; -# $warning_count += $context->{warning_count}; -# } -# }, -# load_recursive => 0, -# multithreading => $set_allowed_ips_multithreading, -# numofthreads => $set_allowed_ips_numofthreads, -# ),$warning_count); -#} -# -#sub cleanup_aig_sequence_ids { -# my ($context) = @_; -# eval { -# $context->{db}->db_begin(); -# if (NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::cleanup_ids($context->{db})) { -# _info($context,'voip_aig_sequence cleaned up'); -# } -# if ($dry) { -# $context->{db}->db_rollback(0); -# } else { -# $context->{db}->db_commit(); -# } -# }; -# my $err = $@; -# if ($err) { -# eval { -# $context->{db}->db_rollback(1); -# }; -# if ($skip_errors) { -# _warn($context,"database problem with voip_aig_sequence clean up: " . $err); -# } else { -# _error($context,"database problem with voip_aig_sequence clean up: " . $err); -# } -# } -#} -# -#sub _set_allowed_ips { -# my ($context) = @_; -# _set_subscriber_preference($context,\&_set_allowed_ips_preferences); -#} -# -#sub _set_allowed_ips_checks { -# my ($context) = @_; -# -# my $result = _checks($context); -# -# eval { -# $context->{allowed_ips_grp_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( -# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE); -# }; -# if ($@ or not defined $context->{allowed_ips_grp_attribute}) { -# rowprocessingerror(threadid(),'cannot find allowed_ips_grp attribute',getlogger(__PACKAGE__)); -# $result = 0; #even in skip-error mode.. -# } -# -# return $result; -#} -# -#sub _set_allowed_ips_preferences { -# -# my ($context) = @_; -# -# my $subscriber_id = $context->{provisioning_voip_subscriber}->{id}; -# my $attribute = $context->{allowed_ips_grp_attribute}; -# -# my $allowed_ips_grp_attribute_preference = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid( -# $context->{db},$subscriber_id,$attribute->{id})->[0]; -# -# if (defined $allowed_ips_grp_attribute_preference) { -# $context->{allowed_ip_group_id} = $allowed_ips_grp_attribute_preference->{value}; -# $context->{allowed_ips_grp_attribute_preference_id} = $allowed_ips_grp_attribute_preference->{id}; -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::delete_groupid($context->{db},$context->{allowed_ip_group_id}); -# _info($context,"($context->{rownum}) " . 'allowed ips group for subscriber ' . $context->{cli} . ' exists, ipnets deleted',1); -# } else { -# $context->{allowed_ip_group_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::increment($context->{db}); -# _info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1); -# } -# -# $context->{allowed_ips_grp_ipnet_ids} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::insert_rows($context->{db},$context->{allowed_ip_group_id},$context->{allowed_ips}); -# _info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1); -# -# if (not defined $allowed_ips_grp_attribute_preference) { -# $context->{allowed_ips_grp_attribute_preference_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db}, -# attribute_id => $attribute->{id}, -# subscriber_id => $subscriber_id, -# value => $context->{allowed_ip_group_id}, -# ); -# _info($context,"($context->{rownum}) " . 'new allowed ips group preference value for subscriber ' . $context->{cli} . ' added',1); -# } -# -#} -# -#sub _reset_set_allowed_ips_context { -# -# my ($context,$imported_subscriber,$rownum) = @_; -# -# my $result = _reset_context($context,$imported_subscriber,$rownum); -# -# $context->{allowed_ips} = $allowed_ips; -# -# delete $context->{allowed_ip_group_id}; -# delete $context->{allowed_ips_grp_attribute_preference_id}; -# delete $context->{allowed_ips_grp_ipnet_ids}; -# -# return $result; -# -#} - - - - - -sub clear_preferences { - my ($context,$subscriber_id,$attribute,$except_value) = @_; - return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db}, - $subscriber_id, $attribute->{id}, defined $except_value ? { 'NOT IN' => $except_value } : undef); +sub cleanup_aig_sequence_ids { + my ($context) = @_; + eval { + $context->{db}->db_begin(); + if (NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::cleanup_ids($context->{db})) { + _info($context,'voip_aig_sequence cleaned up'); + } + if ($dry) { + $context->{db}->db_rollback(0); + } else { + $context->{db}->db_commit(); + } + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_rollback(1); + }; + if ($skip_errors) { + _warn($context,"database problem with voip_aig_sequence clean up: " . $err); + } else { + _error($context,"database problem with voip_aig_sequence clean up: " . $err); + } + } +} + + +sub set_allowed_ips_preferences { + + my ($context,$subscriber_id,$sip_username,$attribute,$allowed_ips) = @_; + + #my $subscriber_id = $context->{prov_subscriber}->{id} ; + #my $attribute = $context->{attributes}->{allowed_ips_grp}; + + my $allowed_ips_grp_attribute_preference = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid( + $context->{db},$subscriber_id,$attribute->{id})->[0]; + + my ($allowed_ip_group_id,$allowed_ip_group_preferrence_id); + + if (defined $allowed_ips_grp_attribute_preference) { + $allowed_ip_group_id = $allowed_ips_grp_attribute_preference->{value}; + $allowed_ip_group_preferrence_id = $allowed_ips_grp_attribute_preference->{id}; + NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::delete_groupid($context->{db},$allowed_ip_group_id); + _info($context,"allowed ips group for subscriber $sip_username exists, ipnets deleted",1); + } else { + $allowed_ip_group_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::increment($context->{db}); + _info($context,"new allowed ips group id for subscriber $sip_username aquired",1); + } + + my $allowed_ips_grp_ipnet_ids = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::insert_rows($context->{db},$allowed_ip_group_id,$allowed_ips); + _info($context,"ipnets for allowed ips group for subscriber $sip_username created",1); + + if (not defined $allowed_ips_grp_attribute_preference) { + $allowed_ip_group_preferrence_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db}, + attribute_id => $attribute->{id}, + subscriber_id => $subscriber_id, + value => $allowed_ip_group_id, + ); + _info($context,"new allowed ips group preference value for subscriber $sip_username added",1); + } + + return ($allowed_ip_group_id,$allowed_ip_group_preferrence_id); + + #$context->{preferences}->{allowed_ips_grp} = { id => $allowed_ip_group_preferrence_id, $allowed_ip_group_id }; } -sub delete_preference { +my %get_preference_sub_names = ( + voip_usr_preferences => 'findby_subscriberid_attributeid', +); +my %preference_id_cols = ( + voip_usr_preferences => 'subscriber_id', +); + +sub clear_subscriber_preferences { + my ($context,$subscriber_id,$attribute,$except_value) = @_; + return _clear_preferences($context,'voip_usr_preferences',$subscriber_id,$attribute,$except_value); +} +sub delete_subscriber_preference { my ($context,$subscriber_id,$attribute,$value) = @_; + return _delete_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value); +} +sub set_subscriber_preference { + my ($context,$subscriber_id,$attribute,$value) = @_; + return _set_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value); +} +sub get_subscriber_preference { + my ($context,$subscriber_id,$attribute) = @_; + return _get_preference($context,'voip_usr_preferences',$subscriber_id,$attribute); +} + +sub _clear_preferences { + my ($context,$pref_type,$id,$attribute,$except_value) = @_; - return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db}, - $subscriber_id, $attribute->{id}, { 'IN' => $value } ); + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id, $attribute->{id}, defined $except_value ? { 'NOT IN' => $except_value } : undef); } -sub set_preference { - my ($context,$subscriber_id,$attribute,$value) = @_; +sub _delete_preference { + my ($context,$pref_type,$id,$attribute,$value) = @_; + + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id, $attribute->{id}, { 'IN' => $value } ); + +} + +sub _set_preference { + my ($context,$pref_type,$id,$attribute,$value) = @_; if ($attribute->{max_occur} == 1) { - my $old_preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db}, - $subscriber_id,$attribute->{id}); + my $old_preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db}, + $id,$attribute->{id}); if (defined $value) { if ((scalar @$old_preferences) == 1) { - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::update_row($context->{db},{ + &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::update_row'}($context->{db},{ id => $old_preferences->[0]->{id}, value => $value, }); return $old_preferences->[0]->{id}; } else { if ((scalar @$old_preferences) > 1) { - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db}, - $subscriber_id,$attribute->{id}); + &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id,$attribute->{id}); } - return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db}, + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db}, attribute_id => $attribute->{id}, - subscriber_id => $subscriber_id, + $preference_id_cols{$pref_type} => $id, value => $value, ); } } else { - NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db}, - $subscriber_id,$attribute->{id}); + &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db}, + $id,$attribute->{id}); return undef; } } else { if (defined $value) { - return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db}, + return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db}, attribute_id => $attribute->{id}, - subscriber_id => $subscriber_id, + $preference_id_cols{$pref_type} => $id, value => $value, ); } else { @@ -616,11 +213,12 @@ sub set_preference { } -sub get_preference { - my ($context,$subscriber_id,$attribute) = @_; +sub _get_preference { + my ($context,$pref_type,$id,$attribute) = @_; + + my $preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db}, + $id,$attribute->{id}); - my $preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db}, - $subscriber_id,$attribute->{id}); if ($attribute->{max_occur} == 1) { return $preferences->[0]; } else { diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm index eeadc4b..16842b0 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Provisioning.pm @@ -16,22 +16,13 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $provision_subscriber_multithreading $provision_subscriber_numofthreads + $webpassword_length + $webusername_length $reseller_mapping + $barring_profiles ); -#$batch - -#$reseller_id -#$domain_name -#$subsciber_username_prefix -#$billing_profile_id -#$contact_email_format -#$webpassword_length -#$generate_webpassword - -#$reprovision_upon_password_change -#$always_update_subscriber use NGCP::BulkProcessor::Logging qw ( getlogger @@ -45,6 +36,7 @@ use NGCP::BulkProcessor::LogError qw( use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::products qw(); @@ -57,6 +49,7 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw(); use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); @@ -70,9 +63,12 @@ use NGCP::BulkProcessor::RestRequests::Trunk::Subscribers qw(); use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Preferences qw( - set_preference - clear_preferences - delete_preference + set_subscriber_preference + get_subscriber_preference + clear_subscriber_preferences + delete_subscriber_preference + set_allowed_ips_preferences + cleanup_aig_sequence_ids ); use NGCP::BulkProcessor::ConnectorPool qw( @@ -83,8 +79,9 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw( destroy_all_dbs ); -use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp); +use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool check_ipnet trim); use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); +use NGCP::BulkProcessor::RandomString qw(createtmpstring); require Exporter; our @ISA = qw(Exporter); @@ -93,9 +90,15 @@ our @EXPORT_OK = qw( ); +my $split_ipnets_pattern = join('|',( + quotemeta(','), + quotemeta(';'), + #quotemeta('/') +)); + sub provision_subscribers { - my $static_context = { now => timestamp() }; + my $static_context = { now => timestamp(), rowcount => undef }; my $result = _provision_subscribers_checks($static_context); destroy_all_dbs(); @@ -105,11 +108,13 @@ sub provision_subscribers { static_context => $static_context, process_code => sub { my ($context,$records,$row_offset) = @_; + $context->{rowcount} = $row_offset; foreach my $domain_sipusername (@$records) { + $context->{rowcount} += 1; next unless _provision_susbcriber($context, NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_sipusername(@$domain_sipusername)); } - + cleanup_aig_sequence_ids($context); return 1; }, init_process_context_code => sub { @@ -177,6 +182,7 @@ sub _provision_susbcriber { _update_contract($context); _update_subscriber($context); _create_aliases($context); + _update_preferences($context); #todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it #in this main provisioning loop, or align it in separate run-modes, according to the files given. @@ -212,6 +218,39 @@ sub _provision_subscribers_checks { my $result = 1; + my $subscribercount = 0; + eval { + $subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_ccacsn(); + }; + if ($@ or $subscribercount == 0) { + rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"$subscribercount subscriber found",getlogger(__PACKAGE__)); + } + + my $allowedclicount = 0; + eval { + $allowedclicount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::countby_ccacsn(); + }; + if ($@ or $allowedclicount == 0) { + rowprocessingerror(threadid(),'please import allowed clis first',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"$allowedclicount allowed clis found",getlogger(__PACKAGE__)); + } + + my $clircount = 0; + eval { + $clircount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir(); + }; + if ($@ or $clircount == 0) { + rowprocessingerror(threadid(),'please import clir first',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"$clircount clir records found",getlogger(__PACKAGE__)); + } + my $domain_billingprofilename_resellernames = []; eval { $domain_billingprofilename_resellernames = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::list_domain_billingprofilename_resellernames(); @@ -247,6 +286,7 @@ sub _provision_subscribers_checks { $result = 0; #even in skip-error mode.. } else { $context->{reseller_map}->{$resellername}->{billingprofile_map} = {}; + processing_info(threadid(),"reseller '$resellername' found",getlogger(__PACKAGE__)); } } if (not exists $context->{domain_map}->{$domain}) { @@ -254,16 +294,19 @@ sub _provision_subscribers_checks { $context->{domain_map}->{$domain} = NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_domain($domain); }; if ($@ or not $context->{domain_map}->{$domain}) { - rowprocessingerror(threadid(),"cannot find domain '$domain' (billing)",getlogger(__PACKAGE__)); + rowprocessingerror(threadid(),"cannot find billing domain '$domain'",getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. } else { + processing_info(threadid(),"billing domain '$domain' found",getlogger(__PACKAGE__)); eval { $context->{domain_map}->{$domain}->{prov_domain} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_domain($domain); }; if ($@ or not $context->{domain_map}->{$domain}->{prov_domain}) { - rowprocessingerror(threadid(),"cannot find domain '$domain' (provisioning)",getlogger(__PACKAGE__)); + rowprocessingerror(threadid(),"cannot find provisioning domain '$domain'",getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"provisioning domain '$domain' found",getlogger(__PACKAGE__)); } } } @@ -276,6 +319,8 @@ sub _provision_subscribers_checks { if ($@ or not $domain_reseller) { rowprocessingerror(threadid(),"domain $domain does not belong to reseller $resellername",getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"domain $domain belongs to reseller $resellername",getlogger(__PACKAGE__)); } if ($context->{reseller_map}->{$resellername}->{billingprofile_map} and @@ -291,6 +336,8 @@ sub _provision_subscribers_checks { if ($@ or not $context->{reseller_map}->{$resellername}->{billingprofile_map}->{$billingprofilename}) { rowprocessingerror(threadid(),"cannot find billing profile '$billingprofilename' of reseller '$resellername'",getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"billing profile '$billingprofilename' of reseller '$resellername' found",getlogger(__PACKAGE__)); } } } @@ -303,6 +350,8 @@ sub _provision_subscribers_checks { if ($@ or not defined $context->{sip_account_product}) { rowprocessingerror(threadid(),"cannot find $NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE product",getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"$NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE product found",getlogger(__PACKAGE__)); } $context->{attributes} = {}; @@ -314,6 +363,8 @@ sub _provision_subscribers_checks { if ($@ or not defined $context->{attributes}->{allowed_clis}) { rowprocessingerror(threadid(),'cannot find allowed_clis attribute',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"allowed_clis attribute found",getlogger(__PACKAGE__)); } eval { @@ -323,6 +374,8 @@ sub _provision_subscribers_checks { if ($@ or not defined $context->{attributes}->{cli}) { rowprocessingerror(threadid(),'cannot find cli attribute',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"cli attribute found",getlogger(__PACKAGE__)); } eval { @@ -332,6 +385,8 @@ sub _provision_subscribers_checks { if ($@ or not defined $context->{attributes}->{ac}) { rowprocessingerror(threadid(),'cannot find ac attribute',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"ac attribute found",getlogger(__PACKAGE__)); } eval { @@ -341,6 +396,8 @@ sub _provision_subscribers_checks { if ($@ or not defined $context->{attributes}->{cc}) { rowprocessingerror(threadid(),'cannot find cc attribute',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"cc attribute found",getlogger(__PACKAGE__)); } eval { @@ -350,21 +407,119 @@ sub _provision_subscribers_checks { if ($@ or not defined $context->{attributes}->{account_id}) { rowprocessingerror(threadid(),'cannot find account_id attribute',getlogger(__PACKAGE__)); $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"account_id attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{concurrent_max_total} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CONCURRENT_MAX_TOTAL_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{concurrent_max_total}) { + rowprocessingerror(threadid(),'cannot find concurrent_max_total attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"concurrent_max_total attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{clir} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CLIR_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{clir}) { + rowprocessingerror(threadid(),'cannot find clir attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"clir attribute found",getlogger(__PACKAGE__)); + } + + eval { + $context->{attributes}->{allowed_ips_grp} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{allowed_ips_grp}) { + rowprocessingerror(threadid(),'cannot find allowed_ips_grp attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"allowed_ips_grp attribute found",getlogger(__PACKAGE__)); } - #eval { - # $context->{peer_auth_pass_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( - # $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_PASS); - #}; - #if ($@ or not defined $context->{peer_auth_pass_attribute}) { - # rowprocessingerror(threadid(),'cannot find peer_auth_pass attribute',getlogger(__PACKAGE__)); - # $result = 0; #even in skip-error mode.. - #} + my $barring_resellernames = []; + eval { + $barring_resellernames = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::list_barring_resellernames(); + }; + if ($@) { + rowprocessingerror(threadid(),'error retrieving barrings',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + $context->{ncos_level_map} = {}; + foreach my $barring_resellername (@$barring_resellernames) { + my $resellername = _apply_reseller_mapping($barring_resellername->{reseller_name}); + #unless ($resellername) { + # rowprocessingerror(threadid(),"empty reseller name detected",getlogger(__PACKAGE__)); + # $result = 0; #even in skip-error mode.. + #} + my $barring = $barring_resellername->{barrings}; + next unless ($barring); + $result &= _check_ncos_level($context,$resellername,$barring); + } + } + + eval { + $context->{attributes}->{adm_ncos_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ADM_NCOS_ID_ATTRIBUTE); + }; + if ($@ or not defined $context->{attributes}->{adm_ncos_id}) { + rowprocessingerror(threadid(),'cannot find adm_ncos_id attribute',getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"adm_ncos_id attribute found",getlogger(__PACKAGE__)); + } return $result; } +sub _check_ncos_level { + my ($context,$resellername,$barring) = @_; + my $result = 1; + if (not exists $barring_profiles->{$resellername}) { + rowprocessingerror(threadid(),"barring mappings for reseller $resellername missing",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } elsif (not exists $barring_profiles->{$resellername}->{$barring}) { + rowprocessingerror(threadid(),"mappings for barring '" . $barring . "' of reseller $resellername missing",getlogger(__PACKAGE__)); + $result = 0; #even in skip-error mode.. + } else { + my $reseller_id = $context->{reseller_map}->{$resellername}->{id}; + $context->{ncos_level_map}->{$reseller_id} = {} unless exists $context->{ncos_level_map}->{$reseller_id}; + my $level = $barring_profiles->{$resellername}->{$barring}; + unless (exists $context->{ncos_level_map}->{$reseller_id}->{$barring}) { + if (not defined $level or length($level) == 0) { + $context->{ncos_level_map}->{$reseller_id}->{$barring} = undef; + } else { + eval { + $context->{ncos_level_map}->{$reseller_id}->{$barring} = NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels::findby_resellerid_level( + $reseller_id,$level); + }; + if ($@ or not defined $context->{ncos_level_map}->{$reseller_id}->{$barring}) { + my $err = "cannot find ncos level '$level' of reseller $resellername"; + if (not defined $context->{rowcount}) { + rowprocessingerror(threadid(),$err,getlogger(__PACKAGE__)); + } elsif ($skip_errors) { + _warn($context, $err); + } else { + _error($context, $err); + } + $result = 0; #even in skip-error mode.. + } else { + processing_info(threadid(),"ncos level '$level' of reseller $resellername found",getlogger(__PACKAGE__)); + } + } + } + } + return $result; +} + sub _update_contact { my ($context) = @_; @@ -378,6 +533,7 @@ sub _update_contact { $context->{contract}->{contact}, ); $context->{contract}->{contact_id} = $context->{contract}->{contact}->{id}; + _info($context,"contact id $context->{contract}->{contact}->{id} created",1); } return 1; @@ -407,6 +563,8 @@ sub _update_contract { $context->{contract}->{contract_balance_id} = NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances::insert_row($context->{db}, contract_id => $context->{contract}->{id}, ); + + _info($context,"contract id $context->{contract}->{id} created",1); } return 1; @@ -454,7 +612,7 @@ sub _update_subscriber { ); } - $context->{preferences}->{cli} = { id => set_preference($context, + $context->{preferences}->{cli} = { id => set_subscriber_preference($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{cli}, $number->{number}), value => $number->{number} }; @@ -464,6 +622,8 @@ sub _update_subscriber { primary_number_id => $context->{voip_numbers}->{primary}->{id}, }); + _info($context,"subscriber uuid $context->{prov_subscriber}->{uuid} created",1); + #primary alias $context->{aliases}->{primary}->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::insert_row($context->{db}, domain_id => $context->{prov_subscriber}->{domain_id}, @@ -472,7 +632,7 @@ sub _update_subscriber { ); my @allowed_clis = (); - push(@allowed_clis,{ id => set_preference($context, + push(@allowed_clis,{ id => set_subscriber_preference($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{allowed_clis}, $number->{number}), value => $number->{number}}); @@ -484,38 +644,88 @@ sub _update_subscriber { NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::delete_dbaliases($context->{db}, $context->{prov_subscriber}->{id},{ 'NOT IN' => $number->{number} }); - clear_preferences($context, + clear_subscriber_preferences($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{allowed_clis}, $number->{number}); + _info($context,"primary alias $number->{number} created",1); + $context->{voicemail_user}->{id} = NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users::insert_row($context->{db}, $context->{voicemail_user}, ); - $context->{preferences}->{account_id} = { id => set_preference($context, + $context->{preferences}->{account_id} = { id => set_subscriber_preference($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{account_id}, $context->{contract}->{id}), value => $context->{contract}->{id} }; if (length($number->{ac}) > 0) { - $context->{preferences}->{ac} = { id => set_preference($context, + $context->{preferences}->{ac} = { id => set_subscriber_preference($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{ac}, $number->{ac}), value => $number->{ac} }; } if (length($number->{cc}) > 0) { - $context->{preferences}->{cc} = { id => set_preference($context, + $context->{preferences}->{cc} = { id => set_subscriber_preference($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{cc}, $number->{cc}), value => $number->{cc} }; } + } return $result; } +sub _update_preferences { + + my ($context) = @_; + + my $result = 1; + + if (defined $context->{channels}) { + $context->{preferences}->{concurrent_max_total} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{concurrent_max_total}, + $context->{channels}), value => $context->{channels} }; + _info($context,"concurrent_max_total preference set to $context->{channels}",1); + } + + if ($context->{clir}) { + $context->{preferences}->{clir} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{clir}, + $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE), value => $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE }; + _info($context,"clir preference set to $context->{clir}",1); + } + + if ((scalar @{$context->{allowed_ips}}) > 0) { + my ($allowed_ip_group_preferrence_id, $allowed_ip_group_id) = set_allowed_ips_preferences($context, + $context->{prov_subscriber}->{id}, + $context->{prov_subscriber}->{username}, + $context->{attributes}->{allowed_ips_grp}, + $context->{allowed_ips}, + ); + $context->{preferences}->{allowed_ips_grp} = { id => $allowed_ip_group_preferrence_id, value => $allowed_ip_group_id }; + _info($context,"allowed_ips_grp preference set to $allowed_ip_group_id - " . join(',',@{$context->{allowed_ips}}),1); + } + + if (defined $context->{ncos_level}) { + $context->{preferences}->{adm_ncos_id} = { id => set_subscriber_preference($context, + $context->{prov_subscriber}->{id}, + $context->{attributes}->{adm_ncos_id}, + $context->{ncos_level}->{id}), value => $context->{ncos_level}->{id} }; + _info($context,"adm_ncos_id preference set to $context->{ncos_level}->{id} - $context->{ncos_level}->{level}",1); + } + + + + return $result; + +} + sub _create_aliases { my ($context) = @_; @@ -576,15 +786,16 @@ sub _create_aliases { push(@{$context->{aliases}->{other}},$alias); push(@usernames,$number->{number}); - delete_preference($context, + delete_subscriber_preference($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{allowed_clis}, $number->{number}); - push(@{$context->{preferences}->{allowed_clis}},{ id => set_preference($context, + push(@{$context->{preferences}->{allowed_clis}},{ id => set_subscriber_preference($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{allowed_clis}, $number->{number}), value => $number->{number}}); + _info($context,"alias $number->{number} created",1); } push(@voip_number_ids,$context->{voip_numbers}->{primary}->{id}); @@ -596,11 +807,23 @@ sub _create_aliases { NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::delete_dbaliases($context->{db},$context->{prov_subscriber}->{id}, { 'NOT IN' => \@usernames }); - clear_preferences($context, + clear_subscriber_preferences($context, $context->{prov_subscriber}->{id}, $context->{attributes}->{allowed_clis}, \@usernames ); + #test: + #my $allowed_clis = get_subscriber_preference($context, + # $context->{prov_subscriber}->{id}, + # $context->{attributes}->{allowed_clis}); + + #my $voip_numbers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::findby_subscriberid($context->{db}, + # $context->{bill_subscriber}->{id}); + + #my $aliases = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberid_username($context->{db}, + # $context->{prov_subscriber}->{id},undef); + + #_info($context,(scalar @{$context->{numbers}->{other}}) . " aliases created: " . join(',',(map { $_->{number}; } @{$context->{numbers}->{other}}))); } return $result; } @@ -619,25 +842,17 @@ sub _provision_susbcriber_init_context { } $context->{domain} = $context->{domain_map}->{$first->{domain}}; - $context->{reseller} = $context->{reseller_map}->{_apply_reseller_mapping($first->{reseller_name})}; + my $resellername = _apply_reseller_mapping($first->{reseller_name}); + $context->{reseller} = $context->{reseller_map}->{$resellername}; $context->{billing_profile} = $context->{reseller}->{billingprofile_map}->{$first->{billing_profile_name}}; $context->{prov_subscriber} = {}; $context->{prov_subscriber}->{username} = $first->{sip_username}; $context->{prov_subscriber}->{password} = $first->{sip_password}; $context->{prov_subscriber}->{webusername} = $first->{web_username}; - if (not (defined $first->{web_username} and length($first->{web_username}) > 0)) { - $context->{prov_subscriber}->{webusername} = undef; - } else { - my %webusername_dupes = map { $_->{sip_username} => 1; } - @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_webusername( - $first->{domain},$context->{prov_subscriber}->{webusername})}; - if ((scalar keys %webusername_dupes) > 1) { - #_warn($context,"duplicate web_username $context->{prov_subscriber}->{webusername}, using sip_username"); - $context->{prov_subscriber}->{webusername} = $first->{sip_username}; - } - } $context->{prov_subscriber}->{webpassword} = $first->{web_password}; + my $webusername = $first->{web_username}; + $context->{prov_subscriber}->{uuid} = create_uuid(); $context->{prov_subscriber}->{domain_id} = $context->{domain}->{prov_domain}->{id}; @@ -647,17 +862,20 @@ sub _provision_susbcriber_init_context { $context->{bill_subscriber}->{uuid} = $context->{prov_subscriber}->{uuid}; undef $context->{contract}; + undef $context->{channels}; my @numbers = (); my %number_dupes = (); my %contact_dupes = (); + my %allowed_ips = (); + my %barrings = (); foreach my $subscriber (@$subscriber_group) { - my $number = ($subscriber->{cc} // '') . ($subscriber->{ac} // '') . ($subscriber->{sn} // ''); + my $number = $subscriber->{cc} . $subscriber->{ac} . $subscriber->{sn}; if (not exists $number_dupes{$number}) { push(@numbers,{ - cc => $subscriber->{cc} // '', - ac => $subscriber->{ac} // '', - sn => $subscriber->{sn} // '', + cc => $subscriber->{cc}, + ac => $subscriber->{ac}, + sn => $subscriber->{sn}, number => $number, delta => $subscriber->{delta}, additional => 0, @@ -693,15 +911,100 @@ sub _provision_susbcriber_init_context { _warn($context,'non-unique contact hash, skipped'); } } + + my $channels = $subscriber->{channels}; + if (defined $channels and length($channels) > 0) { + if (not ($channels > 0)) { + _warn($context,"invalid number of channels $subscriber->{channels}, ignoring"); + } elsif (not defined $context->{channels} or $channels > $context->{channels}) { + $context->{channels} = $channels; + } + } + #print $subscriber->{allowed_ips} . "\n"; + if (defined $subscriber->{allowed_ips} and length($subscriber->{allowed_ips}) > 0) { + foreach my $ipnet (map { local $_ = $_; trim($_); } split(/$split_ipnets_pattern/,$subscriber->{allowed_ips})) { + if (check_ipnet($ipnet)) { + if ('0.0.0.0' ne $ipnet) { + $allowed_ips{$ipnet} = 1; + } else { + _info($context,"allowed_ip '$ipnet' ignored",1); + } + } else { + _warn($context,"invalid allowed_ip '$ipnet', ignored"); + } + } + } + #$context->{allowed_ips} = \@allowed_ips; + unless (defined $context->{prov_subscriber}->{password} and length($context->{prov_subscriber}->{password}) > 0) { + $context->{prov_subscriber}->{password} = $subscriber->{sip_password}; + } + + unless (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0 + and defined $context->{prov_subscriber}->{webpassword} and length($context->{prov_subscriber}->{webpassword}) > 0) { + $context->{prov_subscriber}->{webusername} = $subscriber->{web_username}; + $context->{prov_subscriber}->{webpassword} = $subscriber->{web_password}; + } + + unless (defined $webusername and length($webusername) > 0) { + $webusername = $subscriber->{web_username}; + } + + if (defined $subscriber->{barrings} and length($subscriber->{barrings}) > 0) { + $barrings{$subscriber->{barrings}} = 1; + } + + } + + unless (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0) { + $context->{prov_subscriber}->{webusername} = $webusername; + $context->{prov_subscriber}->{webpassword} = undef; + } + + if (not (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0)) { + $context->{prov_subscriber}->{webusername} = undef; + $context->{prov_subscriber}->{webpassword} = undef; + _info($context,"empty web_username for sip_username '$first->{sip_username}'",1); + } else { + $webusername = $context->{prov_subscriber}->{webusername}; + my %webusername_dupes = map { $_->{sip_username} => 1; } + @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_webusername( + $first->{domain},$webusername)}; + if ((scalar keys %webusername_dupes) > 1) { + my $generated = _generate_webusername(); #$first->{sip_username}; + _info($context,"duplicate web_username '$webusername', using generated '$generated'",1); + $context->{prov_subscriber}->{webusername} = $generated; + } + + #$context->{prov_subscriber}->{webpassword} = $first->{web_password}; + if (not (defined $context->{prov_subscriber}->{webpassword} and length($context->{prov_subscriber}->{webpassword}) > 0)) { + my $generated = _generate_webpassword(); + _info($context,"empty web_password for web_username '$webusername', using generated '$generated'",1); + $context->{prov_subscriber}->{webpassword} = $generated; + #} elsif (defined $first->{web_password} and length($first->{web_password}) < 8) { + # $context->{prov_subscriber}->{webpassword} = _generate_webpassword(); + # _info($context,"web_password for web_username '$first->{web_username}' is too short, using '$context->{prov_subscriber}->{webpassword}'"); + } + } + $context->{allowed_ips} = [ keys %allowed_ips ]; + $context->{ncos_level} = undef; + if ((scalar keys %barrings) > 1) { + my $combined_barring = join('_',sort keys %barrings); + #$result &= + _check_ncos_level($context,$resellername,$combined_barring); + _info($context,"barrings combination $combined_barring"); + $context->{ncos_level} = $context->{ncos_level_map}->{$context->{reseller}->{id}}->{$combined_barring}; + } elsif ((scalar keys %barrings) == 1) { + my ($barring) = keys %barrings; + $context->{ncos_level} = $context->{ncos_level_map}->{$context->{reseller}->{id}}->{$barring}; } foreach my $allowed_cli (@{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::findby_sipusername($first->{sip_username})}) { - my $number = ($allowed_cli->{cc} // '') . ($allowed_cli->{ac} // '') . ($allowed_cli->{sn} // ''); + my $number = $allowed_cli->{cc} . $allowed_cli->{ac} . $allowed_cli->{sn}; if (not exists $number_dupes{$number}) { push(@numbers,{ - cc => $allowed_cli->{cc} // '', - ac => $allowed_cli->{ac} // '', - sn => $allowed_cli->{sn} // '', + cc => $allowed_cli->{cc}, + ac => $allowed_cli->{ac}, + sn => $allowed_cli->{sn}, number => $number, delta => $allowed_cli->{delta}, additional => 1, @@ -732,6 +1035,7 @@ sub _provision_susbcriber_init_context { }, ]); $context->{numbers}->{primary} = shift(@{$context->{numbers}->{other}}); + #return 0 unless scalar @{$context->{numbers}->{other}}; $context->{voip_numbers} = {}; $context->{voip_numbers}->{primary} = undef; @@ -740,24 +1044,35 @@ sub _provision_susbcriber_init_context { $context->{aliases}->{primary} = undef; $context->{aliases}->{other} = []; - $context->{preferences} = {}; - $context->{voicemail_user} = {}; $context->{voicemail_user}->{customer_id} = $context->{prov_subscriber}->{uuid}; $context->{voicemail_user}->{mailbox} = $context->{numbers}->{primary}->{number}; $context->{voicemail_user}->{password} = sprintf("%04d", int(rand 10000)); + $context->{preferences} = {}; + $context->{clir} = 0; + if (my $clir = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::findby_sipusername($first->{sip_username})) { + $context->{clir} = stringtobool($clir->{clir}); + } + + + #$context->{counts} = {} unless defined $context->{counts}; + return $result; } -#sub _generate_webpassword { -# return String::MkPasswd::mkpasswd( -# -length => $webpassword_length, -# -minnum => 1, -minlower => 1, -minupper => 1, -minspecial => 1, -# -distribute => 1, -fatal => 1, -# ); -#} +sub _generate_webpassword { + return String::MkPasswd::mkpasswd( + -length => $webpassword_length, + -minnum => 1, -minlower => 1, -minupper => 1, -minspecial => 1, + -distribute => 1, -fatal => 1, + ); +} + +sub _generate_webusername { + return createtmpstring($webusername_length); +} sub _apply_reseller_mapping { my $reseller_name = shift; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm index a987028..b783df8 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/Settings.pm @@ -26,13 +26,15 @@ use NGCP::BulkProcessor::LoadConfig qw( split_tuple parse_regexp ); -use NGCP::BulkProcessor::Utils qw(format_number check_ipnet prompt); +use NGCP::BulkProcessor::Utils qw(prompt); +#format_number check_ipnet require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( update_settings update_reseller_mapping + update_barring_profiles check_dry $input_path @@ -55,20 +57,25 @@ our @EXPORT_OK = qw( $subscriber_import_unfold_ranges $reseller_mapping_yml $reseller_mapping + $barring_profiles_yml + $barring_profiles @allowedcli_filenames $allowedcli_import_numofthreads $ignore_allowedcli_unique $allowedcli_import_single_row_txn + $allowedcli_import_unfold_ranges + @clir_filenames + $clir_import_numofthreads + $ignore_clir_unique + $clir_import_single_row_txn $provision_subscriber_multithreading $provision_subscriber_numofthreads - $always_update_subscriber + $webpassword_length + $webusername_length - $set_allowed_ips_multithreading - $set_allowed_ips_numofthreads - $allowed_ips $set_call_forwards_multithreading $set_call_forwards_numofthreads @@ -89,6 +96,9 @@ our @EXPORT_OK = qw( ); #$concurrent_max_total +# $set_allowed_ips_multithreading +# $set_allowed_ips_numofthreads +# $allowed_ips our $defaultconfig = 'config.cfg'; our $defaultsettings = 'settings.cfg'; @@ -110,21 +120,27 @@ our $subscriber_import_single_row_txn = 1; our $subscriber_import_unfold_ranges = 1; our $reseller_mapping_yml = undef; our $reseller_mapping = {}; +our $barring_profiles_yml = undef; +our $barring_profiles = {}; our @allowedcli_filenames = (); our $allowedcli_import_numofthreads = $cpucount; our $ignore_allowedcli_unique = 0; our $allowedcli_import_single_row_txn = 1; +our $allowedcli_import_unfold_ranges = 1; - +our @clir_filenames = (); +our $clir_import_numofthreads = $cpucount; +our $ignore_clir_unique = 0; +our $clir_import_single_row_txn = 1; our $provision_subscriber_multithreading = $enablemultithreading; our $provision_subscriber_numofthreads = $cpucount; -our $always_update_subscriber = 0; - -our $set_allowed_ips_multithreading = $enablemultithreading; -our $set_allowed_ips_numofthreads = $cpucount; -our $allowed_ips = []; +our $webpassword_length = 8; +our $webusername_length = 8; +#our $set_allowed_ips_multithreading = $enablemultithreading; +#our $set_allowed_ips_numofthreads = $cpucount; +#our $allowed_ips = []; our $set_call_forwards_multithreading = $enablemultithreading; our $set_call_forwards_numofthreads = $cpucount; @@ -140,9 +156,9 @@ our $cfnumber_exclude_pattern = undef; our $cfnumber_trim_pattern = undef; our $ringtimeout = undef; -our $set_preference_bulk_multithreading = $enablemultithreading; -our $set_preference_bulk_numofthreads = $cpucount; -our $concurrent_max_total = undef; +#our $set_preference_bulk_multithreading = $enablemultithreading; +#our $set_preference_bulk_numofthreads = $cpucount; +#our $concurrent_max_total = undef; sub update_settings { @@ -168,26 +184,40 @@ sub update_settings { $subscriber_import_single_row_txn = $data->{subscriber_import_single_row_txn} if exists $data->{subscriber_import_single_row_txn}; $subscriber_import_unfold_ranges = $data->{subscriber_import_unfold_ranges} if exists $data->{subscriber_import_unfold_ranges}; $reseller_mapping_yml = $data->{reseller_mapping_yml} if exists $data->{reseller_mapping_yml}; + $barring_profiles_yml = $data->{barring_profiles_yml} if exists $data->{barring_profiles_yml}; @allowedcli_filenames = _get_import_filenames(\@allowedcli_filenames,$data,'allowedcli_filenames'); $allowedcli_import_numofthreads = _get_numofthreads($cpucount,$data,'allowedcli_import_numofthreads'); $ignore_allowedcli_unique = $data->{ignore_allowedcli_unique} if exists $data->{ignore_allowedcli_unique}; $allowedcli_import_single_row_txn = $data->{allowedcli_import_single_row_txn} if exists $data->{allowedcli_import_single_row_txn}; + $allowedcli_import_unfold_ranges = $data->{allowedcli_import_unfold_ranges} if exists $data->{allowedcli_import_unfold_ranges}; + @clir_filenames = _get_import_filenames(\@clir_filenames,$data,'clir_filenames'); + $clir_import_numofthreads = _get_numofthreads($cpucount,$data,'clir_import_numofthreads'); + $ignore_clir_unique = $data->{ignore_clir_unique} if exists $data->{ignore_clir_unique}; + $clir_import_single_row_txn = $data->{clir_import_single_row_txn} if exists $data->{clir_import_single_row_txn}; $provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading}; $provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads'); - $always_update_subscriber = $data->{always_update_subscriber} if exists $data->{always_update_subscriber}; - - $set_allowed_ips_multithreading = $data->{set_allowed_ips_multithreading} if exists $data->{set_allowed_ips_multithreading}; - $set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads'); - $allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips}; - foreach my $ipnet (@$allowed_ips) { - if (not check_ipnet($ipnet)) { - configurationerror($configfile,"invalid allowed_ip '$ipnet'",getlogger(__PACKAGE__)); - $result = 0; - } + $webpassword_length = $data->{webpassword_length} if exists $data->{webpassword_length}; + if (not defined $webpassword_length or $webpassword_length <= 7) { + configurationerror($configfile,'webpassword_length greater than 7 required',getlogger(__PACKAGE__)); + $result = 0; } + $webusername_length = $data->{webusername_length} if exists $data->{webusername_length}; + if (not defined $webusername_length or $webusername_length <= 7) { + configurationerror($configfile,'webusername_length greater than 7 required',getlogger(__PACKAGE__)); + $result = 0; + } + #$set_allowed_ips_multithreading = $data->{set_allowed_ips_multithreading} if exists $data->{set_allowed_ips_multithreading}; + #$set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads'); + #$allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips}; + #foreach my $ipnet (@$allowed_ips) { + # if (not check_ipnet($ipnet)) { + # configurationerror($configfile,"invalid allowed_ip '$ipnet'",getlogger(__PACKAGE__)); + # $result = 0; + # } + #} $set_call_forwards_multithreading = $data->{set_call_forwards_multithreading} if exists $data->{set_call_forwards_multithreading}; $set_call_forwards_numofthreads = _get_numofthreads($cpucount,$data,'set_call_forwards_numofthreads'); @@ -211,8 +241,8 @@ sub update_settings { $result = 0; } - $set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading}; - $set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads'); + #$set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading}; + #$set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads'); #$concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total}; #if (defined $concurrent_max_total and $concurrent_max_total <= 0) { # configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__)); @@ -312,4 +342,27 @@ sub update_reseller_mapping { } +sub update_barring_profiles { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $barring_profiles = $data->{'mapping'}; + }; + if ($@ or 'HASH' ne ref $barring_profiles or (scalar keys %$barring_profiles) == 0) { + $barring_profiles //= {}; + configurationerror($configfile,'no barring mappings found',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + 1; diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/barring_profiles.yml b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/barring_profiles.yml new file mode 100644 index 0000000..c7e9aef --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/barring_profiles.yml @@ -0,0 +1,19 @@ +mapping: + Junet: + 21: 'Junet 21' + 25: 'Junet 25' + 29: 'Junet 29' + 33: 'Junet 33' + Teleman: + 21: 'Teleman 21' + 25: 'Teleman 25' + 29: 'Teleman 29' + 33: 'Teleman 33' + Teletek: + 21: 'Teletek 21' + 25: 'Teletek 25' + 27: 'Teletek 27' + 29: 'Teletek 29' + 33: 'Teletek 33' + 25_29: 'Teletek 25 29' + 25_33: 'Teletek 25 33' diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl index 50e0c20..5a89605 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/process.pl @@ -13,6 +13,7 @@ use NGCP::BulkProcessor::Globals qw(); use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( update_settings update_reseller_mapping + update_barring_profiles check_dry $output_path $defaultsettings @@ -23,8 +24,11 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw( $run_id @subscriber_filenames $reseller_mapping_yml + $barring_profiles_yml @allowedcli_filenames + + @clir_filenames ); #$allowed_ips @@ -62,6 +66,8 @@ use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles); use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw(destroy_all_dbs); use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw(); +use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); @@ -83,6 +89,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Check qw( use NGCP::BulkProcessor::Projects::Migration::Teletek::Import qw( import_subscriber import_allowedcli + import_clir ); use NGCP::BulkProcessor::Projects::Migration::Teletek::Provisioning qw( @@ -123,18 +130,14 @@ push(@TASK_OPTS,$import_allowedcli_task_opt); my $import_truncate_allowedcli_task_opt = 'truncate_allowedcli'; push(@TASK_OPTS,$import_truncate_allowedcli_task_opt); +my $import_clir_task_opt = 'import_clir'; +push(@TASK_OPTS,$import_clir_task_opt); +my $import_truncate_clir_task_opt = 'truncate_clir'; +push(@TASK_OPTS,$import_truncate_clir_task_opt); + my $create_subscriber_task_opt = 'create_subscriber'; push(@TASK_OPTS,$create_subscriber_task_opt); -#my $set_allowed_ips_task_opt = 'set_allowed_ips'; -#push(@TASK_OPTS,$set_allowed_ips_task_opt); - -#my $set_call_forwards_task_opt = 'set_call_forwards'; -#push(@TASK_OPTS,$set_call_forwards_task_opt); - -#my $set_concurrent_max_total_task_opt = 'set_concurrent_max_total'; -#push(@TASK_OPTS,$set_concurrent_max_total_task_opt); - if (init()) { main(); exit(0); @@ -163,6 +166,7 @@ sub init { init_log(); $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); $result &= load_config($reseller_mapping_yml,\&update_reseller_mapping,$YAML_CONFIG_TYPE); + $result &= load_config($barring_profiles_yml,\&update_barring_profiles,$YAML_CONFIG_TYPE); return $result; } @@ -197,6 +201,10 @@ sub main() { } elsif (lc($import_truncate_allowedcli_task_opt) eq lc($task)) { $result &= import_truncate_allowedcli_task(\@messages) if taskinfo($import_truncate_allowedcli_task_opt,$result); + } elsif (lc($import_clir_task_opt) eq lc($task)) { + $result &= import_clir_task(\@messages) if taskinfo($import_clir_task_opt,$result); + } elsif (lc($import_truncate_clir_task_opt) eq lc($task)) { + $result &= import_truncate_clir_task(\@messages) if taskinfo($import_truncate_clir_task_opt,$result); } elsif (lc($create_subscriber_task_opt) eq lc($task)) { if (taskinfo($create_subscriber_task_opt,$result,1)) { @@ -205,28 +213,6 @@ sub main() { $completion |= 1; } - #} elsif (lc($set_allowed_ips_task_opt) eq lc($task)) { - # if (taskinfo($set_allowed_ips_task_opt,$result,1) and ($result = batchinfo($result))) { - # next unless check_dry(); - # $result &= set_allowed_ips_task(\@messages); - # $completion |= 1; - # } - - #} elsif (lc($set_call_forwards_task_opt) eq lc($task)) { - # if (taskinfo($set_call_forwards_task_opt,$result,1) and ($result = batchinfo($result))) { - # next unless check_dry(); - # $result &= set_call_forwards_task(\@messages); - # $completion |= 1; - # } - - #} elsif (lc($set_concurrent_max_total_task_opt) eq lc($task)) { - # if (taskinfo($set_concurrent_max_total_task_opt,$result,1) and ($result = batchinfo($result))) { - # next unless check_dry(); - # $result &= set_preference_bulk_task(\@messages, - # $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CONCURRENT_MAX_TOTAL_ATTRIBUTE, - # $concurrent_max_total); - # $completion |= 1; - # } } else { $result = 0; @@ -369,9 +355,6 @@ sub import_truncate_subscriber_task { } - - - sub import_allowedcli_task { my ($messages) = @_; @@ -431,6 +414,65 @@ sub import_truncate_allowedcli_task { } +sub import_clir_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = import_clir(@clir_filenames); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total clir records: " . + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir() . ' rows'; + my $added_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::added_delta + ); + $stats .= "\n new: $added_count rows"; + my $existing_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::updated_delta + ); + $stats .= "\n existing: $existing_count rows"; + my $deleted_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_delta( + $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::deleted_delta + ); + $stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"importing clir INCOMPLETE$stats"); + } else { + push(@$messages,"importing clir completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + + +sub import_truncate_clir_task { + + my ($messages) = @_; + my $result = 0; + eval { + $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::create_table(1); + }; + my $err = $@; + my $stats = ''; + eval { + $stats .= "\n total clir records: " . + NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir() . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"truncating imported clir INCOMPLETE$stats"); + } else { + push(@$messages,"truncating imported clir completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + sub create_subscriber_task { @@ -482,42 +524,6 @@ sub create_subscriber_task { } -#sub set_allowed_ips_task { -# -# my ($messages) = @_; -# my ($result,$warning_count) = (0,0); -# eval { -# if ($batch) { -# ($result,$warning_count) = set_allowed_ips_batch(); -# } else { -# ($result,$warning_count) = set_allowed_ips(); -# } -# }; -# my $err = $@; -# my $stats = ($skip_errors ? ": $warning_count warnings" : ''); -# eval { -# -# my $allowed_ips_grp_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute( -# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE); -# $stats .= "\n '" . $allowed_ips_grp_attribute->{attribute} . "': " . -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, -# $allowed_ips_grp_attribute->{id},undef) . ' rows'; -# foreach my $ipnet (@$allowed_ips) { -# $stats .= "\n '$ipnet': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::countby_groupid_ipnet(undef,$ipnet) . ' rows'; -# } -# $stats .= "\n voip_aig_sequence: " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::get_id(); -# -# }; -# if ($err or !$result) { -# push(@$messages,"set subscribers\' allowed_ips preference INCOMPLETE$stats"); -# } else { -# push(@$messages,"set subscribers\' allowed_ips preference completed$stats"); -# } -# destroy_all_dbs(); #every task should leave with closed connections. -# return $result; -# -#} - #sub set_call_forwards_task { # # my ($messages,$mode) = @_; @@ -558,37 +564,6 @@ sub create_subscriber_task { # #} -#sub set_preference_bulk_task { -# -# my ($messages,$bulk_attribute_name,$value) = @_; -# my ($result,$warning_count) = (0,0); -# eval { -# if ($batch) { -# ($result,$warning_count) = set_preference_bulk_batch($bulk_attribute_name,$value); -# } else { -# ($result,$warning_count) = set_preference_bulk($bulk_attribute_name,$value); -# } -# }; -# my $err = $@; -# my $stats = ($skip_errors ? ": $warning_count warnings" : ''); -# eval { -# my $bulk_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute($bulk_attribute_name); -# -# $stats .= "\n '" . $bulk_attribute->{attribute} . "': " . -# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef, -# $bulk_attribute->{id},undef) . ' rows'; -# -# }; -# if ($err or !$result) { -# push(@$messages,"set subscribers\' $bulk_attribute_name preference INCOMPLETE$stats"); -# } else { -# push(@$messages,"set subscribers\' $bulk_attribute_name preference completed$stats"); -# } -# destroy_all_dbs(); #every task should leave with closed connections. -# return $result; -# -#} - #END { # # this should not be required explicitly, but prevents Log4Perl's # # "rootlogger not initialized error upon exit.. diff --git a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg index 657e326..18c2475 100644 --- a/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Migration/Teletek/settings.cfg @@ -14,7 +14,7 @@ ignore_subscriber_unique = 0 subscriber_import_single_row_txn = 1 subscriber_import_unfold_ranges = 1 reseller_mapping_yml = reseller_mapping.yml - +barring_profiles_yml = barring_profiles.yml #allowedcli_filename = /home/rkrenn/test/teletek/export_multiple_DID_Leica.csv #allowedcli_filename = /home/rkrenn/temp/teletek/export_screeningOnly_170824.csv @@ -22,11 +22,18 @@ allowedcli_filenames = /home/rkrenn/temp/teletek/export_MultipleDID_170823.csv,/ allowedcli_import_numofthreads = 2 ignore_allowedcli_unique = 0 allowedcli_import_single_row_txn = 1 +allowedcli_import_unfold_ranges = 1 + +clir_filenames = /home/rkrenn/temp/teletek/export_CLIR.csv +clir_import_numofthreads = 2 +ignore_clir_unique = 0 +clir_import_single_row_txn = 1 provision_subscriber_multithreading = 1 #provision_subscriber_numofthreads = 6 -always_update_subscriber = 0 +webpassword_length = 8 +webusername_length = 8 set_allowed_ips_multithreading = 1 #set_allowed_ips_numofthreads = 6