TT#18879 implement teletek importer #5

+ introduce support to manipulate contract, domain, etc.
  voip_preferences
+ write concurrent_max_total preference
+ error in invalid "channels = 0"

+ try to find non-empty sip-password of a subscriber, if any
+ try to find nonempty web username+password, if any
+ set emtpy web_password, if web_username is empty
+ auto-generate web_username, if duplicate
+ auto-generate web_password, if web_username is not emtpy but
  web_password is empty

+ parse export_CLIR.csv
+ write clir preference

+ write allowed_ips_grp preference

+ use maximum of concurrent_max_total for a subscriber,
  when inconsisten
+ use combined list of allowed ips, when inconsistent

+ write adm_ncos, derive barrings combination if
  inconsistent for a subscriber

+ log created items, cleanup code a bit

+ range unfolding for numbers in allowedcli files

Change-Id: I26bab2cb9b4ef349777ebef2e883c9e41af3eeba
changes/48/15348/10
Rene Krenn 8 years ago
parent 275afbbfb3
commit 52ad964c53

@ -71,9 +71,12 @@ sub findby_subscriberid_username {
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?' .
' AND ' . $db->columnidentifier('username') . ' = ?';
my @params = ($subscriber_id,$username);
$db->columnidentifier('subscriber_id') . ' = ?';
my @params = ($subscriber_id);
if (defined $username) {
$stmt .= ' AND ' . $db->columnidentifier('username') . ' = ?';
push(@params,$username);
}
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);

@ -37,6 +37,7 @@ our @EXPORT_OK = qw(
$ALLOWED_IPS_GRP_ATTRIBUTE
$CONCURRENT_MAX_TOTAL_ATTRIBUTE
$CONCURRENT_MAX_PER_ACCOUNT
);
#$FORCE_OUTBOUND_CALLS_TO_PEER
@ -84,6 +85,8 @@ our $FORCE_INBOUND_CALLS_TO_PEER = 'force_inbound_calls_to_peer';
our $ALLOWED_IPS_GRP_ATTRIBUTE = 'allowed_ips_grp';
our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total';
our $CONCURRENT_MAX_PER_ACCOUNT_ATTRIBUTE = 'concurrent_max_per_account';
our $CLIR_ATTRIBUTE = 'clir';
sub new {

@ -40,6 +40,8 @@ use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw();
@ -97,8 +99,8 @@ sub check_billing_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels');
#$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers');
$result &= $check_result; push(@$messages,$message);
@ -129,6 +131,12 @@ sub check_import_db_tables {
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir');
$result &= $check_result; push(@$messages,$message);
return $result;
}

@ -67,6 +67,7 @@ our @fieldnames = (
#calculated fields at the end!
'rownum',
'filename',
);
my $expected_fieldnames = [

@ -0,0 +1,287 @@
package NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir;
use strict;
## no critic
use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw(
get_import_db
destroy_all_dbs
);
#import_db_tableidentifier
use NGCP::BulkProcessor::SqlProcessor qw(
registertableinfo
create_targettable
checktableinfo
copy_row
insert_stmt
);
#process_table
use NGCP::BulkProcessor::SqlRecord qw();
#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
create_table
gettablename
check_table
getinsertstatement
getupsertstatement
@fieldnames
findby_sipusername
countby_clir
update_delta
findby_delta
countby_delta
$deleted_delta
$updated_delta
$added_delta
);
#@fieldnames
#@contact_fieldnames
#process_records
#findby_ccacsn
#countby_ccacsn
#list_domain_billingprofilename_resellernames
my $tablename = 'clir';
my $get_db = \&get_import_db;
#my $get_tablename = \&import_db_tableidentifier;
our @fieldnames = (
"sip_username",
"clir",
#calculated fields at the end!
'rownum',
'filename',
);
my $expected_fieldnames = [
@fieldnames,
'delta',
];
# table creation:
my $primarykey_fieldnames = [ 'sip_username' ];
my $indexes = {
$tablename . '_rownum' => [ 'rownum(11)' ],
$tablename . '_delta' => [ 'delta(7)' ],
};
#my $fixtable_statements = [];
our $deleted_delta = 'DELETED';
our $updated_delta = 'UPDATED';
our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub create_table {
my ($truncate) = @_;
my $db = &$get_db();
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
sub findby_delta {
my ($delta,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return [] unless defined $delta;
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('delta') . ' = ?'
,$delta);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_sipusername {
my ($sip_username,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
#return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('sip_username') . ' = ?'
,$sip_username);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_clir {
my ($clir) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
if (defined $clir) {
$stmt .= ' WHERE ' .
$db->columnidentifier('clir') . ' = ?';
push(@params,$clir);
}
return $db->db_get_value($stmt,@params);
}
sub update_delta {
my ($sip_username,$delta) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'UPDATE ' . $table . ' SET delta = ?';
my @params = ();
push(@params,$delta);
if (defined $sip_username) {
$stmt .= ' WHERE ' .
$db->columnidentifier('sip_username') . ' = ?';
push(@params,$sip_username);
}
return $db->db_do($stmt,@params);
}
sub countby_delta {
my ($deltas) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1';
my @params = ();
if (defined $deltas and 'HASH' eq ref $deltas) {
foreach my $in (keys %$deltas) {
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $deltas and length($deltas) > 0) {
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?';
push(@params,$deltas);
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
sub getupsertstatement {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' .
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')';
my @values = ();
foreach my $fieldname (@$expected_fieldnames) {
if ('delta' eq $fieldname) {
my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' .
$db->columnidentifier('sip_username') . ' = ?';
push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')');
} else {
push(@values,'?');
}
}
$upsert_stmt .= ' VALUES (' . join(',',@values) . ')';
return $upsert_stmt;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -39,6 +39,7 @@ our @EXPORT_OK = qw(
findby_domain_webusername
list_domain_billingprofilename_resellernames
findby_sipusername
list_barring_resellernames
update_delta
findby_delta
@ -92,6 +93,7 @@ our @fieldnames = (
'rownum',
'range',
'contact_hash',
'filename',
);
my $expected_fieldnames = [
@fieldnames,
@ -331,6 +333,20 @@ sub list_domain_billingprofilename_resellernames {
}
sub list_barring_resellernames {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my @cols = map { $db->columnidentifier($_); } qw/barrings reseller_name/;
my $stmt = 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols);
my @params = ();
return $db->db_get_all_arrayref($stmt,@params);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -15,6 +15,11 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$allowedcli_import_numofthreads
$ignore_allowedcli_unique
$allowedcli_import_single_row_txn
$allowedcli_import_unfold_ranges
$clir_import_numofthreads
$ignore_clir_unique
$clir_import_single_row_txn
$skip_errors
);
@ -37,9 +42,10 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw(
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw();
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(threadid zerofill);
use NGCP::BulkProcessor::Utils qw(threadid zerofill trim);
use NGCP::BulkProcessor::Table qw(get_rowhash);
require Exporter;
@ -47,6 +53,7 @@ our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
import_subscriber
import_allowedcli
import_clir
);
sub import_subscriber {
@ -75,61 +82,21 @@ sub import_subscriber {
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
$row = [ map { local $_ = $_; trim($_); } @$row ];
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($row);
$record->{cc} //= '';
$record->{ac} //= '';
$record->{sn} //= '';
$record->{cc} = trim($record->{cc});
$record->{ac} = trim($record->{ac});
$record->{sn} = trim($record->{sn});
$record->{rownum} = $rownum;
my @subscriber_row;
my %r;
if ($subscriber_import_unfold_ranges and $record->{sn} =~ /\.+$/) {
#if ($record->{sn} == '2861..') {
#print "x";
#}
my $pow = scalar (() = $record->{sn} =~ /\./g);
_warn($context,"number range $record->{sn} results in " . 10**$pow . ' numbers') if $pow > 2;
$record->{sn} =~ s/\.+$//g;
$record->{range} = 0;
%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
my $base_sn = $record->{sn};
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows, [@subscriber_row]);
for (my $i = 0; $i < 10**$pow; $i++) {
$record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new($record);
#@subscriber_row = @$row;
$record->{sn} = $base_sn . zerofill($i,$pow);
$record->{range} = 1;
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows,[@subscriber_row]);
}
#if ($base_sn == '2861') {
#print "x";
#last;
#}
} else {
$record->{range} = 0;
%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
%r = %$record; @subscriber_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@subscriber_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@subscriber_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows,\@subscriber_row);
}
}
if ((scalar @subscriber_rows) > 0) {
if ($subscriber_import_single_row_txn) {
foreach my $subscriber_row (@subscriber_rows) {
$record->{filename} = $file;
my %r = %$record;
$record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
next unless _unfold_number_ranges($context,$record,\@subscriber_rows);
if ($subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) {
while (defined (my $subscriber_row = shift @subscriber_rows)) {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,[$subscriber_row]); };
_warn($context,$@) if $@;
@ -137,23 +104,46 @@ sub import_subscriber {
_insert_subscriber_rows($context,[$subscriber_row]);
}
}
}
}
if (not $subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,\@subscriber_rows); };
_warn($context,$@) if $@;
} else {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,\@subscriber_rows); };
_warn($context,$@) if $@;
} else {
_insert_subscriber_rows($context,\@subscriber_rows);
}
_insert_subscriber_rows($context,\@subscriber_rows);
}
}
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{unfold_ranges} = $subscriber_import_unfold_ranges;
$context->{fieldnames} = \@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::fieldnames;
$context->{added_delta} = $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::added_delta;
$context->{create_new_record_code} = sub {
return NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber->new(shift);
};
$context->{check_number_code} = sub {
my ($context,$record) = @_;
my $result = 1;
my $number = $record->{cc} . $record->{ac} . $record->{sn};
# prevent db's unique constraint violation:
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
if ($skip_errors) {
_warn($context,"$record->{sip_username}: duplicate number $number");
} else {
_error($context,"$record->{sip_username}: duplicate number $number");
}
$result = 0;
}
return $result;
};
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
@ -177,22 +167,7 @@ sub import_subscriber {
sub _import_subscriber_checks {
my ($file) = @_;
my $result = 1;
#my $optioncount = 0;
#eval {
# $optioncount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::FeatureOption::countby_subscribernumber_option();
#};
#if ($@ or $optioncount == 0) {
# fileprocessingerror($file,'please import subscriber features first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
#my $userpasswordcount = 0;
#eval {
# $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::countby_fqdn();
#};
#if ($@ or $userpasswordcount == 0) {
# fileprocessingerror($file,'please import user passwords first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
return $result;
}
@ -230,21 +205,6 @@ sub _insert_subscriber_rows {
}
}
#sub _insert_subscriber_row {
# my ($context,$subscriber_row) = @_;
# $context->{db}->db_do(
# ($context->{upsert} ?
# NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::getupsertstatement()
# : NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::getinsertstatement($ignore_subscriber_unique)),
# @$subscriber_row
# );
#}
sub import_allowedcli {
@ -272,40 +232,30 @@ sub import_allowedcli {
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
$row = [ map { local $_ = $_; trim($_); } @$row ];
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new($row);
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
my $number = ($record->{cc} // '') . ($record->{ac} // '') . ($record->{sn} // '');
if ($skip_errors) {
_warn($context,"duplicate number: $number");
} else {
_error($context,"duplicate number: $number");
}
next;
}
$record->{cc} //= '';
$record->{ac} //= '';
$record->{sn} //= '';
$record->{cc} = trim($record->{cc});
$record->{ac} = trim($record->{ac});
$record->{sn} = trim($record->{sn});
$record->{rownum} = $rownum;
$record->{filename} = $file;
if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) {
my $number = $record->{cc} . $record->{ac} . $record->{sn};
if ($skip_errors) {
_warn($context,"sip username $record->{sip_username} not found");
_warn($context,"$number: sip username $record->{sip_username} not found");
} else {
_error($context,"sip username $record->{sip_username} not found");
_error($context,"$number: sip username $record->{sip_username} not found");
}
next;
}
$record->{rownum} = $rownum;
my %r = %$record; my @allowedcli_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames};
if ($context->{upsert}) {
push(@allowedcli_row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@allowedcli_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta);
}
push(@allowedcli_rows,\@allowedcli_row);
}
if ((scalar @allowedcli_rows) > 0) {
if ($allowedcli_import_single_row_txn) {
foreach my $allowedcli_row (@allowedcli_rows) {
next unless _unfold_number_ranges($context,$record,\@allowedcli_rows);
if ($allowedcli_import_single_row_txn and (scalar @allowedcli_rows) > 0) {
while (defined (my $allowedcli_row = shift @allowedcli_rows)) {
if ($skip_errors) {
eval { _insert_allowedcli_rows($context,[$allowedcli_row]); };
_warn($context,$@) if $@;
@ -313,23 +263,54 @@ sub import_allowedcli {
_insert_allowedcli_rows($context,[$allowedcli_row]);
}
}
}
}
if (not $allowedcli_import_single_row_txn and (scalar @allowedcli_rows) > 0) {
if ($skip_errors) {
eval { _insert_allowedcli_rows($context,\@allowedcli_rows); };
_warn($context,$@) if $@;
} else {
if ($skip_errors) {
eval { _insert_allowedcli_rows($context,\@allowedcli_rows); };
_warn($context,$@) if $@;
} else {
_insert_allowedcli_rows($context,\@allowedcli_rows);
}
_insert_allowedcli_rows($context,\@allowedcli_rows);
}
}
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{unfold_ranges} = $allowedcli_import_unfold_ranges;
$context->{fieldnames} = \@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::fieldnames;
$context->{added_delta} = $NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::added_delta;
$context->{create_new_record_code} = sub {
return NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli->new(shift);
};
$context->{check_number_code} = sub {
my ($context,$record) = @_;
my $result = 1;
my $number = $record->{cc} . $record->{ac} . $record->{sn};
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
if ($skip_errors) {
_warn($context,"$record->{sip_username}: duplicate number $number");
} else {
_error($context,"$record->{sip_username}: duplicate number $number");
}
$result = 0;
}
# prevent db's unique constraint violation:
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
if ($skip_errors) {
_warn($context,"$record->{sip_username}: duplicate number $number");
} else {
_error($context,"$record->{sip_username}: duplicate number $number");
}
$result = 0;
}
return $result;
};
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
@ -361,14 +342,7 @@ sub _import_allowedcli_checks {
fileprocessingerror($file,'please import subscribers first',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
#my $userpasswordcount = 0;
#eval {
# $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::UsernamePassword::countby_fqdn();
#};
#if ($@ or $userpasswordcount == 0) {
# fileprocessingerror($file,'please import user passwords first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
return $result;
}
@ -406,9 +380,228 @@ sub _insert_allowedcli_rows {
}
}
sub _unfold_number_ranges {
my ($context,$record,$rows) = @_;
sub create_new_record_code{}
my $result = 0;
my @fieldnames = @{$context->{fieldnames}};
my $cc_ac_ok = ($record->{cc} =~ /^\d*$/ and $record->{ac} =~ /^\d*$/);
my @row;
my %r;
if ($context->{unfold_ranges} and $cc_ac_ok and $record->{sn} =~ /\.+$/) {
#if ($record->{sn} == '2861..') {
#print "x";
#}
my $pow = scalar (() = $record->{sn} =~ /\./g);
_info($context,"expanding number range '$record->{sn}' to " . 10**$pow . ' numbers');
_warn($context,"expanding number range '$record->{sn}' results in " . 10**$pow . ' numbers') if $pow > 2;
$record->{sn} =~ s/\.+$//g;
$record->{range} = 0;
#%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
my $base_sn = $record->{sn};
%r = %$record; @row = @r{@fieldnames};
if ($context->{upsert}) {
push(@row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@row,$context->{added_delta});
}
push(@$rows, [@row]) if &{$context->{check_number_code}}($context,$record);
for (my $i = 0; $i < 10**$pow; $i++) {
$record = &{$context->{create_new_record_code}}($record);
#@subscriber_row = @$row;
$record->{sn} = $base_sn . zerofill($i,$pow);
$record->{range} = 1;
%r = %$record; @row = @r{@fieldnames};
if ($context->{upsert}) {
push(@row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@row,$context->{added_delta});
}
push(@$rows,[@row]) if &{$context->{check_number_code}}($context,$record);
}
#if ($base_sn == '2861') {
#print "x";
#last;
#}
$result = 1;
} elsif ($cc_ac_ok and $record->{sn} =~ /^\d*$/) {
$record->{range} = 0;
#%r = %$record; $record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::contact_fieldnames}]);
%r = %$record; @row = @r{@fieldnames};
if ($context->{upsert}) {
push(@row,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@row,$context->{added_delta});
}
push(@$rows,\@row) if &{$context->{check_number_code}}($context,$record);
$result = 1;
} else {
my $number = $record->{cc} . $record->{ac} . $record->{sn};
if ($skip_errors) {
_warn($context,"invalid number: $number");
} else {
_error($context,"invalid number: $number");
}
$result = 0;
}
return $result;
}
sub import_clir {
my (@files) = @_;
my $result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::create_table(0);
foreach my $file (@files) {
$result &= _import_clir_checks($file);
}
my $importer = NGCP::BulkProcessor::Projects::Migration::Teletek::FileProcessors::CSVFile->new($clir_import_numofthreads);
my $upsert = _import_clir_reset_delta();
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
foreach my $file (@files) {
$result &= $importer->process(
file => $file,
process_code => sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @clir_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if (scalar @$row) == 0;
$row = [ map { local $_ = $_; trim($_); } @$row ];
my $record = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir->new($row);
$record->{rownum} = $rownum;
$record->{filename} = $file;
if ((scalar @{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_sipusername($record->{sip_username})}) == 0) {
if ($skip_errors) {
_warn($context,"sip username $record->{sip_username} not found");
} else {
_error($context,"sip username $record->{sip_username} not found");
}
next;
}
# prevent db's unique constraint violation:
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::findby_sipusername($record->{sip_username})) {
if ($skip_errors) {
_warn($context,"duplicate sip username $record->{sip_username}");
} else {
_error($context,"duplicate sip username $record->{sip_username}");
}
next;
}
my %r = %$record; my @clir_row = @r{@NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::fieldnames};
if ($context->{upsert}) {
push(@clir_row,$record->{sip_username});
} else {
push(@clir_row,$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::added_delta);
}
push(@clir_rows,\@clir_row);
if ($clir_import_single_row_txn and (scalar @clir_rows) > 0) {
while (defined (my $clir_row = shift @clir_rows)) {
if ($skip_errors) {
eval { _insert_clir_rows($context,[$clir_row]); };
_warn($context,$@) if $@;
} else {
_insert_clir_rows($context,[$clir_row]);
}
}
}
}
if (not $clir_import_single_row_txn and (scalar @clir_rows) > 0) {
if ($skip_errors) {
eval { _insert_clir_rows($context,\@clir_rows); };
_warn($context,$@) if $@;
} else {
_insert_clir_rows($context,\@clir_rows);
}
}
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $import_multithreading
);
}
return ($result,$warning_count);
}
sub _import_clir_checks {
my ($file) = @_;
my $result = 1;
my $subscribercount = 0;
eval {
$subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_ccacsn();
};
if ($@ or $subscribercount == 0) {
fileprocessingerror($file,'please import subscribers first',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
}
return $result;
}
sub _import_clir_reset_delta {
my $upsert = 0;
if (NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir() > 0) {
processing_info(threadid(),'resetting delta of ' .
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::update_delta(undef,
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::deleted_delta) .
' clir records',getlogger(__PACKAGE__));
$upsert |= 1;
}
return $upsert;
}
sub _insert_clir_rows {
my ($context,$clir_rows) = @_;
$context->{db}->db_do_begin(
($context->{upsert} ?
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::getupsertstatement()
: NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::getinsertstatement($ignore_clir_unique)),
#NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::gettablename(),
#lock
);
eval {
$context->{db}->db_do_rowblock($clir_rows);
$context->{db}->db_finish();
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_finish(1);
};
die($err);
}
}
sub _error {

@ -3,6 +3,8 @@ use strict;
## no critic
no strict 'refs';
use threads::shared qw();
#use List::Util qw();
@ -13,24 +15,6 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
);
#$batch
#$domain_name
#$reseller_id
#$subsciber_username_prefix
#$set_barring_profiles_multithreading
#$set_barring_profiles_numofthreads
#$barring_profiles
#
#$set_peer_auth_multithreading
#$set_peer_auth_numofthreads
#$peer_auth_realm
#
#$set_allowed_ips_multithreading
#$set_allowed_ips_numofthreads
#$allowed_ips
#
#$set_preference_bulk_multithreading
#$set_preference_bulk_numofthreads
use NGCP::BulkProcessor::Logging qw (
getlogger
@ -69,544 +53,157 @@ require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
clear_preferences
delete_preference
set_preference
get_preference
clear_subscriber_preferences
delete_subscriber_preference
set_subscriber_preference
get_subscriber_preference
set_allowed_ips_preferences
cleanup_aig_sequence_ids
);
# set_barring_profiles
# set_barring_profiles_batch
# set_peer_auth
# set_peer_auth_batch
# set_allowed_ips
# set_allowed_ips_batch
# set_preference_bulk
# set_preference_bulk_batch
# $INIT_PEER_AUTH_MODE
# $SWITCHOVER_PEER_AUTH_MODE
# $CLEAR_PEER_AUTH_MODE
#our $INIT_PEER_AUTH_MODE = 'init';
#our $SWITCHOVER_PEER_AUTH_MODE = 'switchover';
#our $CLEAR_PEER_AUTH_MODE = 'clear';
#sub set_barring_profiles {
#
# my $static_context = {};
# my $result = _set_barring_profiles_checks($static_context);
#
# destroy_all_dbs();
# my $warning_count :shared = 0;
# return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::process_records(
# static_context => $static_context,
# process_code => sub {
# my ($context,$records,$row_offset) = @_;
# my $rownum = $row_offset;
# foreach my $imported_subscriber (@$records) {
# $rownum++;
# next unless _reset_set_barring_profile_context($context,$imported_subscriber,$rownum);
# _set_barring_profile($context);
# }
#
# #return 0;
# return 1;
# },
# init_process_context_code => sub {
# my ($context)= @_;
# $context->{db} = &get_xa_db();
# $context->{error_count} = 0;
# $context->{warning_count} = 0;
# # below is not mandatory..
# _check_insert_tables();
# },
# uninit_process_context_code => sub {
# my ($context)= @_;
# undef $context->{db};
# destroy_all_dbs();
# {
# lock $warning_count;
# $warning_count += $context->{warning_count};
# }
# },
# load_recursive => 0,
# multithreading => $set_barring_profiles_multithreading,
# numofthreads => $set_barring_profiles_numofthreads,
# ),$warning_count);
#}
#
#
#
#sub _check_insert_tables {
#
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::check_table();
#
#}
#
#sub _set_subscriber_preference {
# my ($context,$set_code) = @_;
#
# eval {
# $context->{db}->db_begin();
# #rowprocessingwarn($context->{tid},'AutoCommit is on' ,getlogger(__PACKAGE__)) if $context->{db}->{drh}->{AutoCommit};
#
# my $existing_billing_voip_subscribers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_domainid_username_states($context->{db},
# $context->{billing_domain}->{id},$context->{username},{ 'NOT IN' => $NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::TERMINATED_STATE});
# if ((scalar @$existing_billing_voip_subscribers) == 0) {
#
# if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) {
# _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, and no active subscriber found',1);
# } else {
# _warn($context,"($context->{rownum}) no active subscriber found for susbcriber " . $context->{cli});
# }
# } elsif ((scalar @$existing_billing_voip_subscribers) == 1) {
# $context->{billing_voip_subscriber} = $existing_billing_voip_subscribers->[0];
# $context->{provisioning_voip_subscriber} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid(
# $context->{db},$context->{billing_voip_subscriber}->{uuid});
# if (defined $context->{provisioning_voip_subscriber}) {
# if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) {
#
# _warn($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ' is deleted, but active subscriber found');
#
# } else {
# if (defined $set_code and 'CODE' eq ref $set_code) {
# &$set_code($context);
# }
# }
# } else {
# if ($skip_errors) {
# _warn($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli});
# } else {
# _error($context,"($context->{rownum}) " . 'no provisioning subscriber found: ' . $context->{cli});
# }
# }
# } else {
# rowprocessingwarn($context->{tid},"($context->{rownum}) " . 'multiple (' . (scalar @$existing_billing_voip_subscribers) . ') existing billing subscribers with username ' . $context->{username} . ' found, skipping' ,getlogger(__PACKAGE__));
# }
#
# if ($dry) {
# $context->{db}->db_rollback(0);
# } else {
# $context->{db}->db_commit();
# }
#
# };
# my $err = $@;
# if ($err) {
# eval {
# $context->{db}->db_rollback(1);
# };
# if ($skip_errors) {
# _warn($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err);
# } else {
# _error($context,"($context->{rownum}) " . 'database error with subscriber ' . $context->{cli} . ': ' . $err);
# }
# }
#
#}
#
#sub _set_barring_profile {
# my ($context) = @_;
# _set_subscriber_preference($context,\&_set_adm_ncos);
#}
#
#sub _checks {
#
# my ($context) = @_;
#
# my $result = 1;
# #my $optioncount = 0;
# #eval {
# # $optioncount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::FeatureOption::countby_subscribernumber_option();
# #};
# #if ($@ or $optioncount == 0) {
# # rowprocessingerror(threadid(),'please import subscriber features first',getlogger(__PACKAGE__));
# # $result = 0; #even in skip-error mode..
# #}
# my $userpasswordcount = 0;
# eval {
# $userpasswordcount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::countby_fqdn();
# };
# if ($@ or $userpasswordcount == 0) {
# rowprocessingerror(threadid(),'please import user passwords first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
# my $subscribercount = 0;
# my $subscriber_barring_profiles = [];
# eval {
# $subscribercount = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::countby_subscribernumber();
# $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles();
# };
# if ($@ or $subscribercount == 0) {
# rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
#
# if ($batch) {
# my $batch_size = 0;
# eval {
# $batch_size = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::countby_delta({ 'NOT IN' =>
# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Batch::deleted_delta});
# };
# if ($@ or $batch_size == 0) {
# rowprocessingerror(threadid(),'please import a batch first',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
# }
#
# eval {
# $context->{billing_domain} = NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_domain($domain_name);
# if (defined $context->{billing_domain}
# and NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers::countby_domainid_resellerid($context->{billing_domain}->{id},$reseller_id) == 0) {
# undef $context->{billing_domain};
# }
# };
# if ($@ or not defined $context->{billing_domain}) {
# rowprocessingerror(threadid(),'cannot find billing domain',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
#
# return $result;
#
#}
#
#sub _set_barring_profiles_checks {
# my ($context) = @_;
#
# my $result = _checks($context);
#
# my $subscriber_barring_profiles = [];
# eval {
# $subscriber_barring_profiles = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::list_barringprofiles();
# };
# if ($@ or (scalar @$subscriber_barring_profiles) == 0) {
# rowprocessingerror(threadid(),'subscribers have no barring profiles',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
#
# $context->{ncos_level_map} = {};
# foreach my $barring_profile (@$subscriber_barring_profiles) {
# if (not exists $barring_profiles->{$barring_profile}) {
# rowprocessingerror(threadid(),"mapping for barring profile '" . $barring_profile . "' missing",getlogger(__PACKAGE__));
# #$result = 0; #even in skip-error mode..
# } else {
# my $level = $barring_profiles->{$barring_profile};
# if (not defined $level or length($level) == 0) {
# $context->{ncos_level_map}->{$barring_profile} = undef;
# } else {
# eval {
# $context->{ncos_level_map}->{$barring_profile} = NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels::findby_resellerid_level(
# $reseller_id,$level);
# };
# if ($@ or not defined $context->{ncos_level_map}->{$barring_profile}) {
# rowprocessingerror(threadid(),"cannot find ncos level '$level'",getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
# }
# }
# }
#
# eval {
# $context->{adm_ncos_id_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ADM_NCOS_ID_ATTRIBUTE);
# };
# if ($@ or not defined $context->{adm_ncos_id_attribute}) {
# rowprocessingerror(threadid(),'cannot find adm_ncos_id attribute',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
#
# return $result;
#}
#
#sub _set_adm_ncos {
#
# my ($context) = @_;
#
# $context->{adm_ncos_id_preference_id} = set_preference($context,$context->{provisioning_voip_subscriber}->{id},
# $context->{adm_ncos_id_attribute},defined $context->{ncos_level} ? $context->{ncos_level}->{id} : undef);
#
# _info($context,"($context->{rownum}) " . 'subscriber ' . $context->{cli} . ': ncos level ' .
# (defined $context->{ncos_level} ? "'" . $context->{ncos_level}->{level} . "' set" : 'cleared') .
# " for barring profile '" . $context->{barring_profile} . "'",1);
#
#}
#
#sub _reset_context {
#
# my ($context,$imported_subscriber,$rownum) = @_;
#
# my $result = 1;
#
# $context->{rownum} = $rownum;
#
# $context->{cli} = $imported_subscriber->subscribernumber();
# $context->{e164} = {};
# $context->{e164}->{cc} = substr($context->{cli},0,3);
# $context->{e164}->{ac} = '';
# $context->{e164}->{sn} = substr($context->{cli},3);
#
# $context->{subscriberdelta} = $imported_subscriber->{delta};
#
# my $userpassword = NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::findby_fqdn($context->{cli});
# if (defined $userpassword) {
# $context->{username} = (defined $subsciber_username_prefix ? $subsciber_username_prefix : '') . $userpassword->{username};
# $context->{password} = $userpassword->{password};
# $context->{userpassworddelta} = $userpassword->{delta};
# } else {
# # once full username+passwords is available:
# delete $context->{username};
# delete $context->{password};
# delete $context->{userpassworddelta};
# if ($context->{subscriberdelta} eq
# $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::deleted_delta) {
#
# } else {
# $result &= 0;
#
# # for now, as username+passwords are incomplete:
# #$context->{username} = $context->{e164}->{sn};
# #$context->{password} = $context->{username};
# #$context->{userpassworddelta} = $NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::UsernamePassword::updated_delta;
#
# if ($skip_errors) {
# # for now, as username+passwords are incomplete:
# _warn($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# } else {
# _error($context,"($context->{rownum}) " . 'no username/password for subscriber found: ' . $context->{cli});
# }
# }
# }
#
# delete $context->{billing_voip_subscriber};
# delete $context->{provisioning_voip_subscriber};
#
# return $result;
#
#}
#
#sub _reset_set_barring_profile_context {
#
# my ($context,$imported_subscriber,$rownum) = @_;
#
# my $result = _reset_context($context,$imported_subscriber,$rownum);
#
# $context->{barring_profile} = $imported_subscriber->{barring_profile};
# $context->{ncos_level} = $context->{ncos_level_map}->{$context->{barring_profile}};
#
# delete $context->{adm_ncos_id_preference_id};
#
# return $result;
#
#}
#
#
#
#
#
#sub set_allowed_ips {
#
# my $static_context = {};
# my $result = _set_allowed_ips_checks($static_context);
#
# destroy_all_dbs();
# my $warning_count :shared = 0;
# return ($result && NGCP::BulkProcessor::Projects::Migration::IPGallery::Dao::import::Subscriber::process_records(
# static_context => $static_context,
# process_code => sub {
# my ($context,$records,$row_offset) = @_;
# my $rownum = $row_offset;
# foreach my $imported_subscriber (@$records) {
# $rownum++;
# next unless _reset_set_allowed_ips_context($context,$imported_subscriber,$rownum);
# _set_allowed_ips($context);
# }
# cleanup_aig_sequence_ids($context);
# #return 0;
# return 1;
# },
# init_process_context_code => sub {
# my ($context)= @_;
# $context->{db} = &get_xa_db();
# $context->{error_count} = 0;
# $context->{warning_count} = 0;
# # below is not mandatory..
# _check_insert_tables();
# },
# uninit_process_context_code => sub {
# my ($context)= @_;
# undef $context->{db};
# destroy_all_dbs();
# {
# lock $warning_count;
# $warning_count += $context->{warning_count};
# }
# },
# load_recursive => 0,
# multithreading => $set_allowed_ips_multithreading,
# numofthreads => $set_allowed_ips_numofthreads,
# ),$warning_count);
#}
#
#sub cleanup_aig_sequence_ids {
# my ($context) = @_;
# eval {
# $context->{db}->db_begin();
# if (NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::cleanup_ids($context->{db})) {
# _info($context,'voip_aig_sequence cleaned up');
# }
# if ($dry) {
# $context->{db}->db_rollback(0);
# } else {
# $context->{db}->db_commit();
# }
# };
# my $err = $@;
# if ($err) {
# eval {
# $context->{db}->db_rollback(1);
# };
# if ($skip_errors) {
# _warn($context,"database problem with voip_aig_sequence clean up: " . $err);
# } else {
# _error($context,"database problem with voip_aig_sequence clean up: " . $err);
# }
# }
#}
#
#sub _set_allowed_ips {
# my ($context) = @_;
# _set_subscriber_preference($context,\&_set_allowed_ips_preferences);
#}
#
#sub _set_allowed_ips_checks {
# my ($context) = @_;
#
# my $result = _checks($context);
#
# eval {
# $context->{allowed_ips_grp_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE);
# };
# if ($@ or not defined $context->{allowed_ips_grp_attribute}) {
# rowprocessingerror(threadid(),'cannot find allowed_ips_grp attribute',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
# }
#
# return $result;
#}
#
#sub _set_allowed_ips_preferences {
#
# my ($context) = @_;
#
# my $subscriber_id = $context->{provisioning_voip_subscriber}->{id};
# my $attribute = $context->{allowed_ips_grp_attribute};
#
# my $allowed_ips_grp_attribute_preference = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid(
# $context->{db},$subscriber_id,$attribute->{id})->[0];
#
# if (defined $allowed_ips_grp_attribute_preference) {
# $context->{allowed_ip_group_id} = $allowed_ips_grp_attribute_preference->{value};
# $context->{allowed_ips_grp_attribute_preference_id} = $allowed_ips_grp_attribute_preference->{id};
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::delete_groupid($context->{db},$context->{allowed_ip_group_id});
# _info($context,"($context->{rownum}) " . 'allowed ips group for subscriber ' . $context->{cli} . ' exists, ipnets deleted',1);
# } else {
# $context->{allowed_ip_group_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::increment($context->{db});
# _info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1);
# }
#
# $context->{allowed_ips_grp_ipnet_ids} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::insert_rows($context->{db},$context->{allowed_ip_group_id},$context->{allowed_ips});
# _info($context,"($context->{rownum}) " . 'new allowed ips group id for subscriber ' . $context->{cli} . ' aquired',1);
#
# if (not defined $allowed_ips_grp_attribute_preference) {
# $context->{allowed_ips_grp_attribute_preference_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db},
# attribute_id => $attribute->{id},
# subscriber_id => $subscriber_id,
# value => $context->{allowed_ip_group_id},
# );
# _info($context,"($context->{rownum}) " . 'new allowed ips group preference value for subscriber ' . $context->{cli} . ' added',1);
# }
#
#}
#
#sub _reset_set_allowed_ips_context {
#
# my ($context,$imported_subscriber,$rownum) = @_;
#
# my $result = _reset_context($context,$imported_subscriber,$rownum);
#
# $context->{allowed_ips} = $allowed_ips;
#
# delete $context->{allowed_ip_group_id};
# delete $context->{allowed_ips_grp_attribute_preference_id};
# delete $context->{allowed_ips_grp_ipnet_ids};
#
# return $result;
#
#}
sub clear_preferences {
my ($context,$subscriber_id,$attribute,$except_value) = @_;
return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db},
$subscriber_id, $attribute->{id}, defined $except_value ? { 'NOT IN' => $except_value } : undef);
sub cleanup_aig_sequence_ids {
my ($context) = @_;
eval {
$context->{db}->db_begin();
if (NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::cleanup_ids($context->{db})) {
_info($context,'voip_aig_sequence cleaned up');
}
if ($dry) {
$context->{db}->db_rollback(0);
} else {
$context->{db}->db_commit();
}
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
};
if ($skip_errors) {
_warn($context,"database problem with voip_aig_sequence clean up: " . $err);
} else {
_error($context,"database problem with voip_aig_sequence clean up: " . $err);
}
}
}
sub set_allowed_ips_preferences {
my ($context,$subscriber_id,$sip_username,$attribute,$allowed_ips) = @_;
#my $subscriber_id = $context->{prov_subscriber}->{id} ;
#my $attribute = $context->{attributes}->{allowed_ips_grp};
my $allowed_ips_grp_attribute_preference = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid(
$context->{db},$subscriber_id,$attribute->{id})->[0];
my ($allowed_ip_group_id,$allowed_ip_group_preferrence_id);
if (defined $allowed_ips_grp_attribute_preference) {
$allowed_ip_group_id = $allowed_ips_grp_attribute_preference->{value};
$allowed_ip_group_preferrence_id = $allowed_ips_grp_attribute_preference->{id};
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::delete_groupid($context->{db},$allowed_ip_group_id);
_info($context,"allowed ips group for subscriber $sip_username exists, ipnets deleted",1);
} else {
$allowed_ip_group_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::increment($context->{db});
_info($context,"new allowed ips group id for subscriber $sip_username aquired",1);
}
my $allowed_ips_grp_ipnet_ids = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::insert_rows($context->{db},$allowed_ip_group_id,$allowed_ips);
_info($context,"ipnets for allowed ips group for subscriber $sip_username created",1);
if (not defined $allowed_ips_grp_attribute_preference) {
$allowed_ip_group_preferrence_id = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db},
attribute_id => $attribute->{id},
subscriber_id => $subscriber_id,
value => $allowed_ip_group_id,
);
_info($context,"new allowed ips group preference value for subscriber $sip_username added",1);
}
return ($allowed_ip_group_id,$allowed_ip_group_preferrence_id);
#$context->{preferences}->{allowed_ips_grp} = { id => $allowed_ip_group_preferrence_id, $allowed_ip_group_id };
}
sub delete_preference {
my %get_preference_sub_names = (
voip_usr_preferences => 'findby_subscriberid_attributeid',
);
my %preference_id_cols = (
voip_usr_preferences => 'subscriber_id',
);
sub clear_subscriber_preferences {
my ($context,$subscriber_id,$attribute,$except_value) = @_;
return _clear_preferences($context,'voip_usr_preferences',$subscriber_id,$attribute,$except_value);
}
sub delete_subscriber_preference {
my ($context,$subscriber_id,$attribute,$value) = @_;
return _delete_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value);
}
sub set_subscriber_preference {
my ($context,$subscriber_id,$attribute,$value) = @_;
return _set_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value);
}
sub get_subscriber_preference {
my ($context,$subscriber_id,$attribute) = @_;
return _get_preference($context,'voip_usr_preferences',$subscriber_id,$attribute);
}
sub _clear_preferences {
my ($context,$pref_type,$id,$attribute,$except_value) = @_;
return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db},
$subscriber_id, $attribute->{id}, { 'IN' => $value } );
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id, $attribute->{id}, defined $except_value ? { 'NOT IN' => $except_value } : undef);
}
sub set_preference {
my ($context,$subscriber_id,$attribute,$value) = @_;
sub _delete_preference {
my ($context,$pref_type,$id,$attribute,$value) = @_;
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id, $attribute->{id}, { 'IN' => $value } );
}
sub _set_preference {
my ($context,$pref_type,$id,$attribute,$value) = @_;
if ($attribute->{max_occur} == 1) {
my $old_preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db},
$subscriber_id,$attribute->{id});
my $old_preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db},
$id,$attribute->{id});
if (defined $value) {
if ((scalar @$old_preferences) == 1) {
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::update_row($context->{db},{
&{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::update_row'}($context->{db},{
id => $old_preferences->[0]->{id},
value => $value,
});
return $old_preferences->[0]->{id};
} else {
if ((scalar @$old_preferences) > 1) {
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db},
$subscriber_id,$attribute->{id});
&{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id,$attribute->{id});
}
return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db},
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db},
attribute_id => $attribute->{id},
subscriber_id => $subscriber_id,
$preference_id_cols{$pref_type} => $id,
value => $value,
);
}
} else {
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::delete_preferences($context->{db},
$subscriber_id,$attribute->{id});
&{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id,$attribute->{id});
return undef;
}
} else {
if (defined $value) {
return NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::insert_row($context->{db},
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db},
attribute_id => $attribute->{id},
subscriber_id => $subscriber_id,
$preference_id_cols{$pref_type} => $id,
value => $value,
);
} else {
@ -616,11 +213,12 @@ sub set_preference {
}
sub get_preference {
my ($context,$subscriber_id,$attribute) = @_;
sub _get_preference {
my ($context,$pref_type,$id,$attribute) = @_;
my $preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db},
$id,$attribute->{id});
my $preferences = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid_attributeid($context->{db},
$subscriber_id,$attribute->{id});
if ($attribute->{max_occur} == 1) {
return $preferences->[0];
} else {

@ -16,22 +16,13 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$provision_subscriber_multithreading
$provision_subscriber_numofthreads
$webpassword_length
$webusername_length
$reseller_mapping
$barring_profiles
);
#$batch
#$reseller_id
#$domain_name
#$subsciber_username_prefix
#$billing_profile_id
#$contact_email_format
#$webpassword_length
#$generate_webpassword
#$reprovision_upon_password_change
#$always_update_subscriber
use NGCP::BulkProcessor::Logging qw (
getlogger
@ -45,6 +36,7 @@ use NGCP::BulkProcessor::LogError qw(
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::products qw();
@ -57,6 +49,7 @@ use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
@ -70,9 +63,12 @@ use NGCP::BulkProcessor::RestRequests::Trunk::Subscribers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Customers qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Preferences qw(
set_preference
clear_preferences
delete_preference
set_subscriber_preference
get_subscriber_preference
clear_subscriber_preferences
delete_subscriber_preference
set_allowed_ips_preferences
cleanup_aig_sequence_ids
);
use NGCP::BulkProcessor::ConnectorPool qw(
@ -83,8 +79,9 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw(
destroy_all_dbs
);
use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp);
use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool check_ipnet trim);
use NGCP::BulkProcessor::DSSorter qw(sort_by_configs);
use NGCP::BulkProcessor::RandomString qw(createtmpstring);
require Exporter;
our @ISA = qw(Exporter);
@ -93,9 +90,15 @@ our @EXPORT_OK = qw(
);
my $split_ipnets_pattern = join('|',(
quotemeta(','),
quotemeta(';'),
#quotemeta('/')
));
sub provision_subscribers {
my $static_context = { now => timestamp() };
my $static_context = { now => timestamp(), rowcount => undef };
my $result = _provision_subscribers_checks($static_context);
destroy_all_dbs();
@ -105,11 +108,13 @@ sub provision_subscribers {
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
$context->{rowcount} = $row_offset;
foreach my $domain_sipusername (@$records) {
$context->{rowcount} += 1;
next unless _provision_susbcriber($context,
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_sipusername(@$domain_sipusername));
}
cleanup_aig_sequence_ids($context);
return 1;
},
init_process_context_code => sub {
@ -177,6 +182,7 @@ sub _provision_susbcriber {
_update_contract($context);
_update_subscriber($context);
_create_aliases($context);
_update_preferences($context);
#todo: additional prefs, AllowedIPs, NCOS, Callforwards. still thinking wether to integrate it
#in this main provisioning loop, or align it in separate run-modes, according to the files given.
@ -212,6 +218,39 @@ sub _provision_subscribers_checks {
my $result = 1;
my $subscribercount = 0;
eval {
$subscribercount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::countby_ccacsn();
};
if ($@ or $subscribercount == 0) {
rowprocessingerror(threadid(),'please import subscribers first',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"$subscribercount subscriber found",getlogger(__PACKAGE__));
}
my $allowedclicount = 0;
eval {
$allowedclicount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::countby_ccacsn();
};
if ($@ or $allowedclicount == 0) {
rowprocessingerror(threadid(),'please import allowed clis first',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"$allowedclicount allowed clis found",getlogger(__PACKAGE__));
}
my $clircount = 0;
eval {
$clircount = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir();
};
if ($@ or $clircount == 0) {
rowprocessingerror(threadid(),'please import clir first',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"$clircount clir records found",getlogger(__PACKAGE__));
}
my $domain_billingprofilename_resellernames = [];
eval {
$domain_billingprofilename_resellernames = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::list_domain_billingprofilename_resellernames();
@ -247,6 +286,7 @@ sub _provision_subscribers_checks {
$result = 0; #even in skip-error mode..
} else {
$context->{reseller_map}->{$resellername}->{billingprofile_map} = {};
processing_info(threadid(),"reseller '$resellername' found",getlogger(__PACKAGE__));
}
}
if (not exists $context->{domain_map}->{$domain}) {
@ -254,16 +294,19 @@ sub _provision_subscribers_checks {
$context->{domain_map}->{$domain} = NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_domain($domain);
};
if ($@ or not $context->{domain_map}->{$domain}) {
rowprocessingerror(threadid(),"cannot find domain '$domain' (billing)",getlogger(__PACKAGE__));
rowprocessingerror(threadid(),"cannot find billing domain '$domain'",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"billing domain '$domain' found",getlogger(__PACKAGE__));
eval {
$context->{domain_map}->{$domain}->{prov_domain} =
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains::findby_domain($domain);
};
if ($@ or not $context->{domain_map}->{$domain}->{prov_domain}) {
rowprocessingerror(threadid(),"cannot find domain '$domain' (provisioning)",getlogger(__PACKAGE__));
rowprocessingerror(threadid(),"cannot find provisioning domain '$domain'",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"provisioning domain '$domain' found",getlogger(__PACKAGE__));
}
}
}
@ -276,6 +319,8 @@ sub _provision_subscribers_checks {
if ($@ or not $domain_reseller) {
rowprocessingerror(threadid(),"domain $domain does not belong to reseller $resellername",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"domain $domain belongs to reseller $resellername",getlogger(__PACKAGE__));
}
if ($context->{reseller_map}->{$resellername}->{billingprofile_map} and
@ -291,6 +336,8 @@ sub _provision_subscribers_checks {
if ($@ or not $context->{reseller_map}->{$resellername}->{billingprofile_map}->{$billingprofilename}) {
rowprocessingerror(threadid(),"cannot find billing profile '$billingprofilename' of reseller '$resellername'",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"billing profile '$billingprofilename' of reseller '$resellername' found",getlogger(__PACKAGE__));
}
}
}
@ -303,6 +350,8 @@ sub _provision_subscribers_checks {
if ($@ or not defined $context->{sip_account_product}) {
rowprocessingerror(threadid(),"cannot find $NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE product",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"$NGCP::BulkProcessor::Dao::Trunk::billing::products::SIP_ACCOUNT_HANDLE product found",getlogger(__PACKAGE__));
}
$context->{attributes} = {};
@ -314,6 +363,8 @@ sub _provision_subscribers_checks {
if ($@ or not defined $context->{attributes}->{allowed_clis}) {
rowprocessingerror(threadid(),'cannot find allowed_clis attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"allowed_clis attribute found",getlogger(__PACKAGE__));
}
eval {
@ -323,6 +374,8 @@ sub _provision_subscribers_checks {
if ($@ or not defined $context->{attributes}->{cli}) {
rowprocessingerror(threadid(),'cannot find cli attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"cli attribute found",getlogger(__PACKAGE__));
}
eval {
@ -332,6 +385,8 @@ sub _provision_subscribers_checks {
if ($@ or not defined $context->{attributes}->{ac}) {
rowprocessingerror(threadid(),'cannot find ac attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"ac attribute found",getlogger(__PACKAGE__));
}
eval {
@ -341,6 +396,8 @@ sub _provision_subscribers_checks {
if ($@ or not defined $context->{attributes}->{cc}) {
rowprocessingerror(threadid(),'cannot find cc attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"cc attribute found",getlogger(__PACKAGE__));
}
eval {
@ -350,21 +407,119 @@ sub _provision_subscribers_checks {
if ($@ or not defined $context->{attributes}->{account_id}) {
rowprocessingerror(threadid(),'cannot find account_id attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"account_id attribute found",getlogger(__PACKAGE__));
}
eval {
$context->{attributes}->{concurrent_max_total} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CONCURRENT_MAX_TOTAL_ATTRIBUTE);
};
if ($@ or not defined $context->{attributes}->{concurrent_max_total}) {
rowprocessingerror(threadid(),'cannot find concurrent_max_total attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"concurrent_max_total attribute found",getlogger(__PACKAGE__));
}
eval {
$context->{attributes}->{clir} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CLIR_ATTRIBUTE);
};
if ($@ or not defined $context->{attributes}->{clir}) {
rowprocessingerror(threadid(),'cannot find clir attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"clir attribute found",getlogger(__PACKAGE__));
}
eval {
$context->{attributes}->{allowed_ips_grp} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE);
};
if ($@ or not defined $context->{attributes}->{allowed_ips_grp}) {
rowprocessingerror(threadid(),'cannot find allowed_ips_grp attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"allowed_ips_grp attribute found",getlogger(__PACKAGE__));
}
#eval {
# $context->{peer_auth_pass_attribute} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::PEER_AUTH_PASS);
#};
#if ($@ or not defined $context->{peer_auth_pass_attribute}) {
# rowprocessingerror(threadid(),'cannot find peer_auth_pass attribute',getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
my $barring_resellernames = [];
eval {
$barring_resellernames = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::list_barring_resellernames();
};
if ($@) {
rowprocessingerror(threadid(),'error retrieving barrings',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
$context->{ncos_level_map} = {};
foreach my $barring_resellername (@$barring_resellernames) {
my $resellername = _apply_reseller_mapping($barring_resellername->{reseller_name});
#unless ($resellername) {
# rowprocessingerror(threadid(),"empty reseller name detected",getlogger(__PACKAGE__));
# $result = 0; #even in skip-error mode..
#}
my $barring = $barring_resellername->{barrings};
next unless ($barring);
$result &= _check_ncos_level($context,$resellername,$barring);
}
}
eval {
$context->{attributes}->{adm_ncos_id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ADM_NCOS_ID_ATTRIBUTE);
};
if ($@ or not defined $context->{attributes}->{adm_ncos_id}) {
rowprocessingerror(threadid(),'cannot find adm_ncos_id attribute',getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"adm_ncos_id attribute found",getlogger(__PACKAGE__));
}
return $result;
}
sub _check_ncos_level {
my ($context,$resellername,$barring) = @_;
my $result = 1;
if (not exists $barring_profiles->{$resellername}) {
rowprocessingerror(threadid(),"barring mappings for reseller $resellername missing",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} elsif (not exists $barring_profiles->{$resellername}->{$barring}) {
rowprocessingerror(threadid(),"mappings for barring '" . $barring . "' of reseller $resellername missing",getlogger(__PACKAGE__));
$result = 0; #even in skip-error mode..
} else {
my $reseller_id = $context->{reseller_map}->{$resellername}->{id};
$context->{ncos_level_map}->{$reseller_id} = {} unless exists $context->{ncos_level_map}->{$reseller_id};
my $level = $barring_profiles->{$resellername}->{$barring};
unless (exists $context->{ncos_level_map}->{$reseller_id}->{$barring}) {
if (not defined $level or length($level) == 0) {
$context->{ncos_level_map}->{$reseller_id}->{$barring} = undef;
} else {
eval {
$context->{ncos_level_map}->{$reseller_id}->{$barring} = NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels::findby_resellerid_level(
$reseller_id,$level);
};
if ($@ or not defined $context->{ncos_level_map}->{$reseller_id}->{$barring}) {
my $err = "cannot find ncos level '$level' of reseller $resellername";
if (not defined $context->{rowcount}) {
rowprocessingerror(threadid(),$err,getlogger(__PACKAGE__));
} elsif ($skip_errors) {
_warn($context, $err);
} else {
_error($context, $err);
}
$result = 0; #even in skip-error mode..
} else {
processing_info(threadid(),"ncos level '$level' of reseller $resellername found",getlogger(__PACKAGE__));
}
}
}
}
return $result;
}
sub _update_contact {
my ($context) = @_;
@ -378,6 +533,7 @@ sub _update_contact {
$context->{contract}->{contact},
);
$context->{contract}->{contact_id} = $context->{contract}->{contact}->{id};
_info($context,"contact id $context->{contract}->{contact}->{id} created",1);
}
return 1;
@ -407,6 +563,8 @@ sub _update_contract {
$context->{contract}->{contract_balance_id} = NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances::insert_row($context->{db},
contract_id => $context->{contract}->{id},
);
_info($context,"contract id $context->{contract}->{id} created",1);
}
return 1;
@ -454,7 +612,7 @@ sub _update_subscriber {
);
}
$context->{preferences}->{cli} = { id => set_preference($context,
$context->{preferences}->{cli} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{cli},
$number->{number}), value => $number->{number} };
@ -464,6 +622,8 @@ sub _update_subscriber {
primary_number_id => $context->{voip_numbers}->{primary}->{id},
});
_info($context,"subscriber uuid $context->{prov_subscriber}->{uuid} created",1);
#primary alias
$context->{aliases}->{primary}->{id} = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::insert_row($context->{db},
domain_id => $context->{prov_subscriber}->{domain_id},
@ -472,7 +632,7 @@ sub _update_subscriber {
);
my @allowed_clis = ();
push(@allowed_clis,{ id => set_preference($context,
push(@allowed_clis,{ id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{allowed_clis},
$number->{number}), value => $number->{number}});
@ -484,38 +644,88 @@ sub _update_subscriber {
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::delete_dbaliases($context->{db},
$context->{prov_subscriber}->{id},{ 'NOT IN' => $number->{number} });
clear_preferences($context,
clear_subscriber_preferences($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{allowed_clis},
$number->{number});
_info($context,"primary alias $number->{number} created",1);
$context->{voicemail_user}->{id} = NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users::insert_row($context->{db},
$context->{voicemail_user},
);
$context->{preferences}->{account_id} = { id => set_preference($context,
$context->{preferences}->{account_id} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{account_id},
$context->{contract}->{id}), value => $context->{contract}->{id} };
if (length($number->{ac}) > 0) {
$context->{preferences}->{ac} = { id => set_preference($context,
$context->{preferences}->{ac} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{ac},
$number->{ac}), value => $number->{ac} };
}
if (length($number->{cc}) > 0) {
$context->{preferences}->{cc} = { id => set_preference($context,
$context->{preferences}->{cc} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{cc},
$number->{cc}), value => $number->{cc} };
}
}
return $result;
}
sub _update_preferences {
my ($context) = @_;
my $result = 1;
if (defined $context->{channels}) {
$context->{preferences}->{concurrent_max_total} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{concurrent_max_total},
$context->{channels}), value => $context->{channels} };
_info($context,"concurrent_max_total preference set to $context->{channels}",1);
}
if ($context->{clir}) {
$context->{preferences}->{clir} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{clir},
$NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE), value => $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::TRUE };
_info($context,"clir preference set to $context->{clir}",1);
}
if ((scalar @{$context->{allowed_ips}}) > 0) {
my ($allowed_ip_group_preferrence_id, $allowed_ip_group_id) = set_allowed_ips_preferences($context,
$context->{prov_subscriber}->{id},
$context->{prov_subscriber}->{username},
$context->{attributes}->{allowed_ips_grp},
$context->{allowed_ips},
);
$context->{preferences}->{allowed_ips_grp} = { id => $allowed_ip_group_preferrence_id, value => $allowed_ip_group_id };
_info($context,"allowed_ips_grp preference set to $allowed_ip_group_id - " . join(',',@{$context->{allowed_ips}}),1);
}
if (defined $context->{ncos_level}) {
$context->{preferences}->{adm_ncos_id} = { id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{adm_ncos_id},
$context->{ncos_level}->{id}), value => $context->{ncos_level}->{id} };
_info($context,"adm_ncos_id preference set to $context->{ncos_level}->{id} - $context->{ncos_level}->{level}",1);
}
return $result;
}
sub _create_aliases {
my ($context) = @_;
@ -576,15 +786,16 @@ sub _create_aliases {
push(@{$context->{aliases}->{other}},$alias);
push(@usernames,$number->{number});
delete_preference($context,
delete_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{allowed_clis},
$number->{number});
push(@{$context->{preferences}->{allowed_clis}},{ id => set_preference($context,
push(@{$context->{preferences}->{allowed_clis}},{ id => set_subscriber_preference($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{allowed_clis},
$number->{number}), value => $number->{number}});
_info($context,"alias $number->{number} created",1);
}
push(@voip_number_ids,$context->{voip_numbers}->{primary}->{id});
@ -596,11 +807,23 @@ sub _create_aliases {
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::delete_dbaliases($context->{db},$context->{prov_subscriber}->{id},
{ 'NOT IN' => \@usernames });
clear_preferences($context,
clear_subscriber_preferences($context,
$context->{prov_subscriber}->{id},
$context->{attributes}->{allowed_clis},
\@usernames );
#test:
#my $allowed_clis = get_subscriber_preference($context,
# $context->{prov_subscriber}->{id},
# $context->{attributes}->{allowed_clis});
#my $voip_numbers = NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::findby_subscriberid($context->{db},
# $context->{bill_subscriber}->{id});
#my $aliases = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberid_username($context->{db},
# $context->{prov_subscriber}->{id},undef);
#_info($context,(scalar @{$context->{numbers}->{other}}) . " aliases created: " . join(',',(map { $_->{number}; } @{$context->{numbers}->{other}})));
}
return $result;
}
@ -619,25 +842,17 @@ sub _provision_susbcriber_init_context {
}
$context->{domain} = $context->{domain_map}->{$first->{domain}};
$context->{reseller} = $context->{reseller_map}->{_apply_reseller_mapping($first->{reseller_name})};
my $resellername = _apply_reseller_mapping($first->{reseller_name});
$context->{reseller} = $context->{reseller_map}->{$resellername};
$context->{billing_profile} = $context->{reseller}->{billingprofile_map}->{$first->{billing_profile_name}};
$context->{prov_subscriber} = {};
$context->{prov_subscriber}->{username} = $first->{sip_username};
$context->{prov_subscriber}->{password} = $first->{sip_password};
$context->{prov_subscriber}->{webusername} = $first->{web_username};
if (not (defined $first->{web_username} and length($first->{web_username}) > 0)) {
$context->{prov_subscriber}->{webusername} = undef;
} else {
my %webusername_dupes = map { $_->{sip_username} => 1; }
@{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_webusername(
$first->{domain},$context->{prov_subscriber}->{webusername})};
if ((scalar keys %webusername_dupes) > 1) {
#_warn($context,"duplicate web_username $context->{prov_subscriber}->{webusername}, using sip_username");
$context->{prov_subscriber}->{webusername} = $first->{sip_username};
}
}
$context->{prov_subscriber}->{webpassword} = $first->{web_password};
my $webusername = $first->{web_username};
$context->{prov_subscriber}->{uuid} = create_uuid();
$context->{prov_subscriber}->{domain_id} = $context->{domain}->{prov_domain}->{id};
@ -647,17 +862,20 @@ sub _provision_susbcriber_init_context {
$context->{bill_subscriber}->{uuid} = $context->{prov_subscriber}->{uuid};
undef $context->{contract};
undef $context->{channels};
my @numbers = ();
my %number_dupes = ();
my %contact_dupes = ();
my %allowed_ips = ();
my %barrings = ();
foreach my $subscriber (@$subscriber_group) {
my $number = ($subscriber->{cc} // '') . ($subscriber->{ac} // '') . ($subscriber->{sn} // '');
my $number = $subscriber->{cc} . $subscriber->{ac} . $subscriber->{sn};
if (not exists $number_dupes{$number}) {
push(@numbers,{
cc => $subscriber->{cc} // '',
ac => $subscriber->{ac} // '',
sn => $subscriber->{sn} // '',
cc => $subscriber->{cc},
ac => $subscriber->{ac},
sn => $subscriber->{sn},
number => $number,
delta => $subscriber->{delta},
additional => 0,
@ -693,15 +911,100 @@ sub _provision_susbcriber_init_context {
_warn($context,'non-unique contact hash, skipped');
}
}
my $channels = $subscriber->{channels};
if (defined $channels and length($channels) > 0) {
if (not ($channels > 0)) {
_warn($context,"invalid number of channels $subscriber->{channels}, ignoring");
} elsif (not defined $context->{channels} or $channels > $context->{channels}) {
$context->{channels} = $channels;
}
}
#print $subscriber->{allowed_ips} . "\n";
if (defined $subscriber->{allowed_ips} and length($subscriber->{allowed_ips}) > 0) {
foreach my $ipnet (map { local $_ = $_; trim($_); } split(/$split_ipnets_pattern/,$subscriber->{allowed_ips})) {
if (check_ipnet($ipnet)) {
if ('0.0.0.0' ne $ipnet) {
$allowed_ips{$ipnet} = 1;
} else {
_info($context,"allowed_ip '$ipnet' ignored",1);
}
} else {
_warn($context,"invalid allowed_ip '$ipnet', ignored");
}
}
}
#$context->{allowed_ips} = \@allowed_ips;
unless (defined $context->{prov_subscriber}->{password} and length($context->{prov_subscriber}->{password}) > 0) {
$context->{prov_subscriber}->{password} = $subscriber->{sip_password};
}
unless (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0
and defined $context->{prov_subscriber}->{webpassword} and length($context->{prov_subscriber}->{webpassword}) > 0) {
$context->{prov_subscriber}->{webusername} = $subscriber->{web_username};
$context->{prov_subscriber}->{webpassword} = $subscriber->{web_password};
}
unless (defined $webusername and length($webusername) > 0) {
$webusername = $subscriber->{web_username};
}
if (defined $subscriber->{barrings} and length($subscriber->{barrings}) > 0) {
$barrings{$subscriber->{barrings}} = 1;
}
}
unless (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0) {
$context->{prov_subscriber}->{webusername} = $webusername;
$context->{prov_subscriber}->{webpassword} = undef;
}
if (not (defined $context->{prov_subscriber}->{webusername} and length($context->{prov_subscriber}->{webusername}) > 0)) {
$context->{prov_subscriber}->{webusername} = undef;
$context->{prov_subscriber}->{webpassword} = undef;
_info($context,"empty web_username for sip_username '$first->{sip_username}'",1);
} else {
$webusername = $context->{prov_subscriber}->{webusername};
my %webusername_dupes = map { $_->{sip_username} => 1; }
@{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber::findby_domain_webusername(
$first->{domain},$webusername)};
if ((scalar keys %webusername_dupes) > 1) {
my $generated = _generate_webusername(); #$first->{sip_username};
_info($context,"duplicate web_username '$webusername', using generated '$generated'",1);
$context->{prov_subscriber}->{webusername} = $generated;
}
#$context->{prov_subscriber}->{webpassword} = $first->{web_password};
if (not (defined $context->{prov_subscriber}->{webpassword} and length($context->{prov_subscriber}->{webpassword}) > 0)) {
my $generated = _generate_webpassword();
_info($context,"empty web_password for web_username '$webusername', using generated '$generated'",1);
$context->{prov_subscriber}->{webpassword} = $generated;
#} elsif (defined $first->{web_password} and length($first->{web_password}) < 8) {
# $context->{prov_subscriber}->{webpassword} = _generate_webpassword();
# _info($context,"web_password for web_username '$first->{web_username}' is too short, using '$context->{prov_subscriber}->{webpassword}'");
}
}
$context->{allowed_ips} = [ keys %allowed_ips ];
$context->{ncos_level} = undef;
if ((scalar keys %barrings) > 1) {
my $combined_barring = join('_',sort keys %barrings);
#$result &=
_check_ncos_level($context,$resellername,$combined_barring);
_info($context,"barrings combination $combined_barring");
$context->{ncos_level} = $context->{ncos_level_map}->{$context->{reseller}->{id}}->{$combined_barring};
} elsif ((scalar keys %barrings) == 1) {
my ($barring) = keys %barrings;
$context->{ncos_level} = $context->{ncos_level_map}->{$context->{reseller}->{id}}->{$barring};
}
foreach my $allowed_cli (@{NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli::findby_sipusername($first->{sip_username})}) {
my $number = ($allowed_cli->{cc} // '') . ($allowed_cli->{ac} // '') . ($allowed_cli->{sn} // '');
my $number = $allowed_cli->{cc} . $allowed_cli->{ac} . $allowed_cli->{sn};
if (not exists $number_dupes{$number}) {
push(@numbers,{
cc => $allowed_cli->{cc} // '',
ac => $allowed_cli->{ac} // '',
sn => $allowed_cli->{sn} // '',
cc => $allowed_cli->{cc},
ac => $allowed_cli->{ac},
sn => $allowed_cli->{sn},
number => $number,
delta => $allowed_cli->{delta},
additional => 1,
@ -732,6 +1035,7 @@ sub _provision_susbcriber_init_context {
},
]);
$context->{numbers}->{primary} = shift(@{$context->{numbers}->{other}});
#return 0 unless scalar @{$context->{numbers}->{other}};
$context->{voip_numbers} = {};
$context->{voip_numbers}->{primary} = undef;
@ -740,24 +1044,35 @@ sub _provision_susbcriber_init_context {
$context->{aliases}->{primary} = undef;
$context->{aliases}->{other} = [];
$context->{preferences} = {};
$context->{voicemail_user} = {};
$context->{voicemail_user}->{customer_id} = $context->{prov_subscriber}->{uuid};
$context->{voicemail_user}->{mailbox} = $context->{numbers}->{primary}->{number};
$context->{voicemail_user}->{password} = sprintf("%04d", int(rand 10000));
$context->{preferences} = {};
$context->{clir} = 0;
if (my $clir = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::findby_sipusername($first->{sip_username})) {
$context->{clir} = stringtobool($clir->{clir});
}
#$context->{counts} = {} unless defined $context->{counts};
return $result;
}
#sub _generate_webpassword {
# return String::MkPasswd::mkpasswd(
# -length => $webpassword_length,
# -minnum => 1, -minlower => 1, -minupper => 1, -minspecial => 1,
# -distribute => 1, -fatal => 1,
# );
#}
sub _generate_webpassword {
return String::MkPasswd::mkpasswd(
-length => $webpassword_length,
-minnum => 1, -minlower => 1, -minupper => 1, -minspecial => 1,
-distribute => 1, -fatal => 1,
);
}
sub _generate_webusername {
return createtmpstring($webusername_length);
}
sub _apply_reseller_mapping {
my $reseller_name = shift;

@ -26,13 +26,15 @@ use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(format_number check_ipnet prompt);
use NGCP::BulkProcessor::Utils qw(prompt);
#format_number check_ipnet
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
update_reseller_mapping
update_barring_profiles
check_dry
$input_path
@ -55,20 +57,25 @@ our @EXPORT_OK = qw(
$subscriber_import_unfold_ranges
$reseller_mapping_yml
$reseller_mapping
$barring_profiles_yml
$barring_profiles
@allowedcli_filenames
$allowedcli_import_numofthreads
$ignore_allowedcli_unique
$allowedcli_import_single_row_txn
$allowedcli_import_unfold_ranges
@clir_filenames
$clir_import_numofthreads
$ignore_clir_unique
$clir_import_single_row_txn
$provision_subscriber_multithreading
$provision_subscriber_numofthreads
$always_update_subscriber
$webpassword_length
$webusername_length
$set_allowed_ips_multithreading
$set_allowed_ips_numofthreads
$allowed_ips
$set_call_forwards_multithreading
$set_call_forwards_numofthreads
@ -89,6 +96,9 @@ our @EXPORT_OK = qw(
);
#$concurrent_max_total
# $set_allowed_ips_multithreading
# $set_allowed_ips_numofthreads
# $allowed_ips
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
@ -110,21 +120,27 @@ our $subscriber_import_single_row_txn = 1;
our $subscriber_import_unfold_ranges = 1;
our $reseller_mapping_yml = undef;
our $reseller_mapping = {};
our $barring_profiles_yml = undef;
our $barring_profiles = {};
our @allowedcli_filenames = ();
our $allowedcli_import_numofthreads = $cpucount;
our $ignore_allowedcli_unique = 0;
our $allowedcli_import_single_row_txn = 1;
our $allowedcli_import_unfold_ranges = 1;
our @clir_filenames = ();
our $clir_import_numofthreads = $cpucount;
our $ignore_clir_unique = 0;
our $clir_import_single_row_txn = 1;
our $provision_subscriber_multithreading = $enablemultithreading;
our $provision_subscriber_numofthreads = $cpucount;
our $always_update_subscriber = 0;
our $set_allowed_ips_multithreading = $enablemultithreading;
our $set_allowed_ips_numofthreads = $cpucount;
our $allowed_ips = [];
our $webpassword_length = 8;
our $webusername_length = 8;
#our $set_allowed_ips_multithreading = $enablemultithreading;
#our $set_allowed_ips_numofthreads = $cpucount;
#our $allowed_ips = [];
our $set_call_forwards_multithreading = $enablemultithreading;
our $set_call_forwards_numofthreads = $cpucount;
@ -140,9 +156,9 @@ our $cfnumber_exclude_pattern = undef;
our $cfnumber_trim_pattern = undef;
our $ringtimeout = undef;
our $set_preference_bulk_multithreading = $enablemultithreading;
our $set_preference_bulk_numofthreads = $cpucount;
our $concurrent_max_total = undef;
#our $set_preference_bulk_multithreading = $enablemultithreading;
#our $set_preference_bulk_numofthreads = $cpucount;
#our $concurrent_max_total = undef;
sub update_settings {
@ -168,26 +184,40 @@ sub update_settings {
$subscriber_import_single_row_txn = $data->{subscriber_import_single_row_txn} if exists $data->{subscriber_import_single_row_txn};
$subscriber_import_unfold_ranges = $data->{subscriber_import_unfold_ranges} if exists $data->{subscriber_import_unfold_ranges};
$reseller_mapping_yml = $data->{reseller_mapping_yml} if exists $data->{reseller_mapping_yml};
$barring_profiles_yml = $data->{barring_profiles_yml} if exists $data->{barring_profiles_yml};
@allowedcli_filenames = _get_import_filenames(\@allowedcli_filenames,$data,'allowedcli_filenames');
$allowedcli_import_numofthreads = _get_numofthreads($cpucount,$data,'allowedcli_import_numofthreads');
$ignore_allowedcli_unique = $data->{ignore_allowedcli_unique} if exists $data->{ignore_allowedcli_unique};
$allowedcli_import_single_row_txn = $data->{allowedcli_import_single_row_txn} if exists $data->{allowedcli_import_single_row_txn};
$allowedcli_import_unfold_ranges = $data->{allowedcli_import_unfold_ranges} if exists $data->{allowedcli_import_unfold_ranges};
@clir_filenames = _get_import_filenames(\@clir_filenames,$data,'clir_filenames');
$clir_import_numofthreads = _get_numofthreads($cpucount,$data,'clir_import_numofthreads');
$ignore_clir_unique = $data->{ignore_clir_unique} if exists $data->{ignore_clir_unique};
$clir_import_single_row_txn = $data->{clir_import_single_row_txn} if exists $data->{clir_import_single_row_txn};
$provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading};
$provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads');
$always_update_subscriber = $data->{always_update_subscriber} if exists $data->{always_update_subscriber};
$set_allowed_ips_multithreading = $data->{set_allowed_ips_multithreading} if exists $data->{set_allowed_ips_multithreading};
$set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads');
$allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips};
foreach my $ipnet (@$allowed_ips) {
if (not check_ipnet($ipnet)) {
configurationerror($configfile,"invalid allowed_ip '$ipnet'",getlogger(__PACKAGE__));
$result = 0;
}
$webpassword_length = $data->{webpassword_length} if exists $data->{webpassword_length};
if (not defined $webpassword_length or $webpassword_length <= 7) {
configurationerror($configfile,'webpassword_length greater than 7 required',getlogger(__PACKAGE__));
$result = 0;
}
$webusername_length = $data->{webusername_length} if exists $data->{webusername_length};
if (not defined $webusername_length or $webusername_length <= 7) {
configurationerror($configfile,'webusername_length greater than 7 required',getlogger(__PACKAGE__));
$result = 0;
}
#$set_allowed_ips_multithreading = $data->{set_allowed_ips_multithreading} if exists $data->{set_allowed_ips_multithreading};
#$set_allowed_ips_numofthreads = _get_numofthreads($cpucount,$data,'set_allowed_ips_numofthreads');
#$allowed_ips = [ split_tuple($data->{allowed_ips}) ] if exists $data->{allowed_ips};
#foreach my $ipnet (@$allowed_ips) {
# if (not check_ipnet($ipnet)) {
# configurationerror($configfile,"invalid allowed_ip '$ipnet'",getlogger(__PACKAGE__));
# $result = 0;
# }
#}
$set_call_forwards_multithreading = $data->{set_call_forwards_multithreading} if exists $data->{set_call_forwards_multithreading};
$set_call_forwards_numofthreads = _get_numofthreads($cpucount,$data,'set_call_forwards_numofthreads');
@ -211,8 +241,8 @@ sub update_settings {
$result = 0;
}
$set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading};
$set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads');
#$set_preference_bulk_multithreading = $data->{set_preference_bulk_multithreading} if exists $data->{set_preference_bulk_multithreading};
#$set_preference_bulk_numofthreads = _get_numofthreads($cpucount,$data,'set_preference_bulk_numofthreads');
#$concurrent_max_total = $data->{concurrent_max_total} if exists $data->{concurrent_max_total};
#if (defined $concurrent_max_total and $concurrent_max_total <= 0) {
# configurationerror($configfile,'empty concurrent_max_total or greater than 0 required',getlogger(__PACKAGE__));
@ -312,4 +342,27 @@ sub update_reseller_mapping {
}
sub update_barring_profiles {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
eval {
$barring_profiles = $data->{'mapping'};
};
if ($@ or 'HASH' ne ref $barring_profiles or (scalar keys %$barring_profiles) == 0) {
$barring_profiles //= {};
configurationerror($configfile,'no barring mappings found',getlogger(__PACKAGE__));
$result = 0;
}
return $result;
}
return 0;
}
1;

@ -0,0 +1,19 @@
mapping:
Junet:
21: 'Junet 21'
25: 'Junet 25'
29: 'Junet 29'
33: 'Junet 33'
Teleman:
21: 'Teleman 21'
25: 'Teleman 25'
29: 'Teleman 29'
33: 'Teleman 33'
Teletek:
21: 'Teletek 21'
25: 'Teletek 25'
27: 'Teletek 27'
29: 'Teletek 29'
33: 'Teletek 33'
25_29: 'Teletek 25 29'
25_33: 'Teletek 25 33'

@ -13,6 +13,7 @@ use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
update_settings
update_reseller_mapping
update_barring_profiles
check_dry
$output_path
$defaultsettings
@ -23,8 +24,11 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Settings qw(
$run_id
@subscriber_filenames
$reseller_mapping_yml
$barring_profiles_yml
@allowedcli_filenames
@clir_filenames
);
#$allowed_ips
@ -62,6 +66,8 @@ use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles);
use NGCP::BulkProcessor::Projects::Migration::Teletek::ProjectConnectorPool qw(destroy_all_dbs);
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::AllowedCli qw();
use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
@ -83,6 +89,7 @@ use NGCP::BulkProcessor::Projects::Migration::Teletek::Check qw(
use NGCP::BulkProcessor::Projects::Migration::Teletek::Import qw(
import_subscriber
import_allowedcli
import_clir
);
use NGCP::BulkProcessor::Projects::Migration::Teletek::Provisioning qw(
@ -123,18 +130,14 @@ push(@TASK_OPTS,$import_allowedcli_task_opt);
my $import_truncate_allowedcli_task_opt = 'truncate_allowedcli';
push(@TASK_OPTS,$import_truncate_allowedcli_task_opt);
my $import_clir_task_opt = 'import_clir';
push(@TASK_OPTS,$import_clir_task_opt);
my $import_truncate_clir_task_opt = 'truncate_clir';
push(@TASK_OPTS,$import_truncate_clir_task_opt);
my $create_subscriber_task_opt = 'create_subscriber';
push(@TASK_OPTS,$create_subscriber_task_opt);
#my $set_allowed_ips_task_opt = 'set_allowed_ips';
#push(@TASK_OPTS,$set_allowed_ips_task_opt);
#my $set_call_forwards_task_opt = 'set_call_forwards';
#push(@TASK_OPTS,$set_call_forwards_task_opt);
#my $set_concurrent_max_total_task_opt = 'set_concurrent_max_total';
#push(@TASK_OPTS,$set_concurrent_max_total_task_opt);
if (init()) {
main();
exit(0);
@ -163,6 +166,7 @@ sub init {
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
$result &= load_config($reseller_mapping_yml,\&update_reseller_mapping,$YAML_CONFIG_TYPE);
$result &= load_config($barring_profiles_yml,\&update_barring_profiles,$YAML_CONFIG_TYPE);
return $result;
}
@ -197,6 +201,10 @@ sub main() {
} elsif (lc($import_truncate_allowedcli_task_opt) eq lc($task)) {
$result &= import_truncate_allowedcli_task(\@messages) if taskinfo($import_truncate_allowedcli_task_opt,$result);
} elsif (lc($import_clir_task_opt) eq lc($task)) {
$result &= import_clir_task(\@messages) if taskinfo($import_clir_task_opt,$result);
} elsif (lc($import_truncate_clir_task_opt) eq lc($task)) {
$result &= import_truncate_clir_task(\@messages) if taskinfo($import_truncate_clir_task_opt,$result);
} elsif (lc($create_subscriber_task_opt) eq lc($task)) {
if (taskinfo($create_subscriber_task_opt,$result,1)) {
@ -205,28 +213,6 @@ sub main() {
$completion |= 1;
}
#} elsif (lc($set_allowed_ips_task_opt) eq lc($task)) {
# if (taskinfo($set_allowed_ips_task_opt,$result,1) and ($result = batchinfo($result))) {
# next unless check_dry();
# $result &= set_allowed_ips_task(\@messages);
# $completion |= 1;
# }
#} elsif (lc($set_call_forwards_task_opt) eq lc($task)) {
# if (taskinfo($set_call_forwards_task_opt,$result,1) and ($result = batchinfo($result))) {
# next unless check_dry();
# $result &= set_call_forwards_task(\@messages);
# $completion |= 1;
# }
#} elsif (lc($set_concurrent_max_total_task_opt) eq lc($task)) {
# if (taskinfo($set_concurrent_max_total_task_opt,$result,1) and ($result = batchinfo($result))) {
# next unless check_dry();
# $result &= set_preference_bulk_task(\@messages,
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CONCURRENT_MAX_TOTAL_ATTRIBUTE,
# $concurrent_max_total);
# $completion |= 1;
# }
} else {
$result = 0;
@ -369,9 +355,6 @@ sub import_truncate_subscriber_task {
}
sub import_allowedcli_task {
my ($messages) = @_;
@ -431,6 +414,65 @@ sub import_truncate_allowedcli_task {
}
sub import_clir_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = import_clir(@clir_filenames);
};
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
$stats .= "\n total clir records: " .
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir() . ' rows';
my $added_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::added_delta
);
$stats .= "\n new: $added_count rows";
my $existing_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::updated_delta
);
$stats .= "\n existing: $existing_count rows";
my $deleted_count = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::deleted_delta
);
$stats .= "\n removed: $deleted_count rows";
};
if ($err or !$result) {
push(@$messages,"importing clir INCOMPLETE$stats");
} else {
push(@$messages,"importing clir completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
sub import_truncate_clir_task {
my ($messages) = @_;
my $result = 0;
eval {
$result = NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::create_table(1);
};
my $err = $@;
my $stats = '';
eval {
$stats .= "\n total clir records: " .
NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Clir::countby_clir() . ' rows';
};
if ($err or !$result) {
push(@$messages,"truncating imported clir INCOMPLETE$stats");
} else {
push(@$messages,"truncating imported clir completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
sub create_subscriber_task {
@ -482,42 +524,6 @@ sub create_subscriber_task {
}
#sub set_allowed_ips_task {
#
# my ($messages) = @_;
# my ($result,$warning_count) = (0,0);
# eval {
# if ($batch) {
# ($result,$warning_count) = set_allowed_ips_batch();
# } else {
# ($result,$warning_count) = set_allowed_ips();
# }
# };
# my $err = $@;
# my $stats = ($skip_errors ? ": $warning_count warnings" : '');
# eval {
#
# my $allowed_ips_grp_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute(
# $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE);
# $stats .= "\n '" . $allowed_ips_grp_attribute->{attribute} . "': " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
# $allowed_ips_grp_attribute->{id},undef) . ' rows';
# foreach my $ipnet (@$allowed_ips) {
# $stats .= "\n '$ipnet': " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::countby_groupid_ipnet(undef,$ipnet) . ' rows';
# }
# $stats .= "\n voip_aig_sequence: " . NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence::get_id();
#
# };
# if ($err or !$result) {
# push(@$messages,"set subscribers\' allowed_ips preference INCOMPLETE$stats");
# } else {
# push(@$messages,"set subscribers\' allowed_ips preference completed$stats");
# }
# destroy_all_dbs(); #every task should leave with closed connections.
# return $result;
#
#}
#sub set_call_forwards_task {
#
# my ($messages,$mode) = @_;
@ -558,37 +564,6 @@ sub create_subscriber_task {
#
#}
#sub set_preference_bulk_task {
#
# my ($messages,$bulk_attribute_name,$value) = @_;
# my ($result,$warning_count) = (0,0);
# eval {
# if ($batch) {
# ($result,$warning_count) = set_preference_bulk_batch($bulk_attribute_name,$value);
# } else {
# ($result,$warning_count) = set_preference_bulk($bulk_attribute_name,$value);
# }
# };
# my $err = $@;
# my $stats = ($skip_errors ? ": $warning_count warnings" : '');
# eval {
# my $bulk_attribute = NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_attribute($bulk_attribute_name);
#
# $stats .= "\n '" . $bulk_attribute->{attribute} . "': " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::countby_subscriberid_attributeid_value(undef,
# $bulk_attribute->{id},undef) . ' rows';
#
# };
# if ($err or !$result) {
# push(@$messages,"set subscribers\' $bulk_attribute_name preference INCOMPLETE$stats");
# } else {
# push(@$messages,"set subscribers\' $bulk_attribute_name preference completed$stats");
# }
# destroy_all_dbs(); #every task should leave with closed connections.
# return $result;
#
#}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..

@ -14,7 +14,7 @@ ignore_subscriber_unique = 0
subscriber_import_single_row_txn = 1
subscriber_import_unfold_ranges = 1
reseller_mapping_yml = reseller_mapping.yml
barring_profiles_yml = barring_profiles.yml
#allowedcli_filename = /home/rkrenn/test/teletek/export_multiple_DID_Leica.csv
#allowedcli_filename = /home/rkrenn/temp/teletek/export_screeningOnly_170824.csv
@ -22,11 +22,18 @@ allowedcli_filenames = /home/rkrenn/temp/teletek/export_MultipleDID_170823.csv,/
allowedcli_import_numofthreads = 2
ignore_allowedcli_unique = 0
allowedcli_import_single_row_txn = 1
allowedcli_import_unfold_ranges = 1
clir_filenames = /home/rkrenn/temp/teletek/export_CLIR.csv
clir_import_numofthreads = 2
ignore_clir_unique = 0
clir_import_single_row_txn = 1
provision_subscriber_multithreading = 1
#provision_subscriber_numofthreads = 6
always_update_subscriber = 0
webpassword_length = 8
webusername_length = 8
set_allowed_ips_multithreading = 1
#set_allowed_ips_numofthreads = 6

Loading…
Cancel
Save