TT#127150 contract/susbcriber exporter: sqlite tabular export

Change-Id: I88ad999230421b7aa3c17a99c5783d047c621473
(cherry picked from commit 94e086773b)
mr7.5.5
Rene Krenn 5 years ago
parent 22ee6eb391
commit 2f9039f38b

@ -24,6 +24,7 @@ use NGCP::BulkProcessor::SqlRecord qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::billing_mappings qw();
use NGCP::BulkProcessor::Dao::Trunk::billing::billing_profiles qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
@ -253,7 +254,8 @@ sub process_records {
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
$load_recursive,
$blocksize) = @params{qw/
process_code
static_context
init_process_context_code
@ -261,6 +263,7 @@ sub process_records {
multithreading
numofthreads
load_recursive
blocksize
/};
check_table();
@ -278,6 +281,7 @@ sub process_records {
destroy_reader_dbs_code => \&destroy_dbs,
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
blocksize => $blocksize,
);
}
@ -337,6 +341,8 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
$record->load_relation($load_recursive,'voip_subscribers','NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers::findby_contractid',$record->{id},$load_recursive);
$record->load_relation($load_recursive,'contact','NGCP::BulkProcessor::Dao::Trunk::billing::contacts::findby_id',$record->{contact_id},$load_recursive);
push @records,$record;
}

@ -25,6 +25,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
findby_id
findby_resellerid
findby_resellerid_level
findby_resellername_level
@ -61,6 +62,23 @@ sub new {
}
sub findby_id {
my ($ncos_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($ncos_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findby_resellerid_level {
my ($reseller_id,$level,$load_recursive) = @_;

@ -30,6 +30,7 @@ our @EXPORT_OK = qw(
insert_row
update_row
findby_id
findby_subscriberid
forupdate_cc_ac_sn_subscriberid
release_subscriber_numbers
@ -73,6 +74,22 @@ sub new {
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findby_subscriberid {

@ -34,6 +34,8 @@ our @EXPORT_OK = qw(
update_row
delete_row
findby_id
findby_contractid
findby_domainid_username_states
countby_status_resellerid
process_records
@ -81,6 +83,23 @@ sub new {
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findby_domainid_usernames {
my ($xa_db,$domain_id,$usernames,$load_recursive) = @_;
@ -153,6 +172,23 @@ sub findby_domainid_username_states {
}
sub findby_contractid {
my ($contract_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('contract_id') . ' = ?';
my @params = ($contract_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_contractid_states {
my ($xa_db,$contract_id,$states,$load_recursive) = @_;
@ -454,6 +490,9 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
$record->load_relation($load_recursive,'domain','NGCP::BulkProcessor::Dao::Trunk::billing::domains::findby_id',$record->{domain_id},$load_recursive);
$record->load_relation($load_recursive,'primary_number','NGCP::BulkProcessor::Dao::Trunk::billing::voip_numbers::findby_id',$record->{primary_number_id},$load_recursive);
$record->load_relation($load_recursive,'provisioning_voip_subscriber','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers::findby_uuid',undef,$record->{uuid},$load_recursive);
push @records,$record;
}

@ -25,6 +25,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
findby_mailboxuser
insert_row
);
@ -64,6 +65,23 @@ sub new {
}
sub findby_mailboxuser {
my ($mailboxuser,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('mailboxuser') . ' = ?';
my @params = ($mailboxuser);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();

@ -25,6 +25,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
findby_customerid
insert_row
);
@ -77,6 +78,23 @@ sub new {
}
sub findby_customerid {
my ($uuid,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('customer_id') . ' = ?';
my @params = ($uuid);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {
my $db = &$get_db();
@ -132,6 +150,7 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
$record->load_relation($load_recursive,'voicemail_spool','NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_spool::findby_mailboxuser',$record->{mailbox},$load_recursive);
push @records,$record;
}

@ -58,6 +58,23 @@ sub new {
}
sub findby_group_id {
my ($group_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('group_id') . ' = ?';
my @params = ($group_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_rows {
my ($xa_db,$group_id,$ipnets) = @_;

@ -27,7 +27,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
findby_destinationsetid
insert_row
);
@ -59,7 +59,22 @@ sub new {
}
sub findby_destinationsetid {
my ($destination_set_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('destination_set_id') . ' = ?';
my @params = ($destination_set_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {

@ -27,6 +27,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
findby_id
countby_subscriberid_type
$CFB_TYPE
$CFT_TYPE
@ -71,6 +72,23 @@ sub new {
}
sub findby_id {
my ($id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub countby_subscriberid_type {
my ($subscriber_id,$type,$load_recursive) = @_;
@ -194,6 +212,7 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
$record->load_relation($load_recursive,'destinations','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_destinations::findby_destinationsetid',$record->{destination_set_id},$load_recursive);
push @records,$record;
}

@ -33,6 +33,7 @@ our @EXPORT_OK = qw(
findby_domainid_username
countby_subscriberidisprimary
findby_subscriberidisprimary
findby_subscriberid
);
my $tablename = 'voip_dbaliases';
@ -64,6 +65,22 @@ sub new {
}
sub findby_subscriberid {
my ($subscriber_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_subscriberid_username {

@ -26,7 +26,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
source_findby_subscriberid
findby_subscriberid
);
my $tablename = 'voip_fax_destinations';
@ -61,6 +61,23 @@ sub new {
}
sub findby_subscriberid {
my ($subscriber_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub insert_row {

@ -26,6 +26,7 @@ our @EXPORT_OK = qw(
gettablename
check_table
findby_subscriberid
insert_row
);
@ -62,6 +63,24 @@ sub new {
}
sub findby_subscriberid {
my ($subscriber_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('subscriber_id') . ' = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub insert_row {
my $db = &$get_db();

@ -24,6 +24,7 @@ our @EXPORT_OK = qw(
findby_attribute
findall
findby_id
$ALLOWED_CLIS_ATTRIBUTE
$CLI_ATTRIBUTE
@ -33,7 +34,6 @@ our @EXPORT_OK = qw(
$NCOS_ID_ATTRIBUTE
$ADM_NCOS_ID_ATTRIBUTE
$ADM_CF_NCOS_ID_ATTRIBUTE
$GPPx_ATTRIBUTE
%DPID_ATTRIBUTES
@ -78,6 +78,7 @@ our @EXPORT_OK = qw(
$BOOLEAN_DATA_TYPE
);
#$FORCE_OUTBOUND_CALLS_TO_PEER
#$ADM_CF_NCOS_ID_ATTRIBUTE
my $tablename = 'voip_preferences';
my $get_db = \&get_provisioning_db;
@ -113,7 +114,7 @@ our $ACCOUNT_ID_ATTRIBUTE = 'account_id';
our $NCOS_ID_ATTRIBUTE = 'ncos_id';
our $ADM_NCOS_ID_ATTRIBUTE = 'adm_ncos_id';
our $ADM_CF_NCOS_ID_ATTRIBUTE = 'adm_ncos_id';
#our $ADM_CF_NCOS_ID_ATTRIBUTE = 'adm_cf_ncos_id';
our $GPPx_ATTRIBUTE = 'gpp';
our %DPID_ATTRIBUTES = map { 'rewrite_' . $_ => $_; } @NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_rewrite_rule_sets::DPID_FIELDS;
@ -132,7 +133,7 @@ our $CONCURRENT_MAX_TOTAL_ATTRIBUTE = 'concurrent_max_total';
our $CONCURRENT_MAX_PER_ACCOUNT_ATTRIBUTE = 'concurrent_max_per_account';
our $CLIR_ATTRIBUTE = 'clir';
our @CF_ATTRIBUTES = qw(cfu cft cfna cfb); #skip sms for now
our @CF_ATTRIBUTES = qw(cfu cft cfna cfb cfo cfr cfs);
our $RINGTIMEOUT_ATTRIBUTE = 'ringtimeout';
@ -190,6 +191,23 @@ sub findby_attribute {
}
sub findby_id {
my ($attribute_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' .
$db->columnidentifier('id') . ' = ?';
my @params = ($attribute_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub findall {
my ($load_recursive) = @_;

@ -228,7 +228,14 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
$record->load_relation($load_recursive,'voip_dbaliases','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases::findby_subscriberid',$record->{id},$load_recursive);
$record->load_relation($load_recursive,'voip_usr_preferences','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_usr_preferences::findby_subscriberid',$record->{id},$load_recursive);
$record->load_relation($load_recursive,'voicemail_users','NGCP::BulkProcessor::Dao::Trunk::kamailio::voicemail_users::findby_customerid',$record->{uuid},$load_recursive);
$record->load_relation($load_recursive,'voip_fax_preferences','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_fax_preferences::findby_subscriberid',$record->{id},$load_recursive);
$record->load_relation($load_recursive,'voip_fax_destinations','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_fax_destinations::findby_subscriberid',$record->{id},$load_recursive);
push @records,$record;
}
}

@ -22,6 +22,9 @@ use NGCP::BulkProcessor::SqlProcessor qw(
);
use NGCP::BulkProcessor::SqlRecord qw();
# required to use the constants:
use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
@ -35,6 +38,7 @@ our @EXPORT_OK = qw(
findby_subscriberid_attributeid
countby_subscriberid_attributeid_value
findby_subscriberid
$TRUE
$FALSE
@ -70,6 +74,24 @@ sub new {
}
sub findby_subscriberid {
my ($subscriber_id,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT v.*,a.attribute FROM ' . $table . ' v JOIN ' .
$db->tableidentifier('voip_preferences') . ' a ON v.attribute_id = a.id WHERE ' .
'v.subscriber_id = ?';
my @params = ($subscriber_id);
my $rows = $db->db_get_all_arrayref($stmt,@params);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_subscriberid_attributeid {
my ($xa_db,$subscriber_id,$attribute_id,$load_recursive) = @_;
@ -227,7 +249,19 @@ sub buildrecords_fromrows {
$record = __PACKAGE__->new($row);
# transformations go here ...
$record->{_attribute} = $row->{attribute};
$record->load_relation($load_recursive,'attribute','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::findby_id',$record->{attribute_id},$load_recursive);
$record->{_attribute} //= $record->{attribute}->{attribute} if exists $record->{attribute};
if ($record->{_attribute}) {
$record->load_relation($load_recursive,'allowed_ips','NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_allowed_ip_groups::findby_group_id',$record->{value},$load_recursive)
if ($record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ALLOWED_IPS_GRP_ATTRIBUTE
or $record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::MAN_ALLOWED_IPS_GRP_ATTRIBUTE);
$record->load_relation($load_recursive,'ncos','NGCP::BulkProcessor::Dao::Trunk::billing::ncos_levels::findby_id',$record->{value},$load_recursive)
if ($record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::NCOS_ID_ATTRIBUTE
or $record->{_attribute} eq $NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::ADM_NCOS_ID_ATTRIBUTE);
$record->load_relation($load_recursive,"cf_mapping",'NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings::findby_id',$record->{value},$load_recursive)
if (grep { $record->{_attribute} eq $_; } @NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_preferences::CF_ATTRIBUTES);
}
push @records,$record;
}
}

@ -141,11 +141,11 @@ our $VERSION = '1.0.1';
our $system_version = $VERSION; #keep this filename-save
our $system_abbreviation = 'bulkprocessor'; #keep this filename-, dbname-save
our $system_instance = 'ngcp'; #'test'; #'2014'; #dbname-save 0-9a-z_
our $system_instance_label = 'some node';
our $system_instance_label;
our $local_ip = get_ipaddress();
our $local_fqdn = get_hostfqdn();
our $application_version = $main::VERSION // $VERSION;
our $application_version = ($main::VERSION // $VERSION);
our $application_path = get_applicationpath();
our $executable_path = $FindBin::Bin . '/';
#my $remotefilesystem = "MSWin32";
@ -275,12 +275,12 @@ our $smtppasswd = 'xyz';
our $sender_address = 'donotreply@sipwise.com';
#service layer:
our @jobservers = ('127.0.0.1:4730');
#our $jobnamespace = $system_abbreviation . '-' . $system_version . '-' . $local_fqdn . '-' . $system_instance;
our $jobnamespace = $system_abbreviation . '-' . $system_version . '-' . $system_instance;
our $jobnamespace = $system_abbreviation;
$jobnamespace .= '-' . $system_version if length($system_version);
$jobnamespace .= '-' . $system_instance if length($system_instance);
# test directory

@ -5,7 +5,6 @@ use strict;
use NGCP::BulkProcessor::Globals qw(
$system_name
$system_version
$system_instance_label
$local_fqdn
$application_version
@ -30,7 +29,8 @@ use NGCP::BulkProcessor::LogError qw(
configurationerror
);
use YAML::XS qw();
use YAML qw();
$YAML::UseCode = 1;
use Config::Any qw();
use NGCP::BulkProcessor::Utils qw(format_number trim);
@ -172,7 +172,7 @@ sub _search_path {
sub _splashinfo {
my ($configfile) = @_;
configurationinfo($system_name . ' ' . $system_version . ' (' . $system_instance_label . ') [' . $local_fqdn . ']',getlogger(__PACKAGE__));
configurationinfo($system_name . (length($system_instance_label) ? ' (' . $system_instance_label . ')' : '') . ' [' . $local_fqdn . ']',getlogger(__PACKAGE__));
configurationinfo('application version: ' . $application_version,getlogger(__PACKAGE__));
configurationinfo('application path: ' . $application_path,getlogger(__PACKAGE__));
configurationinfo('working path: ' . $working_path,getlogger(__PACKAGE__));
@ -304,7 +304,7 @@ sub _parse_yaml_config {
my $config = undef;
eval {
$config = YAML::XS::LoadFile($file);
$config = YAML::LoadFile($file);
};
if ($@) {
configurationerror($file,'parsing yaml format - error: ' . $@,getlogger(__PACKAGE__));

@ -4,7 +4,6 @@ use strict;
## no critic
use NGCP::BulkProcessor::Globals qw(
$system_version
$erroremailrecipient
$warnemailrecipient
$doneemailrecipient
@ -418,7 +417,7 @@ sub restresponseerror {
sub fieldnamesdiffer {
my ($db,$tablename,$expectedfieldnames,$fieldnamesfound,$logger) = @_;
my $message = _getsqlconnectorinstanceprefix($db) . 'wrong table fieldnames (v ' . $system_version . '): [' . $db->connectidentifier() . '].' . $tablename . ":\nexpected: " . ((defined $expectedfieldnames) ? join(', ',@$expectedfieldnames) : '<none>') . "\nfound: " . ((defined $fieldnamesfound) ? join(', ',@$fieldnamesfound) : '<none>');
my $message = _getsqlconnectorinstanceprefix($db) . 'wrong table fieldnames: [' . $db->connectidentifier() . '].' . $tablename . ":\nexpected: " . ((defined $expectedfieldnames) ? join(', ',@$expectedfieldnames) : '<none>') . "\nfound: " . ((defined $fieldnamesfound) ? join(', ',@$fieldnamesfound) : '<none>');
if (defined $logger) {
$logger->error($message);
}

@ -12,7 +12,6 @@ use NGCP::BulkProcessor::Logging qw(
use NGCP::BulkProcessor::Globals qw(
$system_name
$system_instance_label
$system_version
$local_fqdn
$mailfile_path
$emailenable
@ -49,7 +48,7 @@ our @EXPORT_OK = qw(
my $wordwrapcolumns = 72; #linebreak/wrap columns
our $signature = "--\n" . $system_name . ' ' . $system_version . ' (' . $system_instance_label . ")\n[" . $local_fqdn . ']'; # a nice email signature
our $signature = "--\n" . $system_name . (length($system_instance_label) ? ' (' . $system_instance_label . ')' : '') . "\n[" . $local_fqdn . ']'; # a nice email signature
my $msgextension = '.msg';

@ -0,0 +1,312 @@
package NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular;
use strict;
## no critic
use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(
get_sqlite_db
destroy_all_dbs
);
#import_db_tableidentifier
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$tabular_fields
);
use NGCP::BulkProcessor::SqlProcessor qw(
registertableinfo
create_targettable
checktableinfo
copy_row
insert_stmt
);
#process_table
use NGCP::BulkProcessor::SqlRecord qw();
#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
create_table
gettablename
check_table
getinsertstatement
getupsertstatement
get_fieldnames
update_delta
findby_delta
countby_delta
$deleted_delta
$updated_delta
$added_delta
);
#@fieldnames
#findby_sipusername
#findby_ccacsn
#countby_ccacsn
my $tablename = 'tabular';
my $get_db = \&get_sqlite_db;
#my $get_tablename = \&import_db_tableidentifier;
my $fieldnames;
my $expected_fieldnames;
sub get_fieldnames {
my $expected = shift;
unless (defined $fieldnames and defined $expected_fieldnames) {
$fieldnames = [ map {
local $_ = (ref $_ ? $_->{path} : $_);
$_ =~ s/\./_/g;
$_ =~ s/\[(\d+)\]/_$1/g;
$_;
} @$tabular_fields ];
$expected_fieldnames = [ @$fieldnames ];
push(@$expected_fieldnames,'domain') unless grep { 'domain' eq $_; } @$expected_fieldnames;
push(@$expected_fieldnames,'username') unless grep { 'username' eq $_; } @$expected_fieldnames;
push(@$expected_fieldnames,'delta');
}
return $fieldnames unless $expected;
return $expected_fieldnames;
}
# table creation:
my $primarykey_fieldnames = [ 'domain', 'username' ];
my $indexes = {
#$tablename . '_number' => [ 'number(32)' ],
#$tablename . '_rownum' => [ 'rownum(11)' ],
$tablename . '_delta' => [ 'delta(7)' ],
};
#my $fixtable_statements = [];
our $deleted_delta = 'DELETED';
our $updated_delta = 'UPDATED';
our $added_delta = 'ADDED';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,get_fieldnames(1),$indexes);
copy_row($self,shift,get_fieldnames(1));
return $self;
}
sub create_table {
my ($truncate) = @_;
my $db = &$get_db();
registertableinfo($db,__PACKAGE__,$tablename,get_fieldnames(1),$indexes,$primarykey_fieldnames);
return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef);
}
sub findby_delta {
my ($delta,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return [] unless defined $delta;
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' .
$table .
' WHERE ' .
$db->columnidentifier('delta') . ' = ?'
, $delta);
return buildrecords_fromrows($rows,$load_recursive);
}
sub findby_domainusername {
my ($domain,$username,$load_recursive) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
return [] unless (defined $domain and defined $username);
my $rows = $db->db_get_all_arrayref(
'SELECT * FROM ' . $table .
' WHERE ' . $db->columnidentifier('domain') . ' = ?' .
' AND ' . $db->columnidentifier('username') . ' = ?'
, $domain, $username);
return buildrecords_fromrows($rows,$load_recursive)->[0];
}
sub update_delta {
my ($domain,$username,$delta) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'UPDATE ' . $table . ' SET delta = ?';
my @params = ();
push(@params,$delta);
if (defined $domain or defined $username) {
$stmt .= ' WHERE ' .
$db->columnidentifier('domain') . ' = ?' .
' AND ' . $db->columnidentifier('username') . ' = ?';
push(@params, $domain, $username);
}
return $db->db_do($stmt,@params);
}
sub countby_delta {
my ($deltas) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1';
my @params = ();
if (defined $deltas and 'HASH' eq ref $deltas) {
foreach my $in (keys %$deltas) {
my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in}));
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')';
push(@params,@values);
}
} elsif (defined $deltas and length($deltas) > 0) {
$stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?';
push(@params,$deltas);
}
return $db->db_get_value($stmt,@params);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
#sub process_records {
#
# my %params = @_;
# my ($process_code,
# $static_context,
# $init_process_context_code,
# $uninit_process_context_code,
# $multithreading,
# $numofthreads) = @params{qw/
# process_code
# static_context
# init_process_context_code
# uninit_process_context_code
# multithreading
# numofthreads
# /};
#
# check_table();
# my $db = &$get_db();
# my $table = $db->tableidentifier($tablename);
#
# my @cols = map { $db->columnidentifier($_); } qw/domain sip_username/;
#
# return process_table(
# get_db => $get_db,
# class => __PACKAGE__,
# process_code => sub {
# my ($context,$rowblock,$row_offset) = @_;
# return &$process_code($context,$rowblock,$row_offset);
# },
# static_context => $static_context,
# init_process_context_code => $init_process_context_code,
# uninit_process_context_code => $uninit_process_context_code,
# destroy_reader_dbs_code => \&destroy_all_dbs,
# multithreading => $multithreading,
# tableprocessing_threads => $numofthreads,
# 'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),
# 'selectcount' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table,
# );
#}
sub getinsertstatement {
my ($insert_ignore) = @_;
check_table();
return insert_stmt($get_db,__PACKAGE__,$insert_ignore);
}
sub getupsertstatement {
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' .
join(', ', map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @{get_fieldnames(1)}) . ')';
my @values = ();
foreach my $fieldname (@{get_fieldnames(1)}) {
if ('delta' eq $fieldname) {
my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' .
$db->columnidentifier('domain') . ' = ?' .
' AND ' . $db->columnidentifier('username') . ' = ?';
push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')');
} else {
push(@values,'?');
}
}
$upsert_stmt .= ' VALUES (' . join(',',@values) . ')';
return $upsert_stmt;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
get_fieldnames(1),
$indexes);
}
1;

@ -0,0 +1,426 @@
package NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers;
use strict;
## no critic
use threads::shared qw();
use NGCP::BulkProcessor::Serialization qw();
use Scalar::Util 'blessed';
use MIME::Base64 qw(encode_base64);
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$dry
$skip_errors
$export_customers_multithreading
$export_customers_numofthreads
$export_customers_blocksize
run_dao_method
get_dao_var
get_export_filename
write_export_file
$customer_export_filename_format
$tabular_fields
$load_recursive
$tabular_single_row_txn
$ignore_tabular_unique
);
use NGCP::BulkProcessor::Logging qw (
getlogger
processing_info
processing_debug
);
use NGCP::BulkProcessor::LogError qw(
rowprocessingerror
rowprocessingwarn
fileerror
);
use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
use NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular qw();
use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(
get_sqlite_db
destroy_all_dbs
ping_all_dbs
);
use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool trim); #check_ipnet
#use NGCP::BulkProcessor::DSSorter qw(sort_by_configs);
#use NGCP::BulkProcessor::Table qw(get_rowhash);
use NGCP::BulkProcessor::Array qw(array_to_map);
use NGCP::BulkProcessor::DSPath qw();
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
export_customers_graph
export_customers_tabular
);
sub export_customers_graph {
my $static_context = {
};
my $result = 1; #_copy_customers_checks($static_context);
destroy_all_dbs();
my $warning_count :shared = 0;
return ($result && run_dao_method('billing::contracts::process_records',
#source_dbs => $static_context->{source_dbs},
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
ping_all_dbs();
my @data = ();
foreach my $record (@$records) {
next unless _export_customer_graph_init_context($context,$record);
push(@data,_get_contract_graph($context->{contract}));
}
write_export_file(\@data,$context->{export_filename},$context->{export_format});
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
#$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
($context->{export_filename},$context->{export_format}) = get_export_filename($customer_export_filename_format);
},
uninit_process_context_code => sub {
my ($context)= @_;
#undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
destroy_reader_dbs_code => \&destroy_all_dbs,
blocksize => $export_customers_blocksize,
multithreading => $export_customers_multithreading,
numofthreads => $export_customers_numofthreads,
),$warning_count,);
}
sub export_customers_tabular {
my $result = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::create_table(0);
my $static_context = {
upsert => _tabular_rows_reset_delta(),
};
destroy_all_dbs();
my $warning_count :shared = 0;
return ($result && run_dao_method('billing::contracts::process_records',
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
ping_all_dbs();
my @subscriber_rows = ();
foreach my $record (@$records) {
next unless _export_customer_tabular_init_context($context,$record);
push(@subscriber_rows, _get_subscriber_rows($context));
if ($tabular_single_row_txn and (scalar @subscriber_rows) > 0) {
while (defined (my $subscriber_row = shift @subscriber_rows)) {
if ($skip_errors) {
eval { _insert_tabular_rows($context,[$subscriber_row]); };
_warn($context,$@) if $@;
} else {
_insert_tabular_rows($context,[$subscriber_row]);
}
}
}
}
if (not $tabular_single_row_txn and (scalar @subscriber_rows) > 0) {
if ($skip_errors) {
eval { insert_tabular_rows($context,\@subscriber_rows); };
_warn($context,$@) if $@;
} else {
insert_tabular_rows($context,\@subscriber_rows);
}
}
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
$context->{db} = &get_sqlite_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
},
uninit_process_context_code => sub {
my ($context)= @_;
undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
}
},
destroy_reader_dbs_code => \&destroy_all_dbs,
blocksize => $export_customers_blocksize,
multithreading => $export_customers_multithreading,
numofthreads => $export_customers_numofthreads,
),$warning_count,);
}
sub _tabular_rows_reset_delta {
my $upsert = 0;
if (NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta() > 0) {
processing_info(threadid(),'resetting delta of ' .
NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::update_delta(undef,
$NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::deleted_delta) .
' records',getlogger(__PACKAGE__));
$upsert |= 1;
}
return $upsert;
}
sub _insert_tabular_rows {
my ($context,$subscriber_rows) = @_;
$context->{db}->db_do_begin(
($context->{upsert} ?
NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::getupsertstatement()
: NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::getinsertstatement($ignore_tabular_unique)),
);
eval {
$context->{db}->db_do_rowblock($subscriber_rows);
$context->{db}->db_finish();
};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_finish(1);
};
die($err);
}
}
sub _export_customer_graph_init_context {
my ($context,$record) = @_;
my $result = 1;
return 0 unless _load_contract($context,$record);
return $result;
}
sub _get_contract_graph {
my ($context) = @_;
#sub unshare {
#
# my ($obj,) = @_;
# my $ref = ref $obj;
# if ("ARRAY" eq $ref) {
# my @array = ();
# my $i = 0;
# foreach my $value (@$obj) {
# push(@array, unshare($value)) if xx;
# $i++;
# }
# return \@array;
# } elsif ($ref eq "HASH") {
# my %hash = ();
# foreach my $key (keys %$obj) {
# $hash{$key} = unshare($obj->{$key}) if xx;
# }
# return \%hash;
# }
#
#}
foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) {
($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) =
array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences},
sub { return shift->{attribute}; }, sub { my $p = shift; }, 'group' );
if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) {
foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) {
foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) {
$voicemail->{recording} = encode_base64($voicemail->{recording},'');
}
}
}
my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, {
retrieve_key_from_non_hash => sub {},
key_does_not_exist => sub {},
index_does_not_exist => sub {},
});
#foreach my $graph_field (@$graph_fields) {
# my $a;
# my $sep = ',';
# if ('HASH' eq ref $tabular_field) {
# $a = $tabular_field->{path};
# $sep = $tabular_field->{sep};
# } else {
# $a = $tabular_field;
# }
# #eval {'' . ($dp->get('.' . $a) // '');}; if($@){
# # my $x=5;
# #}
# my $v = $dp->get('.' . $a);
# if ('ARRAY' eq ref $v) {
# if ('HASH' eq ref $v->[0]) {
# $v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v);
# } else {
# $v = join($sep, sort @$v);
# }
# } else {
# $v = '' . ($v // '');
# }
# push(@row,$v);
#}
}
}
sub _export_customer_tabular_init_context {
my ($context,$record) = @_;
my $result = 1;
return 0 unless _load_contract($context,$record);
if (not scalar @{$context->{contract}->{voip_subscribers}}) {
_info($context,"contract ID $record->{id} has no subscribers, skipping",1);
$result = 0;
}
return $result;
}
sub _get_subscriber_rows {
my ($context) = @_;
my @rows = ();
foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) {
my @row = ();
$bill_subs->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts->new($context->{contract}); #no circular ref
($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) =
array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences},
sub { return shift->{_attribute}; }, sub { my $p = shift; }, 'group' );
if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) {
foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) {
foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) {
$voicemail->{recording} = encode_base64($voicemail->{recording},'');
}
}
}
my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, {
retrieve_key_from_non_hash => sub {},
key_does_not_exist => sub {},
index_does_not_exist => sub {},
});
foreach my $tabular_field (@$tabular_fields) {
my $a;
my $sep = ',';
if ('HASH' eq ref $tabular_field) {
$a = $tabular_field->{path};
$sep = $tabular_field->{sep};
} else {
$a = $tabular_field;
}
#eval {'' . ($dp->get('.' . $a) // '');}; if($@){
# my $x=5;
#}
my $v = $dp->get('.' . $a);
if ('ARRAY' eq ref $v) {
if ('HASH' eq ref $v->[0]
or (blessed($v->[0]) and $v->[0]->isa('NGCP::BulkProcessor::SqlRecord'))) {
$v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v);
} else {
$v = join($sep, sort @$v);
}
} else {
$v = '' . ($v // '');
}
push(@row,$v);
}
push(@row,$bill_subs->{domain}->{domain}) unless grep { 'domain' eq $_; } @{NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::get_fieldnames()};
push(@row,$bill_subs->{username}) unless grep { 'username' eq $_; } @{NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::get_fieldnames()};
if ($context->{upsert}) {
push(@row,$bill_subs->{domain}->{domain},$bill_subs->{username});
} else {
push(@row,$NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::added_delta);
}
push(@rows,\@row);
}
return @rows;
}
sub _load_contract {
my ($context,$record) = @_;
$context->{contract} = run_dao_method('billing::contracts::findby_id', $record->{id}, { %$load_recursive,
'contracts.voip_subscribers.domain' => 1,
_context => {
_info => \&_info,
_error => \&_error,
_debug => \&_debug,
_warn => \&_warn,
context => $context,
},
});
return 1 if $context->{contract};
return 0;
}
sub _error {
my ($context,$message) = @_;
$context->{error_count} = $context->{error_count} + 1;
rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
}
sub _warn {
my ($context,$message) = @_;
$context->{warning_count} = $context->{warning_count} + 1;
rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
}
sub _info {
my ($context,$message,$debug) = @_;
if ($debug) {
processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
} else {
processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
}
}
sub _debug {
my ($context,$message,$debug) = @_;
processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__));
}
1;

@ -0,0 +1,84 @@
package NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool;
use strict;
## no critic
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$sqlite_db_file
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_connectorinstancename
);
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode);
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
get_sqlite_db
sqlite_db_tableidentifier
destroy_dbs
destroy_all_dbs
ping_all_dbs
);
my $sqlite_dbs = {};
sub get_sqlite_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name); #threadid(); #shift;
if (not defined $sqlite_dbs->{$name}) {
$sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name);
if (not defined $reconnect) {
$reconnect = 1;
}
}
if ($reconnect) {
$sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file);
}
return $sqlite_dbs->{$name};
}
sub sqlite_db_tableidentifier {
my ($get_target_db,$tablename) = @_;
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file));
}
sub destroy_dbs {
foreach my $name (keys %$sqlite_dbs) {
cleartableinfo($sqlite_dbs->{$name});
undef $sqlite_dbs->{$name};
delete $sqlite_dbs->{$name};
}
}
sub destroy_all_dbs() {
destroy_dbs();
NGCP::BulkProcessor::ConnectorPool::destroy_dbs();
}
sub ping_all_dbs() {
NGCP::BulkProcessor::ConnectorPool::ping_dbs();
}
1;

@ -0,0 +1,444 @@
package NGCP::BulkProcessor::Projects::ETL::Customer::Settings;
use strict;
## no critic
use threads::shared qw();
use File::Basename qw(fileparse);
use NGCP::BulkProcessor::Serialization qw();
use DateTime::TimeZone qw();
use JSON -support_by_pp, -no_export;
*NGCP::BulkProcessor::Serialization::serialize_json = sub {
my $input_ref = shift;
#return JSON::XS::encode_json($input_ref);
return JSON::to_json($input_ref, { allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, as_nonblessed => 1 });
};
use NGCP::BulkProcessor::Globals qw(
$working_path
$enablemultithreading
$cpucount
create_path
);
use NGCP::BulkProcessor::Logging qw(
getlogger
scriptinfo
configurationinfo
);
use NGCP::BulkProcessor::LogError qw(
fileerror
filewarn
configurationwarn
configurationerror
);
use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module);
#format_number
use NGCP::BulkProcessor::Array qw(contains);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
update_settings
run_dao_method
get_dao_var
get_export_filename
write_export_file
write_sql_file
update_load_recursive
$load_recursive_yml
$load_recursive
update_tabular_fields
$tabular_fields_yml
$tabular_fields
$ignore_tabular_unique
$tabular_single_row_txn
$sqlite_db_file
check_dry
$output_path
$input_path
$customer_export_filename_format
$customer_import_filename
$split_customers
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
$export_customers_multithreading
$export_customers_numofthreads
$export_customers_blocksize
$cf_default_priority
$cf_default_timeout
$cft_default_ringtimeout
$rollback_sql_export_filename_format
$rollback_sql_stmt_format
);
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $tabular_fields_yml = 'tabular_fields.yml';
our $tabular_fields = [];
our $ignore_tabular_unique = 0;
our $tabular_single_row_txn = 1;
our $load_recursive_yml = 'load_recursive_yml.yml';
our $load_recursive;
our $output_path = $working_path . 'output/';
our $input_path = $working_path . 'input/';
our $customer_export_filename_format = undef;
our $customer_import_filename = undef;
our $customer_import_numofthreads = $cpucount;
our $customer_import_multithreading = 1;
our $customer_reseller_name = 'default';
our $customer_billing_profile_name = 'Default Billing Profile';
our $customer_domain = undef;
our $customer_contact_email_format = '%s@example.org';
our $subscriber_contact_email_format = '%s@example.org';
our $split_customers = 0;
our $subscriber_timezone = undef;
our $contract_timezone = undef;
our $subscriber_profile_set_name = undef;
our $subscriber_profile_name = undef;
our $webusername_format = '%1$s';
our $subscriber_externalid_format = undef;
our $force = 0;
our $dry = 0;
our $skip_errors = 0;
my $mr = 'Trunk';
my @supported_mr = ('Trunk');
our $sqlite_db_file = 'sqlite';
our $export_customers_multithreading = $enablemultithreading;
our $export_customers_numofthreads = $cpucount;
our $export_customers_blocksize = 1000;
our $cf_default_priority = 1;
our $cf_default_timeout = 300;
our $cft_default_ringtimeout = 20;
our $rollback_sql_export_filename_format = undef;
our $rollback_sql_stmt_format = undef;
my $file_lock :shared = undef;
sub update_settings {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
my $regexp_result;
#&$configurationinfocode("testinfomessage",$configlogger);
$result &= _prepare_working_paths(1);
$customer_export_filename_format = $data->{customer_export_filename} if exists $data->{customer_export_filename};
get_export_filename($data->{customer_export_filename},$configfile);
$rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format};
get_export_filename($data->{rollback_sql_export_filename_format},$configfile);
$rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format};
$sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file};
$customer_import_filename = _get_import_filename($customer_import_filename,$data,'customer_import_filename');
$customer_import_multithreading = $data->{customer_import_multithreading} if exists $data->{customer_import_multithreading};
$customer_import_numofthreads = _get_numofthreads($cpucount,$data,'customer_import_numofthreads');
$customer_reseller_name = $data->{customer_reseller_name} if exists $data->{customer_reseller_name};
$customer_billing_profile_name = $data->{customer_billing_profile_name} if exists $data->{customer_billing_profile_name};
$customer_domain = $data->{customer_domain} if exists $data->{customer_domain};
$customer_contact_email_format = $data->{customer_contact_email_format} if exists $data->{customer_contact_email_format};
$subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format};
$split_customers = $data->{split_customers} if exists $data->{split_customers};
$contract_timezone = $data->{customer_timezone} if exists $data->{customer_timezone};
if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) {
configurationerror($configfile,"invalid customer_timezone '$contract_timezone'");
$result = 0;
}
$subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone};
if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) {
configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'");
$result = 0;
}
$subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name};
$subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name};
if ($subscriber_profile_set_name and not $subscriber_profile_name
or not $subscriber_profile_set_name and $subscriber_profile_name) {
configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required");
$result = 0;
}
$webusername_format = $data->{webusername_format} if exists $data->{webusername_format};
$subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format};
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$export_customers_multithreading = $data->{export_customers_multithreading} if exists $data->{export_customers_multithreading};
$export_customers_numofthreads = _get_numofthreads($cpucount,$data,'export_customers_numofthreads');
$export_customers_blocksize = $data->{export_customers_blocksize} if exists $data->{export_customers_blocksize};
$tabular_fields_yml = $data->{tabular_fields_yml} if exists $data->{tabular_fields_yml};
$load_recursive_yml = $data->{load_recursive_yml} if exists $data->{load_recursive_yml};
$tabular_single_row_txn = $data->{tabular_single_row_txn} if exists $data->{tabular_single_row_txn};
$ignore_tabular_unique = $data->{ignore_tabular_unique} if exists $data->{ignore_tabular_unique};
$cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority};
$cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout};
$cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout};
$mr = $data->{schema_version};
if (not defined $mr or not contains($mr,\@supported_mr)) {
configurationerror($configfile,'version must be one of ' . join(', ', @supported_mr));
$result = 0;
}
return $result;
}
return 0;
}
sub run_dao_method {
my $method_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift;
load_module($method_name);
no strict 'refs';
return $method_name->(@_);
}
sub get_dao_var {
my $var_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift;
load_module($var_name);
no strict 'refs';
return @{$var_name} if wantarray;
return ${$var_name};
}
sub _prepare_working_paths {
my ($create) = @_;
my $result = 1;
my $path_result;
($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
return $result;
}
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $numofthreads = $default_value;
$numofthreads = $data->{$key} if exists $data->{$key};
$numofthreads = $cpucount if $numofthreads > $cpucount;
return $numofthreads;
}
sub get_export_filename {
my ($filename_format,$configfile) = @_;
my $export_filename;
my $export_format;
if ($filename_format) {
$export_filename = $output_path . sprintf($filename_format,timestampdigits(),threadid());
if (-e $export_filename and (unlink $export_filename) == 0) {
filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__));
$export_filename = undef;
}
my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".sql");
if ($suffix eq '.json') {
$export_format = $NGCP::BulkProcessor::Serialization::format_json;
} elsif ($suffix eq '.yml' or $suffix eq '.yaml') {
$export_format = $NGCP::BulkProcessor::Serialization::format_yaml;
} elsif ($suffix eq '.sql') {
} else {
configurationerror($configfile,"$filename_format: either .json or .yaml export file format required");
}
}
return ($export_filename,$export_format);
}
sub write_export_file {
my ($data,$export_filename,$export_format) = @_;
if (defined $export_filename) {
# "concatenated json" https://en.wikipedia.org/wiki/JSON_streaming
my $str = '';
if (ref $data eq 'ARRAY') {
foreach my $obj (@$data) {
#$str .= "\n" if length($str);
$str .= NGCP::BulkProcessor::Serialization::serialize($obj,$export_format);
}
} else {
$str = NGCP::BulkProcessor::Serialization::serialize($data,$export_format);
}
_write_file($str,$export_filename);
}
}
sub write_sql_file {
my ($data,$export_filename,$stmt_format) = @_;
if (defined $export_filename and $stmt_format) {
my $str = '';
if (ref $data eq 'ARRAY') {
foreach my $obj (@$data) {
$str .= "\n" if length($str);
if (ref $obj eq 'ARRAY') {
$str .= sprintf($stmt_format,@$obj);
} else {
$str .= sprintf($stmt_format,$str);
}
}
} else {
$str = sprintf($stmt_format,$data);
}
$str .= "\n";
_write_file($str,$export_filename);
}
}
sub _write_file {
my ($str,$export_filename) = @_;
if (defined $export_filename) {
lock $file_lock;
open(my $fh, '>>', $export_filename) or fileerror('cannot open file ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__));
binmode($fh);
print $fh $str;
close $fh;
}
}
sub update_tabular_fields {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
eval {
$tabular_fields = $data;
};
if ($@ or 'ARRAY' ne ref $tabular_fields) {
$tabular_fields //= [];
configurationerror($configfile,'invalid tabular fields',getlogger(__PACKAGE__));
$result = 0;
}
return $result;
}
return 0;
}
sub update_load_recursive {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
eval {
$load_recursive = $data;
};
if ($@ or 'HASH' ne ref $load_recursive) {
undef $load_recursive;
configurationerror($configfile,'invalid load recursive def',getlogger(__PACKAGE__));
$result = 0;
}
return $result;
}
return 0;
}
sub _get_sqlite_db_file {
my ($run,$name) = @_;
return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name;
}
sub _get_import_filename {
my ($old_value,$data,$key) = @_;
my $import_filename = $old_value;
$import_filename = $data->{$key} if exists $data->{$key};
if (defined $import_filename and length($import_filename) > 0) {
$import_filename = $input_path . $import_filename unless -e $import_filename;
}
return $import_filename;
}
sub check_dry {
if ($dry) {
scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__));
return 1;
} else {
scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__));
if (!$force) {
if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) {
return 1;
} else {
return 0;
}
} else {
scriptinfo('force option applied',getlogger(__PACKAGE__));
return 1;
}
}
}
1;

@ -0,0 +1,61 @@
##general settings:
working_path = /var/sipwise
cpucount = 4
enablemultithreading = 1
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = db01
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = db01
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = db01
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = db01
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = db01
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = INFO
#DEBUG
screenloglevel = INFO
emailloglevel = OFF

@ -0,0 +1,61 @@
##general settings:
working_path = /home/rkrenn/temp/customer_exporter
cpucount = 4
enablemultithreading = 1
##gearman/service listener config:
jobservers = 127.0.0.1:4730
##NGCP MySQL connectivity - "accounting" db:
accounting_host = 192.168.0.133
accounting_port = 3306
accounting_databasename = accounting
accounting_username = root
accounting_password =
##NGCP MySQL connectivity - "billing" db:
billing_host = 192.168.0.133
billing_port = 3306
billing_databasename = billing
billing_username = root
billing_password =
##NGCP MySQL connectivity - "provisioning" db:
provisioning_host = 192.168.0.133
provisioning_port = 3306
provisioning_databasename = provisioning
provisioning_username = root
provisioning_password =
##NGCP MySQL connectivity - "kamailio" db:
kamailio_host = 192.168.0.133
kamailio_port = 3306
kamailio_databasename = kamailio
kamailio_username = root
kamailio_password =
##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to:
xa_host = 192.168.0.133
xa_port = 3306
xa_databasename = ngcp
xa_username = root
xa_password =
##NGCP REST-API connectivity:
ngcprestapi_uri = https://127.0.0.1:1443
ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##sending email:
emailenable = 0
erroremailrecipient =
warnemailrecipient =
completionemailrecipient = rkrenn@sipwise.com
doneemailrecipient =
##logging:
fileloglevel = INFO
#DEBUG
screenloglevel = INFO
emailloglevel = OFF

@ -0,0 +1,41 @@
contracts.voip_subscribers:
include: !!perl/code |
{
my ($contract) = @_;
#return 0 if $contract->{status} eq 'terminated';
return 1;
}
filter: !!perl/code |
{
my ($bill_subs,$context) = @_;
#_debug($context,"skipping terminated subscriber $bill_subs->{username}") if $bill_subs->{status} eq 'terminated';
#return 0 if $bill_subs->{status} eq 'terminated';
return 1;
}
transform: !!perl/code |
{
my ($bill_subs,$context) = @_;
return $bill_subs;
}
#contracts.contact: 1
contracts.voip_subscribers.primary_number: 1
contracts.voip_subscribers.provisioning_voip_subscriber: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_dbaliases: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences: 1
#contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.allowed_ips: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.ncos: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping.destinations: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users: 1
#contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users.voicemail_spool: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_preferences: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_destinations:
transform: !!perl/code |
{
my ($fax_destinations,$context) = @_;
return [ map { $_->{destination} . ' (' . $_->{filetype} . ')'; } @$fax_destinations ];
}

@ -0,0 +1,312 @@
use strict;
## no critic
our $VERSION = "0.0";
use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
use Getopt::Long qw(GetOptions);
use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
update_settings
update_tabular_fields
$tabular_fields_yml
update_load_recursive
$load_recursive_yml
check_dry
$output_path
$defaultsettings
$defaultconfig
$dry
$skip_errors
$force
);
use NGCP::BulkProcessor::Logging qw(
init_log
getlogger
$attachmentlogfile
scriptinfo
cleanuplogfiles
$currentlogfile
);
use NGCP::BulkProcessor::LogError qw (
completion
done
scriptwarn
scripterror
filewarn
fileerror
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
$ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Array qw(removeduplicates);
use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
#use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs);
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles);
use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(destroy_all_dbs);
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw();
use NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers qw(
export_customers_graph
export_customers_tabular
);
#use NGCP::BulkProcessor::Projects::ETL::Customer::ImportCustomers qw(
# import_customers_json
#);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my @TASK_OPTS = ();
my $tasks = [];
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
my $cleanup_all_task_opt = 'cleanup_all';
push(@TASK_OPTS,$cleanup_all_task_opt);
#my $export_customers_graph_task_opt = 'export_customers_graph';
#push(@TASK_OPTS,$export_customers_graph_task_opt);
my $export_customers_tabular_task_opt = 'export_customers_tabular';
push(@TASK_OPTS,$export_customers_tabular_task_opt);
#my $import_customers_json_task_opt = 'import_customers_json';
#push(@TASK_OPTS,$import_customers_json_task_opt);
if (init()) {
main();
exit(0);
} else {
exit(1);
}
sub init {
my $configfile = $defaultconfig;
my $settingsfile = $defaultsettings;
return 0 unless GetOptions(
"config=s" => \$configfile,
"settings=s" => \$settingsfile,
"task=s" => $tasks,
"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
$result &= load_config($tabular_fields_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE);
$result &= load_config($load_recursive_yml,\&update_load_recursive,$YAML_CONFIG_TYPE);
return $result;
}
sub main() {
my @messages = ();
my @attachmentfiles = ();
my $result = 1;
my $completion = 0;
if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) {
scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors;
foreach my $task (@$tasks) {
if (lc($cleanup_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result);
} elsif (lc($cleanup_all_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result);
#} elsif (lc($export_customers_graph_task_opt) eq lc($task)) {
# $result &= export_customers_graph_task(\@messages) if taskinfo($export_customers_graph_task_opt,$result);
# $completion |= 1;
} elsif (lc($export_customers_tabular_task_opt) eq lc($task)) {
$result &= export_customers_tabular_task(\@messages) if taskinfo($export_customers_tabular_task_opt,$result);
$completion |= 1;
#} elsif (lc($import_customers_json_task_opt) eq lc($task)) {
# if (taskinfo($import_customers_json_task_opt,$result,1)) {
# next unless check_dry();
# $result &= import_customers_json_task(\@messages);
# $completion |= 1;
# }
} else {
$result = 0;
scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
last;
}
}
} else {
$result = 0;
scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath()));
}
push(@attachmentfiles,$attachmentlogfile);
if ($completion) {
completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
} else {
done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath()));
}
return $result;
}
sub taskinfo {
my ($task,$result) = @_;
scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath()));
return $result;
}
sub cleanup_task {
my ($messages,$clean_generated) = @_;
my $result = 0;
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
eval {
#cleanupcvsdirs() if $clean_generated;
cleanupdbfiles() if $clean_generated;
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
#cleanupcertfiles();
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
}
if ($@ or !$result) {
#print $@;
push(@$messages,'working directory cleanup INCOMPLETE');
return 0;
} else {
push(@$messages,'working directory folders cleaned up');
return 1;
}
}
sub export_customers_graph_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = export_customers_graph();
};
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
#$stats .= "\n total mta subscriber records: " .
# NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows';
#my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta
#);
#$stats .= "\n new: $added_count rows";
#my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta
#);
#$stats .= "\n existing: $existing_count rows";
#my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta
#);
#$stats .= "\n removed: $deleted_count rows";
};
if ($err or !$result) {
push(@$messages,"exporting customers (graph) INCOMPLETE$stats");
} else {
push(@$messages,"exporting customers (graph) completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
sub export_customers_tabular_task {
my ($messages) = @_;
my ($result,$warning_count) = (0,0);
eval {
($result,$warning_count) = export_customers_tabular();
};
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
#$stats .= "\n total mta subscriber records: " .
# NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows';
#my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta
#);
#$stats .= "\n new: $added_count rows";
#my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta
#);
#$stats .= "\n existing: $existing_count rows";
#my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta
#);
#$stats .= "\n removed: $deleted_count rows";
};
if ($err or !$result) {
push(@$messages,"exporting customers (tabular) INCOMPLETE$stats");
} else {
push(@$messages,"exporting customers (tabular) completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
return $result;
}
#sub import_customers_json_task {
#
# my ($messages) = @_;
# my ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = (0,0,0,0,0,0,0,0);
# eval {
# ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = import_customers_json();
# };
# my $err = $@;
# my $stats = ": $warning_count warnings";
# eval {
# $stats .= "\n contracts read: " . $contract_read_count;
# $stats .= "\n contracts created: " . $contract_created_count;
# $stats .= "\n contracts failed: " . $contract_failed_count;
# $stats .= "\n subscribers read: " . $subscriber_read_count;
# $stats .= "\n subscribers created: " . $subscriber_created_count;
# $stats .= "\n subscribers failed: " . $subscriber_failed_count;
# };
# if ($err or !$result) {
# push(@$messages,"importing customers (json) INCOMPLETE$stats");
# } else {
# push(@$messages,"importing customers (json) completed$stats");
# }
# destroy_all_dbs(); #every task should leave with closed connections.
# return $result;
#
#}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!

@ -0,0 +1,59 @@
#dry=0
#skip_errors=0
schema_version = Trunk
export_customers_multithreading = 1
export_customers_numofthreads = 4
export_customers_blocksize = 1000
customer_export_filename=customer_%s.json
#customer_import_filename=customer_20210216173615.json
#split_customers = 1
#customer_import_multithreading = 1
#customer_import_numofthreads = 4
#customer_reseller_name = default
#customer_billing_profile_name = Default Billing Profile
#customer_domain = test1610072315.example.org
#customer_contact_email_format = DN0%2$s%3$s@example.org
#customer_timezone = Europe/Vienna
#subscriber_profile_set_name = subscriber_profile_1_set_65261
#subscriber_profile_name = subscriber_profile_1_65261
# sip username as webusername:
#webusername_format = %1$s
# webusername = cc+ac+sn:
#webusername_format = %2$s%3$s%4$s
# webusername = 0+ac+sn:
webusername_format = 0%3$s%4$s
# sip username as external_id:
#subscriber_externalid_format = %1$s
# external_id = cc+ac+sn:
#subscriber_externalid_format = %2$s%3$s%4$s
# external_id = 0+ac+sn:
subscriber_externalid_format = 0%3$s%4$s
# subscriber contact will be created, only if one of below is set.
subscriber_contact_email_format = DN0%2$s%3$s@domain.org
subscriber_timezone = Europe/Vienna
tabular_single_row_txn = 1
ignore_tabular_unique = 0
tabular_fields_yml = tabular_fields.yml
load_recursive_yml = load_recursive.yml
sqlite_db_file = sqlite
cf_default_priority: 1
cf_default_timeout: 300
cft_default_ringtimeout: 20
#write sql files for legacy db to set/unset the is_external pref of migrated subscribers:
rollback_sql_export_filename_format = delete_subscribers_%s.sql
rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit;

@ -0,0 +1,64 @@
- path: contract.id
- path: id
- path: username
- path: primary_number.cc
- path: primary_number.ac
- path: primary_number.sn
- path: provisioning_voip_subscriber.voicemail_users[0].attach
- path: provisioning_voip_subscriber.voicemail_users[0].delete
- path: provisioning_voip_subscriber.voicemail_users[0].email
- path: provisioning_voip_subscriber.voicemail_users[0].password
- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_clis
sep: ','
field: 'value'
- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_ips_grp[0].allowed_ips
sep: ','
field: 'ipnet'
- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_list
sep: ','
field: 'value'
- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_mode[0].value
- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_list
sep: ','
field: 'value'
- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_mode[0].value
- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_list
sep: ','
field: 'value'
- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_mode[0].value
- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_list
sep: ','
field: 'value'
- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_mode[0].value
- path: provisioning_voip_subscriber.voip_usr_preferences.ncos_id[0].ncos.level
- path: provisioning_voip_subscriber.voip_usr_preferences.adm_ncos_id[0].ncos.level
- path: provisioning_voip_subscriber.voip_usr_preferences.cfb[0].cf_mapping.destinations
sep: ','
field: 'destination'
- path: provisioning_voip_subscriber.voip_usr_preferences.cfna[0].cf_mapping.destinations
sep: ','
field: 'destination'
- path: provisioning_voip_subscriber.voip_usr_preferences.cfo[0].cf_mapping.destinations
sep: ','
field: 'destination'
- path: provisioning_voip_subscriber.voip_usr_preferences.cfr[0].cf_mapping.destinations
sep: ','
field: 'destination'
- path: provisioning_voip_subscriber.voip_usr_preferences.cfs[0].cf_mapping.destinations
sep: ','
field: 'destination'
- path: provisioning_voip_subscriber.voip_usr_preferences.cft[0].cf_mapping.destinations
sep: ','
field: 'destination'
- path: provisioning_voip_subscriber.voip_usr_preferences.cfu[0].cf_mapping.destinations
sep: ','
field: 'destination'
- path: provisioning_voip_subscriber.voip_fax_preferences.active
- path: provisioning_voip_subscriber.voip_fax_preferences.ecm
- path: provisioning_voip_subscriber.voip_fax_preferences.name
- path: provisioning_voip_subscriber.voip_fax_preferences.t38
- path: provisioning_voip_subscriber.voip_fax_destinations
sep: ','
- path: provisioning_voip_subscriber.voip_usr_preferences.force_inbound_calls_to_peer[0].value
- path: provisioning_voip_subscriber.voip_usr_preferences.lnp_for_local_sub[0].value

@ -9,6 +9,8 @@ use NGCP::BulkProcessor::Table qw(get_rowhash);
use NGCP::BulkProcessor::SqlProcessor qw(init_record);
use NGCP::BulkProcessor::Utils qw(load_module);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw();
@ -44,4 +46,61 @@ sub gethash {
return get_rowhash(\@fieldvalues);
}
sub load_relation {
my $self = shift;
my ($load_recursive,$relation,$findby,@findby_args) = @_;
if ($load_recursive and 'HASH' eq ref $load_recursive and length($relation)) {
my $relation_path;
my $relation_path_backup = $load_recursive->{_relation_path};
if (length($relation_path_backup)) {
$relation_path = $relation_path_backup . '.' . $relation;
} else {
no strict "refs"; ## no critic (ProhibitNoStrict)
$relation_path = ((ref $self) . '::gettablename')->() . '.' . $relation;
}
my $include = $load_recursive->{$relation_path};
my $filter;
my $transfrom;
if ('HASH' eq ref $include) {
$filter = $include->{filter};
$transfrom = $include->{transform};
if (exists $include->{include}) {
$include = $include->{include};
} elsif ($transfrom or $filter) {
$include = 1;
}
}
if (('CODE' eq ref $include and $include->($self))
or (not ref $include and $include)) {
load_module($findby);
no strict "refs"; ## no critic (ProhibitNoStrict)
$load_recursive->{_relation_path} = $relation_path;
$self->{$relation} = $findby->(@findby_args);
if ('ARRAY' eq ref $self->{$relation}
and 'CODE' eq ref $filter) {
my $closure = _closure($filter,$load_recursive->{_context});
$self->{$relation} = [ grep { $closure->($_); } @{$self->{$relation}}];
}
if ('CODE' eq ref $transfrom) {
my $closure = _closure($transfrom,$load_recursive->{_context});
$self->{$relation} = $closure->($self->{$relation});
}
$load_recursive->{_relation_path} = $relation_path_backup;
return 1;
}
}
return 0;
}
sub _closure {
my ($sub,$context) = @_;
return sub {
foreach my $key (keys %$context) {
no strict "refs"; ## no critic (ProhibitNoStrict)
*{"main::$key"} = $context->{$key} if 'CODE' eq ref $context->{$key};
}
return $sub->(@_,$context);
};
}
1;

@ -114,6 +114,8 @@ our @EXPORT_OK = qw(
unshare
run
load_module
);
our $chmod_umask = 0777;
@ -1083,4 +1085,17 @@ sub run {
}
sub load_module {
my $package_element = shift;
eval {
(my $module = $package_element) =~ s/::[a-zA-Z_0-9]+$//g;
(my $file = $module) =~ s|::|/|g;
require $file . '.pm';
#$module->import();
1;
} or do {
die($@);
};
}
1;

Loading…
Cancel
Save