TT#48101 upc AT importer

Change-Id: I7ee42cdf2d461c061456863a8f9a4d4b7238befe
changes/13/25313/7
Rene Krenn 7 years ago
parent 2bd51c65c1
commit a6619bfb18

@ -34,6 +34,7 @@ our @EXPORT_OK = qw(
countby_status_resellerid
findby_contactid
findby_externalid
findby_id
forupdate_id
@ -108,6 +109,23 @@ sub findby_contactid {
}
sub findby_externalid {
my ($external_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('external_id') . ' = ?';
my @params = ($external_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_id {
my ($id,$load_recursive) = @_;

@ -27,8 +27,11 @@ our @EXPORT_OK = qw(
$CC_ATTRIBUTE
$ACCOUNT_ID_ATTRIBUTE
$NCOS_ID_ATTRIBUTE
$ADM_NCOS_ID_ATTRIBUTE
$GPPx_ATTRIBUTE
$PEER_AUTH_USER
$PEER_AUTH_PASS
$PEER_AUTH_REALM
@ -76,7 +79,9 @@ our $AC_ATTRIBUTE = 'ac';
our $CC_ATTRIBUTE = 'cc';
our $ACCOUNT_ID_ATTRIBUTE = 'account_id';
our $NCOS_ID_ATTRIBUTE = 'ncos_id';
our $ADM_NCOS_ID_ATTRIBUTE = 'adm_ncos_id';
our $GPPx_ATTRIBUTE = 'gpp';
our $PEER_AUTH_USER = 'peer_auth_user';
our $PEER_AUTH_PASS = 'peer_auth_pass';

@ -13,7 +13,7 @@ require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor);
our @EXPORT_OK = qw();
my $default_lineseparator = '\\n\\r|\\r|\\n';
my $default_lineseparator = '\\r\\n|\\r|\\n'; #\\n\\r
my $default_fieldseparator = ",";
my $default_encoding = 'UTF-8';

@ -48,6 +48,7 @@ use NGCP::BulkProcessor::LoadConfig qw(
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
@ -166,6 +167,7 @@ sub cleanup_task {
#cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
cleanupcertfiles();
#cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
#cleanupdir($rollback_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;

@ -394,7 +394,7 @@ sub _unfold_number_ranges {
my ($context,$record,$rows) = @_;
sub create_new_record_code{}
#sub create_new_record_code{}
my $result = 0;
my @fieldnames = @{$context->{fieldnames}};

@ -0,0 +1,244 @@
package NGCP::BulkProcessor::Projects::Migration::UPCAT::Check;
use strict;
## no critic
no strict 'refs';
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts_billing_profile_network qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts_billing_profile_network_schedule qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::products qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw();
use NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users qw();
#use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw();
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Resellers qw();
use NGCP::BulkProcessor::RestRequests::Trunk::Domains qw();
use NGCP::BulkProcessor::RestRequests::Trunk::BillingProfiles qw();
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
check_billing_db_tables
check_provisioning_db_tables
check_kamailio_db_tables
check_import_db_tables
check_rest_get_items
);
my $NOK = 'NOK';
my $OK = 'ok';
sub check_billing_db_tables {
my ($messages) = @_;
my $result = 1;
my $check_result;
my $message;
my $message_prefix = 'NGCP billing db tables - ';
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::products');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::domains');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contacts');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contracts');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contract_balances');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings');
#$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contracts_billing_profile_network');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::contracts_billing_profile_network_schedule');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers');
$result &= $check_result; push(@$messages,$message);
return $result;
}
sub check_import_db_tables {
my ($messages) = @_;
my $result = 1;
my $check_result;
my $message;
my $message_prefix = 'import db tables - ';
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber');
$result &= $check_result; push(@$messages,$message);
return $result;
}
sub check_provisioning_db_tables {
my ($messages) = @_;
my $result = 1;
my $check_result;
my $message;
my $message_prefix = 'NGCP provisioning db tables - ';
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_domains');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_aig_sequence');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups');
$result &= $check_result; push(@$messages,$message);
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destination_sets');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations');
#$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources');
#$result &= $check_result; push(@$messages,$message);
return $result;
}
sub check_kamailio_db_tables {
my ($messages) = @_;
my $result = 1;
my $check_result;
my $message;
my $message_prefix = 'NGCP kamailio db tables - ';
($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users');
$result &= $check_result; push(@$messages,$message);
#($check_result,$message) = _check_table($message_prefix,'NGCP::BulkProcessor::Dao::Trunk::kamailio::location');
#$result &= $check_result; push(@$messages,$message);
return $result;
}
sub _check_table {
my ($message_prefix,$module) = @_;
my $result = 0;
my $message = ($message_prefix // '') . &{$module . '::gettablename'}() . ': ';
eval {
$result = &{$module . '::check_table'}();
};
if (@$ or not $result) {
return (0,$message . $NOK);
} else {
return (1,$message . $OK);
}
}
sub check_rest_get_items {
my ($messages) = @_;
my $result = 1;
my $check_result;
my $message;
my $message_prefix = 'NGCP id\'s/constants - ';
return $result;
}
sub _check_rest_get_item {
my ($message_prefix,$module,$id,$item_name_field,$get_method,$item_path_method) = @_;
my $item = undef;
$get_method //= 'get_item';
$item_path_method //= 'get_item_path';
my $message = ($message_prefix // '') . &{$module . '::' . $item_path_method}($id) . ': ';
return (0,$message . $NOK,$item) unless $id;
eval {
$item = &{$module . '::' . $get_method}($id);
};
if (@$ or not defined $item or ('ARRAY' eq ref $item and (scalar @$item) != 1)) {
return (0,$message . $NOK,$item);
} else {
$item = $item->[0] if ('ARRAY' eq ref $item and (scalar @$item) == 1);
return (1,$message . "'" . $item->{$item_name_field} . "' " . $OK,$item);
}
}
1;

@ -0,0 +1,470 @@
package NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber;
use strict;
## no critic
use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw(
get_import_db
destroy_all_dbs
);
#import_db_tableidentifier
use NGCP::BulkProcessor::SqlProcessor qw(
registertableinfo
create_targettable
checktableinfo
copy_row
insert_stmt
process_table
);
use NGCP::BulkProcessor::SqlRecord qw();
#use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
create_table
gettablename
check_table
getinsertstatement
getupsertstatement
findby_ccacsn
countby_ccacsn
findby_domain_sipusername
findby_domain_webusername
list_domain_billingprofilename_resellernames
findby_sipusername
list_barring_resellernames
update_delta
findby_delta
countby_delta
$deleted_delta
$updated_delta
$added_delta
process_records
@fieldnames
);
# findby_ccacsn
# countby_ccacsn
# findby_domain_sipusername
# findby_domain_webusername
# list_domain_billingprofilename_resellernames
# findby_sipusername
# list_barring_resellernames
my $tablename = 'subscriber';
my $get_db = \&get_import_db;
#my $get_tablename = \&import_db_tableidentifier;
my @csv_cols = (
# fields in order of cols from .csv
"_rownum",
"_dn",
"_txt_sw_username",
"sip_password", #"Subscriber sip password",
"_len",
"_cpe_mta_mac_address",
"_cpe_model",
"_cpe_vendor",
"customer_id", #"Customer ID",
);
our @fieldnames = (
@csv_cols,
"reseller_name", #"Reseller name",
"domain", #"Sip domain name",
"billing_profile_name", #"Billing profile name",
"sip_username", #"Subscriber sip username",
"cc", #"Subscriber primary number - country code (cc)",
"ac", #"Subscriber primary number - country code (ac)",
"sn", #"Subscriber primary number - country code (sn)",
"web_username", #"Subscriber web username",
"web_password", #"Subscriber web password",
"barring",
#"allowed_ips",
#"channels",
#"voicemail",
#calculated fields at the end!
'rownum',
#'range',
#'contact_hash',
'filenum',
'filename',
);
my $expected_fieldnames = [
@fieldnames,
'delta',
];
# table creation:
my $primarykey_fieldnames = [ 'cc','ac','sn' ];
my $indexes = {
$tablename . '_domain_web_username' => [ 'domain(32)','web_username(32)' ],
$tablename . '_domain_sip_username' => [ 'domain(32)','sip_username(32)' ],
$tablename . '_rownum' => [ 'rownum(11)' ],
$tablename . '_delta' => [ 'delta(7)' ],};
#my $fixtable_statements = [];
our $deleted_delta = 'DELETED';
our $updated_delta = 'UPDATED';
our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub create_table {
my ($truncate) = @_;
my $db = &$get_db();
registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
sub findby_delta {
my ($delta,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return [] unless defined $delta;
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('delta') . ' = ?'
,$delta);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_ccacsn {
my ($cc,$ac,$sn,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('cc') . ' = ?' .
' AND ' . $db->columnidentifier('ac') . ' = ?' .
' AND ' . $db->columnidentifier('sn') . ' = ?'
,$cc,$ac,$sn);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findby_domain_sipusername {
my ($domain,$sip_username,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
#return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
$db->paginate_sort_query('SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('domain') . ' = ?' .
' AND ' . $db->columnidentifier('sip_username') . ' = ?',
undef,undef,[{
column => 'filenum',
numeric => 1,
dir => 1,
},{
column => 'rownum',
numeric => 1,
dir => 1,
}])
,$domain,$sip_username);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_domain_webusername {
my ($domain,$web_username,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
#return [] unless (defined $cc or defined $ac or defined $sn);
my $rows = $db->db_get_all_arrayref(
$db->paginate_sort_query('SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('domain') . ' = ?' .
' AND ' . $db->columnidentifier('web_username') . ' = ?',
undef,undef,[{
column => 'filenum',
numeric => 1,
dir => 1,
},{
column => 'rownum',
numeric => 1,
dir => 1,
}])
,$domain,$web_username);
return buildrecords_fromrows($rows,$load_recursive);
}
sub update_delta {
my ($cc,$ac,$sn,$delta) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'UPDATE ' . $table . ' SET delta = ?';
my @params = ();
push(@params,$delta);
if (defined $cc or defined $ac or defined $sn) {
$stmt .= ' WHERE ' .
$db->columnidentifier('cc') . ' = ?' .
' AND ' . $db->columnidentifier('ac') . ' = ?' .
' AND ' . $db->columnidentifier('sn') . ' = ?';
push(@params,$cc,$ac,$sn);
}
return $db->db_do($stmt,@params);
}
sub countby_ccacsn {
my ($cc,$ac,$sn) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table;
my @params = ();
if (defined $cc or defined $ac or defined $sn) {
$stmt .= ' WHERE ' .
$db->columnidentifier('cc') . ' = ?' .
' AND ' . $db->columnidentifier('ac') . ' = ?' .
' AND ' . $db->columnidentifier('sn') . ' = ?';
push(@params,$cc,$ac,$sn);
}
return $db->db_get_value($stmt,@params);
}
sub countby_delta {
my ($deltas) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1';
my @params = ();
if (defined $deltas and 'HASH' eq ref $deltas) {
foreach my $in (keys %$deltas) {
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $deltas and length($deltas) > 0) {
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?';
push(@params,$deltas);
}
return $db->db_get_value($stmt,@params);
}
sub list_domain_billingprofilename_resellernames {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my @cols = map { $db->columnidentifier($_); } qw/domain billing_profile_name reseller_name/;
my $stmt = 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols);
my @params = ();
return $db->db_get_all_arrayref($stmt,@params);
}
sub list_barring_resellernames {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my @cols = map { $db->columnidentifier($_); } qw/barring reseller_name/;
my $stmt = 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols);
my @params = ();
return $db->db_get_all_arrayref($stmt,@params);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub process_records {
my %params = @_;
my ($process_code,
$static_context,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads) = @params{qw/
process_code
static_context
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
/};
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my @cols = map { $db->columnidentifier($_); } qw/domain sip_username/;
return process_table(
get_db => $get_db,
class => __PACKAGE__,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,$rowblock,$row_offset);
},
static_context => $static_context,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_dbs_code => \&destroy_all_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
#'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),
'select' => $db->paginate_sort_query('SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),undef,undef,[{
column => 'filenum',
numeric => 1,
dir => 1,
},{
column => 'rownum',
numeric => 1,
dir => 1,
}]),
'selectcount' => 'SELECT COUNT(*) FROM (SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols) . ') AS g',
);
}
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
sub getupsertstatement {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' .
join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')';
my @values = ();
foreach my $fieldname (@$expected_fieldnames) {
if ('delta' eq $fieldname) {
my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' .
$db->columnidentifier('cc') . ' = ?' .
' AND ' . $db->columnidentifier('ac') . ' = ?' .
' AND ' . $db->columnidentifier('sn') . ' = ?';
push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')');
} else {
push(@values,'?');
}
}
$upsert_stmt .= ' VALUES (' . join(',',@values) . ')';
return $upsert_stmt;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -0,0 +1,290 @@
package NGCP::BulkProcessor::Projects::Migration::UPCAT::Import;
use strict;
## no critic
use threads::shared qw();
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings qw(
$provision_subscriber_rownum_start
$import_multithreading
$subscriber_import_numofthreads
$ignore_subscriber_unique
$subscriber_import_single_row_txn
$skip_errors
$default_domain
$default_reseller_name
$default_billing_profile_name
$default_barring
$cc_ac_map
$default_cc
$cc_len_min
$cc_len_max
$ac_len
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
fileprocessingwarn
fileprocessingerror
);
#use NGCP::BulkProcessor::Projects::Migration::UPCAT::FileProcessors::CSVFile qw();
use NGCP::BulkProcessor::FileProcessors::CSVFileSimple qw();
use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw(
get_import_db
destroy_all_dbs
);
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(threadid zerofill trim);
use NGCP::BulkProcessor::Table qw(get_rowhash);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
import_subscriber
);
sub import_subscriber {
my (@files) = @_;
my $result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::create_table(0);
foreach my $file (@files) {
$result &= _import_subscriber_checks($file);
}
#my $importer = NGCP::BulkProcessor::Projects::Migration::UPCAT::FileProcessors::CSVFile->new($subscriber_import_numofthreads);
my $importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($subscriber_import_numofthreads);
my $upsert = _import_subscriber_reset_delta();
destroy_all_dbs(); #close all db connections before forking..
my $warning_count :shared = 0;
my $filenum = 0;
foreach my $file (@files) {
$filenum++;
$result &= $importer->process(
file => $file,
process_code => sub {
my ($context,$rows,$row_offset) = @_;
my $rownum = $row_offset;
my @subscriber_rows = ();
foreach my $row (@$rows) {
$rownum++;
next if (defined $provision_subscriber_rownum_start and $rownum < $provision_subscriber_rownum_start);
next if (scalar @$row) == 0;
$row = [ map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row ];
my $record = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber->new($row);
$record->{reseller_name} = $default_reseller_name;
($record->{sip_username},$record->{domain}) = split('@',$record->{_txt_sw_username},2);
$record->{domain} //= $default_domain;
$record->{billing_profile_name} = $default_billing_profile_name;
($record->{cc},$record->{ac},$record->{sn}) = _split_dn($record->{_dn});
$record->{web_username} = undef;
$record->{web_password} = undef;
$record->{barring} = $default_barring;
#$record->{allowed_ips}
#"channels",
#"voicemail",
$record->{rownum} = $rownum;
$record->{filenum} = $filenum;
$record->{filename} = $file;
my %r = %$record; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::fieldnames};
if ($context->{upsert}) {
push(@row_ext,$record->{cc},$record->{ac},$record->{sn});
} else {
push(@row_ext,$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::added_delta);
}
push(@subscriber_rows,\@row_ext); # if &{$context->{check_number_code}}($context,$record);
#my %r = %$record;
#$record->{contact_hash} = get_rowhash([@r{@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::contact_fieldnames}]);
#next unless _unfold_number_ranges($context,$record,\@subscriber_rows);
if ($subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) {
while (defined (my $subscriber_row = shift @subscriber_rows)) {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,[$subscriber_row]); };
_warn($context,$@) if $@;
} else {
_insert_subscriber_rows($context,[$subscriber_row]);
}
}
}
}
if (not $subscriber_import_single_row_txn and (scalar @subscriber_rows) > 0) {
if ($skip_errors) {
eval { _insert_subscriber_rows($context,\@subscriber_rows); };
_warn($context,$@) if $@;
} else {
_insert_subscriber_rows($context,\@subscriber_rows);
}
}
#use Data::Dumper;
#print Dumper(\@subscriber_rows);
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_import_db(); # keep ref count low..
$context->{upsert} = $upsert;
#$context->{unfold_ranges} = $subscriber_import_unfold_ranges;
#$context->{fieldnames} = \@NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::fieldnames;
#$context->{added_delta} = $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::added_delta;
#$context->{create_new_record_code} = sub {
# return NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber->new(shift);
#};
#$context->{check_number_code} = sub {
# my ($context,$record) = @_;
# my $result = 1;
# my $number = $record->{dn};
# my $number = $record->{cc} . $record->{ac} . $record->{sn};
# if (NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::findby_ccacsn($record->{cc},$record->{ac},$record->{sn})) {
# if ($skip_errors) {
# _warn($context,"$record->{sip_username}: duplicate number $number");
# } else {
# _error($context,"$record->{sip_username}: duplicate number $number");
# }
# $result = 0;
# }
# return $result;
#};
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
multithreading => $import_multithreading
);
}
return ($result,$warning_count);
}
sub _import_subscriber_checks {
my ($file) = @_;
my $result = 1;
return $result;
}
sub _import_subscriber_reset_delta {
my $upsert = 0;
if (NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_ccacsn() > 0) {
processing_info(threadid(),'resetting delta of ' .
NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::update_delta(undef,undef,undef,
$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::deleted_delta) .
' subscriber records',getlogger(__PACKAGE__));
$upsert |= 1;
}
return $upsert;
}
sub _insert_subscriber_rows {
my ($context,$subscriber_rows) = @_;
$context->{db}->db_do_begin(
($context->{upsert} ?
NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::getupsertstatement()
: NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::getinsertstatement($ignore_subscriber_unique)),
#NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::gettablename(),
#lock
);
eval {
$context->{db}->db_do_rowblock($subscriber_rows);
$context->{db}->db_finish();
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_finish(1);
};
die($err);
}
}
sub _split_dn {
my ($dn) = @_;
my ($cc,$ac,$sn) = ('','',$dn);
if ($cc_ac_map) {
if ($default_cc) {
$cc = $default_cc;
$dn =~ s/^0//;
$sn = $dn;
} else {
foreach my $cc_length ($cc_len_min .. $cc_len_max) {
my ($_cc,$_dn) = (substr($dn,0,$cc_length), substr($dn,$cc_length));
if (exists $cc_ac_map->{$_cc}) {
$cc = $_cc;
$sn = $_dn;
$dn = $_dn;
last;
}
}
}
if (exists $cc_ac_map->{$cc}) {
my $ac_map = $cc_ac_map->{$cc};
foreach my $ac_length ($ac_len->{$cc}->{min} .. $ac_len->{$cc}->{max}) {
my ($_ac,$_sn) = (substr($dn,0,$ac_length), substr($dn,$ac_length));
if (exists $ac_map->{$_ac}) {
$ac = $_ac;
$sn = $_sn;
#$dn = '';
last;
}
}
}
}
return ($cc,$ac,$sn);
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
fileprocessingerror($context->{filename},$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
fileprocessingwarn($context->{filename},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -0,0 +1,182 @@
package NGCP::BulkProcessor::Projects::Migration::UPCAT::Preferences;
use strict;
## no critic
no strict 'refs';
use threads::shared qw();
#use List::Util qw();
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings qw(
$dry
$skip_errors
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
);
use NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::domain_resellers qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
);
use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw(
destroy_all_dbs
);
use NGCP::BulkProcessor::Utils qw(threadid);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
clear_subscriber_preferences
delete_subscriber_preference
set_subscriber_preference
get_subscriber_preference
);
my %get_preference_sub_names = (
voip_usr_preferences => 'findby_subscriberid_attributeid',
);
my %preference_id_cols = (
voip_usr_preferences => 'subscriber_id',
);
sub clear_subscriber_preferences {
my ($context,$subscriber_id,$attribute,$except_value) = @_;
return _clear_preferences($context,'voip_usr_preferences',$subscriber_id,$attribute,$except_value);
}
sub delete_subscriber_preference {
my ($context,$subscriber_id,$attribute,$value) = @_;
return _delete_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value);
}
sub set_subscriber_preference {
my ($context,$subscriber_id,$attribute,$value) = @_;
return _set_preference($context,'voip_usr_preferences',$subscriber_id,$attribute,$value);
}
sub get_subscriber_preference {
my ($context,$subscriber_id,$attribute) = @_;
return _get_preference($context,'voip_usr_preferences',$subscriber_id,$attribute);
}
sub _clear_preferences {
my ($context,$pref_type,$id,$attribute,$except_value) = @_;
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id, $attribute->{id}, defined $except_value ? { 'NOT IN' => $except_value } : undef);
}
sub _delete_preference {
my ($context,$pref_type,$id,$attribute,$value) = @_;
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id, $attribute->{id}, { 'IN' => $value } );
}
sub _set_preference {
my ($context,$pref_type,$id,$attribute,$value) = @_;
if ($attribute->{max_occur} == 1) {
my $old_preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db},
$id,$attribute->{id});
if (defined $value) {
if ((scalar @$old_preferences) == 1) {
&{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::update_row'}($context->{db},{
id => $old_preferences->[0]->{id},
value => $value,
});
return $old_preferences->[0]->{id};
} else {
if ((scalar @$old_preferences) > 1) {
&{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id,$attribute->{id});
}
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db},
attribute_id => $attribute->{id},
$preference_id_cols{$pref_type} => $id,
value => $value,
);
}
} else {
&{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::delete_preferences'}($context->{db},
$id,$attribute->{id});
return undef;
}
} else {
if (defined $value) {
return &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::insert_row'}($context->{db},
attribute_id => $attribute->{id},
$preference_id_cols{$pref_type} => $id,
value => $value,
);
} else {
return undef;
}
}
}
sub _get_preference {
my ($context,$pref_type,$id,$attribute) = @_;
my $preferences = &{'NGCP::BulkProcessor::Dao::Trunk::provisioning::' . $pref_type . '::' . $get_preference_sub_names{$pref_type}}($context->{db},
$id,$attribute->{id});
if ($attribute->{max_occur} == 1) {
return $preferences->[0];
} else {
return $preferences;
}
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid},$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid},$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid},$message,getlogger(__PACKAGE__));
}
}
1;

@ -0,0 +1,101 @@
package NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool;
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings qw(
$import_db_file
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_connectorinstancename
ping
);
#use NGCP::BulkProcessor::SqlConnectors::MySQLDB;
#use NGCP::BulkProcessor::SqlConnectors::OracleDB;
#use NGCP::BulkProcessor::SqlConnectors::PostgreSQLDB;
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(
$staticdbfilemode
cleanupdbfiles
);
#use NGCP::BulkProcessor::SqlConnectors::CSVDB;
#use NGCP::BulkProcessor::SqlConnectors::SQLServerDB;
#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi;
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
get_import_db
import_db_tableidentifier
destroy_dbs
destroy_all_dbs
ping_dbs
ping_all_dbs
);
# thread connector pools:
my $import_dbs = {};
sub get_import_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name); #threadid(); #shift;
if (not defined $import_dbs->{$name}) {
$import_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name);
if (not defined $reconnect) {
$reconnect = 1;
}
}
if ($reconnect) {
$import_dbs->{$name}->db_connect($staticdbfilemode,$import_db_file);
}
return $import_dbs->{$name};
}
sub import_db_tableidentifier {
my ($get_target_db,$tablename) = @_;
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$import_db_file));
}
sub ping_dbs {
ping($import_dbs);
}
sub ping_all_dbs {
ping_dbs();
NGCP::BulkProcessor::ConnectorPool::ping_dbs();
}
sub destroy_dbs {
foreach my $name (keys %$import_dbs) {
cleartableinfo($import_dbs->{$name});
undef $import_dbs->{$name};
delete $import_dbs->{$name};
}
}
sub destroy_all_dbs() {
destroy_dbs();
NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
}
1;

@ -0,0 +1,325 @@
package NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings;
use strict;
## no critic
use NGCP::BulkProcessor::Globals qw(
$working_path
$enablemultithreading
$cpucount
create_path
);
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
filewarn
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt timestampdigits);
#format_number check_ipnet
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
update_cc_ac_map
update_barring_profiles
check_dry
$input_path
$output_path
$report_filename
$defaultsettings
$defaultconfig
$import_multithreading
$run_id
$dry
$skip_errors
$force
$import_db_file
@subscriber_filenames
$subscriber_import_numofthreads
$ignore_subscriber_unique
$subscriber_import_single_row_txn
$provision_subscriber_rownum_start
$provision_subscriber_multithreading
$provision_subscriber_numofthreads
$webpassword_length
$webusername_length
$sippassword_length
$default_domain
$default_reseller_name
$default_billing_profile_name
$default_barring
$cc_ac_map_yml
$cc_ac_map
$default_cc
$cc_len_min
$cc_len_max
$ac_len
$barring_profiles_yml
$barring_profiles
);
#$default_channels_map
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $input_path = $working_path . 'input/';
our $output_path = $working_path . 'output/';
our $report_filename = undef;
our $force = 0;
our $dry = 0;
our $skip_errors = 0;
our $run_id = '';
our $import_db_file = _get_import_db_file($run_id,'import');
our $import_multithreading = 0; #$enablemultithreading;
our @subscriber_filenames = ();
our $subscriber_import_numofthreads = $cpucount;
our $ignore_subscriber_unique = 0;
our $subscriber_import_single_row_txn = 1;
our $provision_subscriber_rownum_start = 0; #all lines
our $provision_subscriber_multithreading = $enablemultithreading;
our $provision_subscriber_numofthreads = $cpucount;
our $webpassword_length = 8;
our $webusername_length = 8;
our $sippassword_length = 16;
our $default_domain = undef;
our $default_reseller_name = 'default';
our $default_billing_profile_name = 'Default Billing Profile';
our $default_barring = undef;
our $cc_ac_map_yml = 'cc_ac.yml';
our $cc_ac_map = {};
our $default_cc = undef;
our $cc_len_min = ~0;
our $cc_len_max = 0;
our $ac_len = {};
our $barring_profiles_yml = undef;
our $barring_profiles = {};
sub update_settings {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
my $regexp_result;
#&$configurationinfocode("testinfomessage",$configlogger);
$result &= _prepare_working_paths(1);
if ($data->{report_filename}) {
$report_filename = $output_path . sprintf('/' . $data->{report_filename},timestampdigits());
if (-e $report_filename and (unlink $report_filename) == 0) {
filewarn('cannot remove ' . $report_filename . ': ' . $!,getlogger(__PACKAGE__));
$report_filename = undef;
}
} else {
$report_filename = undef;
}
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$import_db_file = _get_import_db_file($run_id,'import');
$import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading};
#if ($import_multithreading) {
# configurationerror($configfile,"import_multithreading must be disabled to preserve record order",getlogger(__PACKAGE__));
#}
@subscriber_filenames = _get_import_filenames(\@subscriber_filenames,$data,'subscriber_filenames');
$subscriber_import_numofthreads = _get_numofthreads($cpucount,$data,'subscriber_import_numofthreads');
$ignore_subscriber_unique = $data->{ignore_subscriber_unique} if exists $data->{ignore_subscriber_unique};
$subscriber_import_single_row_txn = $data->{subscriber_import_single_row_txn} if exists $data->{subscriber_import_single_row_txn};
$provision_subscriber_rownum_start = $data->{provision_subscriber_rownum_start} if exists $data->{provision_subscriber_rownum_start};
$provision_subscriber_multithreading = $data->{provision_subscriber_multithreading} if exists $data->{provision_subscriber_multithreading};
$provision_subscriber_numofthreads = _get_numofthreads($cpucount,$data,'provision_subscriber_numofthreads');
$webpassword_length = $data->{webpassword_length} if exists $data->{webpassword_length};
if (not defined $webpassword_length or $webpassword_length <= 7) {
configurationerror($configfile,'webpassword_length greater than 7 required',getlogger(__PACKAGE__));
$result = 0;
}
$webusername_length = $data->{webusername_length} if exists $data->{webusername_length};
if (not defined $webusername_length or $webusername_length <= 7) {
configurationerror($configfile,'webusername_length greater than 7 required',getlogger(__PACKAGE__));
$result = 0;
}
$sippassword_length = $data->{sippassword_length} if exists $data->{sippassword_length};
if (not defined $sippassword_length or $sippassword_length <= 7) {
configurationerror($configfile,'sippassword_length greater than 7 required',getlogger(__PACKAGE__));
$result = 0;
}
#$default_channels = $data->{default_channels} if exists $data->{default_channels};
$default_domain = $data->{default_domain} if exists $data->{default_domain};
$default_reseller_name = $data->{default_reseller_name} if exists $data->{default_reseller_name};
$default_billing_profile_name = $data->{default_billing_profile_name} if exists $data->{default_billing_profile_name};
$default_barring = $data->{default_barring} if exists $data->{default_barring};
$cc_ac_map_yml = $data->{cc_ac_map_yml} if exists $data->{cc_ac_map_yml};
$default_cc = $data->{default_cc} if exists $data->{default_cc};
$barring_profiles_yml = $data->{barring_profiles_yml} if exists $data->{barring_profiles_yml};
return $result;
}
return 0;
}
sub update_cc_ac_map {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
eval {
$cc_ac_map = $data;
};
if ($@ or 'HASH' ne ref $cc_ac_map) {
$cc_ac_map //= {};
configurationerror($configfile,'invalid cc ac map',getlogger(__PACKAGE__));
$result = 0;
} else {
foreach my $cc (keys %$cc_ac_map) {
my $ac_map = $cc_ac_map->{$cc};
$cc_len_min = length($cc) if length($cc) < $cc_len_min;
$cc_len_max = length($cc) if length($cc) > $cc_len_max;
$ac_len->{$cc} = { min => ~0, max => 0, };
if ('HASH' ne ref $ac_map) {
configurationerror($configfile,"invalid $cc ac map",getlogger(__PACKAGE__));
$result = 0;
} else {
foreach my $ac (keys %$ac_map) {
if ($ac_map->{$ac}) { # ac enabled
$ac_len->{$cc}->{min} = length($ac) if length($ac) < $ac_len->{$cc}->{min};
$ac_len->{$cc}->{max} = length($ac) if length($ac) > $ac_len->{$cc}->{max};
} else {
delete $ac_map->{$ac};
}
}
}
}
}
return $result;
}
return 0;
}
sub update_barring_profiles {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
eval {
$barring_profiles = $data; #->{'mapping'};
};
if ($@ or 'HASH' ne ref $barring_profiles or (scalar keys %$barring_profiles) == 0) {
$barring_profiles //= {};
configurationerror($configfile,'no barring mappings found',getlogger(__PACKAGE__));
$result = 0;
}
return $result;
}
return 0;
}
sub _prepare_working_paths {
my ($create) = @_;
my $result = 1;
my $path_result;
($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
return $result;
}
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $numofthreads = $default_value;
$numofthreads = $data->{$key} if exists $data->{$key};
$numofthreads = $cpucount if $numofthreads > $cpucount;
return $numofthreads;
}
sub _get_import_db_file {
my ($run,$name) = @_;
return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name;
}
sub _get_import_filenames {
my ($old_value,$data,$key) = @_;
my @import_filenames = @$old_value;
@import_filenames = split_tuple($data->{$key}) if exists $data->{$key};
my @result = ();
foreach my $import_filename (@import_filenames) {
if (defined $import_filename and length($import_filename) > 0) {
$import_filename = $input_path . $import_filename unless -e $import_filename;
push(@result,$import_filename);
}
}
return @result;
}
sub check_dry {
if ($dry) {
scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__));
return 1;
} else {
scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__));
if (!$force) {
if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) {
return 1;
} else {
return 0;
}
} else {
scriptinfo('force option applied',getlogger(__PACKAGE__));
return 1;
}
}
}
1;

File diff suppressed because it is too large Load Diff

@ -0,0 +1,61 @@
##general settings:
working_path = /home/rkrenn/temp/upcat
cpucount = 4
enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.29
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.29
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.29
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.29
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.29
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = INFO
#INFO
emailloglevel = OFF

@ -0,0 +1,61 @@
##general settings:
working_path = /home/rkrenn/temp/upcat
cpucount = 4
enablemultithreading = 0
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.29
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.29
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.29
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.29
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.29
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = DEBUG
screenloglevel = INFO
#INFO
emailloglevel = OFF

@ -0,0 +1,377 @@
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
use Getopt::Long qw(GetOptions);
use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Settings qw(
update_settings
update_cc_ac_map
update_barring_profiles
check_dry
$output_path
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
$run_id
@subscriber_filenames
$cc_ac_map_yml
$barring_profiles_yml
);
#$allowed_ips
use NGCP::BulkProcessor::Logging qw(
init_log
getlogger
$attachmentlogfile
scriptinfo
cleanuplogfiles
$currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
completion
done
scriptwarn
scripterror
filewarn
fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
$ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs);
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles);
use NGCP::BulkProcessor::Projects::Migration::UPCAT::ProjectConnectorPool qw(destroy_all_dbs);
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw();
#use NGCP::BulkProcessor::Dao::Trunk::kamailio::location qw();
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Check qw(
check_billing_db_tables
check_provisioning_db_tables
check_kamailio_db_tables
check_import_db_tables
check_rest_get_items
);
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Import qw(
import_subscriber
);
use NGCP::BulkProcessor::Projects::Migration::UPCAT::Provisioning qw(
provision_subscribers
);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
my $tasks = [];
my $check_task_opt = 'check';
push(@TASK_OPTS,$check_task_opt);
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
my $cleanup_all_task_opt = 'cleanup_all';
push(@TASK_OPTS,$cleanup_all_task_opt);
my $import_subscriber_task_opt = 'import_subscriber';
push(@TASK_OPTS,$import_subscriber_task_opt);
my $import_truncate_subscriber_task_opt = 'truncate_subscriber';
push(@TASK_OPTS,$import_truncate_subscriber_task_opt);
my $create_subscriber_task_opt = 'create_subscriber';
push(@TASK_OPTS,$create_subscriber_task_opt);
if (init()) {
main();
exit(0);
} else {
exit(1);
}
sub init {
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
"run=s" => \$run_id,
"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
$result &= load_config($cc_ac_map_yml,\&update_cc_ac_map,$YAML_CONFIG_TYPE);
$result &= load_config($barring_profiles_yml,\&update_barring_profiles,$YAML_CONFIG_TYPE);
return $result;
}
sub main() {
my @messages = ();
my @attachmentfiles = ();
my $result = 1;
my $completion = 0;
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
foreach my $task (@$tasks) {
if (lc($check_task_opt) eq lc($task)) {
$result &= check_task(\@messages) if taskinfo($check_task_opt,$result);
} elsif (lc($cleanup_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result);
} elsif (lc($cleanup_all_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result);
} elsif (lc($import_subscriber_task_opt) eq lc($task)) {
$result &= import_subscriber_task(\@messages) if taskinfo($import_subscriber_task_opt,$result);
} elsif (lc($import_truncate_subscriber_task_opt) eq lc($task)) {
$result &= import_truncate_subscriber_task(\@messages) if taskinfo($import_truncate_subscriber_task_opt,$result);
} elsif (lc($create_subscriber_task_opt) eq lc($task)) {
if (taskinfo($create_subscriber_task_opt,$result,1)) {
next unless check_dry();
$result &= create_subscriber_task(\@messages);
$completion |= 1;
}
} else {
$result = 0;
scripterror("unknow task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
last;
}
}
} else {
$result = 0;
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
}
push(@attachmentfiles,$attachmentlogfile);
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
return $result;
}
sub taskinfo {
my ($task,$result) = @_;
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
return $result;
}
sub check_task {
my ($messages) = @_;
my @check_messages = ();
my $result = check_billing_db_tables(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
@check_messages = ();
$result = check_provisioning_db_tables(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
@check_messages = ();
$result = check_kamailio_db_tables(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
@check_messages = ();
$result = check_rest_get_items(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
@check_messages = ();
$result = check_import_db_tables(\@check_messages);
#$result &= ..
push(@$messages,join("\n",@check_messages));
destroy_all_dbs();
return $result;
}
sub cleanup_task {
my ($messages,$clean_generated) = @_;
my $result = 0;
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
eval {
cleanupcvsdirs() if $clean_generated;
cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
cleanupcertfiles();
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
}
if ($@ or !$result) {
push(@$messages,'working directory cleanup INCOMPLETE');
return 0;
} else {
push(@$messages,'working directory folders cleaned up');
return 1;
}
}
sub import_subscriber_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = import_subscriber(@subscriber_filenames);
};
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
$stats .= "\n total subscriber records: " .
NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_ccacsn() . ' rows';
my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::added_delta
);
$stats .= "\n new: $added_count rows";
my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::updated_delta
);
$stats .= "\n existing: $existing_count rows";
my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_delta(
$NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::deleted_delta
);
$stats .= "\n removed: $deleted_count rows";
};
if ($err or !$result) {
push(@$messages,"importing subscribers INCOMPLETE$stats");
} else {
push(@$messages,"importing subscribers completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
sub import_truncate_subscriber_task {
my ($messages) = @_;
my $result = 0;
eval {
$result = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::create_table(1);
};
my $err = $@;
my $stats = '';
eval {
$stats .= "\n total subscriber records: " .
NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::Subscriber::countby_dn() . ' rows';
};
if ($err or !$result) {
push(@$messages,"truncating imported subscribers INCOMPLETE$stats");
} else {
push(@$messages,"truncating imported subscribers completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
sub create_subscriber_task {
my ($messages) = @_;
my ($result,$warning_count,$nonunique_contacts) = (0,0,{});
eval {
($result,$warning_count,$nonunique_contacts) = provision_subscribers();
};
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
$stats .= "\n total contracts: " .
NGCP::BulkProcessor::Dao::Trunk::billing::contracts::countby_status_resellerid(undef,undef) . ' rows';
$stats .= "\n total subscribers: " .
NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::countby_status_resellerid(undef,undef) . ' rows';
$stats .= "\n total aliases: " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,undef) . ' rows';
$stats .= "\n primary aliases: " .
NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::countby_subscriberidisprimary(undef,1) . ' rows';
#$stats .= "\n call forwards: " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::countby_subscriberid_type(undef,undef) . ' rows';
#$stats .= "\n registrations: " .
# NGCP::BulkProcessor::Dao::Trunk::kamailio::location::countby_usernamedomain(undef,undef) . ' rows';
#$stats .= "\n trusted sources: " .
# NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources::countby_subscriberid(undef) . ' rows';
$stats .= "\n non-unique contacts skipped:\n " . join("\n ",keys %$nonunique_contacts)
if (scalar keys %$nonunique_contacts) > 0;
};
if ($err or !$result) {
push(@$messages,"create subscribers INCOMPLETE$stats");
} else {
push(@$messages,"create subscribers completed$stats");
#if (not $dry) {
# push(@$messages,"YOU MIGHT WANT TO RESTART KAMAILIO FOR PERMANENT REGISTRATIONS TO COME INTO EFFECT");
#}
}
destroy_all_dbs();
return $result;
}
#END {
# # this should not be required explicitly, but prevents Log4Perl's
# # "rootlogger not initialized error upon exit..
# destroy_all_dbs
#}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -0,0 +1,28 @@
#dry=0
#skip_errors=0
import_multithreading = 0
#subscriber_filenames = /home/rkrenn/temp/upcat/export1.csv,/home/rkrenn/temp/upcat/export2.csv
subscriber_filenames = /home/rkrenn/temp/upcat/ExportTestMigration.csv
subscriber_import_numofthreads = 2
ignore_subscriber_unique = 0
subscriber_import_single_row_txn = 1
provision_subscriber_rownum_start = 2
provision_subscriber_multithreading = 1
provision_subscriber_numofthreads = 2
webpassword_length = 8
webusername_length = 8
sippassword_length = 16
report_filename = provision.txt
#report_filename = provision_%s.json
#default_channels = 1
default_domain = d20.upc.at
default_reseller_name = default
default_billing_profile_name = Default Billing Profile
default_barring =
cc_ac_map_yml = cc_ac.yml
default_cc = 43
barring_profiles_yml = barring_profiles.yml

@ -0,0 +1,28 @@
#dry=0
#skip_errors=0
import_multithreading = 0
#subscriber_filenames = /home/rkrenn/temp/upcat/export1.csv,/home/rkrenn/temp/upcat/export2.csv
subscriber_filenames = /home/rkrenn/temp/upcat/ExportTestMigration.csv
subscriber_import_numofthreads = 2
ignore_subscriber_unique = 0
subscriber_import_single_row_txn = 1
provision_subscriber_rownum_start = 2
provision_subscriber_multithreading = 1
provision_subscriber_numofthreads = 2
webpassword_length = 8
webusername_length = 8
sippassword_length = 16
report_filename = provision.txt
#report_filename = provision_%s.json
#default_channels = 1
default_domain = d20.upc.at
default_reseller_name = default
default_billing_profile_name = Default Billing Profile
default_barring =
cc_ac_map_yml = cc_ac.yml
default_cc = 43
barring_profiles_yml = barring_profiles.yml
Loading…
Cancel
Save