TT#128650 contract graph exporter

Change-Id: I37aaba48e9b830b2cb20385c63fac350fd0954c2
(cherry picked from commit e587c8f2c1)
mr10.0
Rene Krenn 5 years ago
parent a82fb8c948
commit 6fc3ef8d6d

@ -36,8 +36,16 @@ sub new {
},
not_a_coderef_or_method => $callbacks->{not_a_coderef_or_method} // sub {
my ($data, $key, $index, $value, $rest) = @_;
die "tried to retrieve from a non-existent coderef or method: $key in $data";
}
die "tried to retrieve from a non-existent coderef or method: $key in $data\n";
},
filter => $callbacks->{filter} // sub {
my ($item_path) = @_;
die "no filter callback method provided\n";
},
transform => $callbacks->{transform} // sub {
my ($item_path) = @_;
die "no transform callback method provided\n";
},
},
};
return bless $self,$class;
@ -101,4 +109,85 @@ sub get {
return $value;
}
# filter graph (in-place)
sub filter {
my ($self,$data,$path) = @_;
# set data to
$data //= $self->{data};
$self->_filter($data,$path);
return $self; #support chaining
}
sub _filter {
my ($self,$data,$path) = @_;
my $reftype = reftype $data;
return unless $reftype;
my $include = 0;
if ("ARRAY" eq $reftype) {
my ($i,$j) = (0, 0);
foreach (@$data) {
my $item_path = (length($path) ? $path : '') . '[' . $j . ']';
if ($self->{callbacks}->{filter}->($item_path)
or $self->_filter($data->[$i],$item_path)) {
$include = 1;
$i++;
} else {
splice(@$data, $i, 1);
}
$j++;
}
} elsif ("HASH" eq $reftype) {
foreach my $key (keys %$data) {
my $item_path = (length($path) ? $path . '.' : '') . $key;
if ($self->{callbacks}->{filter}->($item_path)
or $self->_filter($data->{$key},$item_path)) {
$include = 1;
} else {
delete $data->{$key};
}
}
}
return $include;
}
# transform graph (in-place)
sub transform {
my ($self,$data,$path) = @_;
# set data to
$data //= $self->{data};
$self->_transform($data,$path);
return $self; #support chaining
}
sub _transform {
my ($self,$data,$path) = @_;
my $reftype = reftype $data;
return unless $reftype;
if ("ARRAY" eq $reftype) {
my $i = 0;
foreach (@$data) {
my $item_path = (length($path) ? $path : '') . '[' . $i . ']';
$self->_transform($data->[$i],$item_path);
$data->[$i] = $self->{callbacks}->{transform}->($data->[$i],$item_path);
$i++;
}
} elsif ("HASH" eq $reftype) {
foreach my $key (keys %$data) {
my $item_path = (length($path) ? $path . '.' : '') . $key;
$self->_transform($data->{$key},$item_path);
$data->{$key} = $self->{callbacks}->{transform}->($data->{$key},$item_path);
}
}
}
1;

@ -7,7 +7,6 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(
get_sqlite_db
destroy_all_dbs
);
#import_db_tableidentifier
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$tabular_fields
@ -18,14 +17,11 @@ use NGCP::BulkProcessor::SqlProcessor qw(
create_targettable
checktableinfo
copy_row
insert_stmt
transfer_table
);
#process_table
use NGCP::BulkProcessor::SqlRecord qw();
#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw();
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
@ -38,7 +34,6 @@ our @EXPORT_OK = qw(
get_fieldnames
update_delta
findby_delta
countby_delta
@ -46,16 +41,12 @@ our @EXPORT_OK = qw(
$deleted_delta
$updated_delta
$added_delta
copy_table
);
#@fieldnames
#findby_sipusername
#findby_ccacsn
#countby_ccacsn
my $tablename = 'tabular';
my $get_db = \&get_sqlite_db;
#my $get_tablename = \&import_db_tableidentifier;
my $fieldnames;
my $expected_fieldnames;
@ -76,13 +67,10 @@ sub get_fieldnames {
return $expected_fieldnames;
}
# table creation:
my $primarykey_fieldnames = [ 'uuid' ];
my $indexes = {
#$tablename . '_username_domain' => [ 'username', 'domain' ],
$tablename . '_delta' => [ 'delta(7)' ],
};
#my $fixtable_statements = [];
our $deleted_delta = 'DELETED';
our $updated_delta = 'UPDATED';
@ -198,6 +186,26 @@ sub countby_delta {
}
sub copy_table {
my ($get_target_db) = @_;
check_table();
#checktableinfo($get_target_db,
# __PACKAGE__,$tablename,
# get_fieldnames(1),
# $indexes);
return transfer_table(
get_db => $get_db,
class => __PACKAGE__,
get_target_db => $get_target_db,
targetclass => __PACKAGE__,
targettablename => $tablename,
);
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
@ -219,47 +227,6 @@ sub buildrecords_fromrows {
}
#sub process_records {
#
# my %params = @_;
# my ($process_code,
# $static_context,
# $init_process_context_code,
# $uninit_process_context_code,
# $multithreading,
# $numofthreads) = @params{qw/
# process_code
# static_context
# init_process_context_code
# uninit_process_context_code
# multithreading
# numofthreads
# /};
#
# check_table();
# my $db = &$get_db();
# my $table = $db->tableidentifier($tablename);
#
# my @cols = map { $db->columnidentifier($_); } qw/domain sip_username/;
#
# return process_table(
# get_db => $get_db,
# class => __PACKAGE__,
# process_code => sub {
# my ($context,$rowblock,$row_offset) = @_;
# return &$process_code($context,$rowblock,$row_offset);
# },
# static_context => $static_context,
# init_process_context_code => $init_process_context_code,
# uninit_process_context_code => $uninit_process_context_code,
# destroy_reader_dbs_code => \&destroy_all_dbs,
# multithreading => $multithreading,
# tableprocessing_threads => $numofthreads,
# 'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols),
# 'selectcount' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table,
# );
#}
sub getinsertstatement {
my ($insert_ignore) = @_;

@ -5,8 +5,10 @@ use strict;
use threads::shared qw();
use Tie::IxHash;
use NGCP::BulkProcessor::Serialization qw();
use Scalar::Util 'blessed';
use Scalar::Util qw(blessed);
use MIME::Base64 qw(encode_base64);
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
@ -16,7 +18,6 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$export_customers_multithreading
$export_customers_numofthreads
$export_customers_blocksize
run_dao_method
get_dao_var
@ -29,6 +30,8 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$load_recursive
$tabular_single_row_txn
$ignore_tabular_unique
$graph_fields
$graph_fields_mode
);
use NGCP::BulkProcessor::Logging qw (
@ -65,11 +68,50 @@ our @EXPORT_OK = qw(
export_customers_tabular
);
sub _init_graph_field_map {
my $context = shift;
my %graph_field_map = ();
my %graph_field_globs = ();
tie(%graph_field_globs, 'Tie::IxHash');
foreach my $graph_field (@$graph_fields) {
my ($c,$a);
if ('HASH' eq ref $graph_field) {
$a = $graph_field->{path};
$c = $graph_field;
} else {
$a = $graph_field;
$c = 1;
}
#my @a = ();
#my $b = '';
#foreach my $c (split(/\./,$a)) {
#
# foreach my () {
# $b .= $_
# push()
# }
#}
if ($a =~ /\*/) {
$a = quotemeta($a);
$a =~ s/(\\\*)+/[^.]+/g;
$a = '^' . $a . '$';
$graph_field_globs{$a} = $c unless exists $graph_field_globs{$a};
} else {
$graph_field_map{$a} = $c unless exists $graph_field_map{$a};
}
}
$context->{graph_field_map} = \%graph_field_map;
$context->{graph_field_globs} = \%graph_field_globs;
}
sub export_customers_graph {
my $static_context = {
};
_init_graph_field_map($static_context);
($static_context->{export_filename},$static_context->{export_format}) = get_export_filename($customer_export_filename_format);
my $result = 1; #_copy_customers_checks($static_context);
destroy_all_dbs();
@ -83,21 +125,18 @@ sub export_customers_graph {
my @data = ();
foreach my $record (@$records) {
next unless _export_customer_graph_init_context($context,$record);
push(@data,_get_contract_graph($context->{contract}));
push(@data,_get_contract_graph($context));
}
write_export_file(\@data,$context->{export_filename},$context->{export_format});
return 1;
},
init_process_context_code => sub {
my ($context)= @_;
#$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
($context->{export_filename},$context->{export_format}) = get_export_filename($customer_export_filename_format);
},
uninit_process_context_code => sub {
my ($context)= @_;
#undef $context->{db};
destroy_all_dbs();
{
lock $warning_count;
@ -226,69 +265,72 @@ sub _export_customer_graph_init_context {
sub _get_contract_graph {
my ($context) = @_;
#sub unshare {
#
# my ($obj,) = @_;
# my $ref = ref $obj;
# if ("ARRAY" eq $ref) {
# my @array = ();
# my $i = 0;
# foreach my $value (@$obj) {
# push(@array, unshare($value)) if xx;
# $i++;
# }
# return \@array;
# } elsif ($ref eq "HASH") {
# my %hash = ();
# foreach my $key (keys %$obj) {
# $hash{$key} = unshare($obj->{$key}) if xx;
# }
# return \%hash;
# }
#
#}
foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) {
($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) =
array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences},
sub { return shift->{attribute}; }, sub { my $p = shift; }, 'group' );
if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) {
foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) {
foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) {
$voicemail->{recording} = encode_base64($voicemail->{recording},'');
my $dp = NGCP::BulkProcessor::DSPath->new($context->{contract}, {
filter => sub {
my $path = shift;
if ('whitelist' eq $graph_fields_mode) {
my $include;
if (exists $context->{graph_field_map}->{$path}) {
$include = $context->{graph_field_map}->{$path};
} else {
foreach my $glob (keys %{$context->{graph_field_globs}}) {
if ($path =~ /$glob/) {
$include = $context->{graph_field_globs}->{$glob};
last;
}
}
}
if ('HASH' eq ref $include) {
if (exists $include->{include}) {
return $include->{include};
}
return 1;
} else {
return $include;
}
} elsif ('blacklist' eq $graph_fields_mode) {
my $exclude;
if (exists $context->{graph_field_map}->{$path}) {
$exclude = $context->{graph_field_map}->{$path};
} else {
foreach my $glob (keys %{$context->{graph_field_globs}}) {
if ($path =~ /$glob/) {
$exclude = $context->{graph_field_globs}->{$glob};
last;
}
}
}
if ('HASH' eq ref $exclude) {
if (exists $exclude->{exclude}) {
return not $exclude->{exclude};
} elsif ($exclude->{transform}) {
return 1;
}
return 0;
} else {
return not $exclude;
}
}
}
my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, {
retrieve_key_from_non_hash => sub {},
key_does_not_exist => sub {},
index_does_not_exist => sub {},
});
#foreach my $graph_field (@$graph_fields) {
# my $a;
# my $sep = ',';
# if ('HASH' eq ref $tabular_field) {
# $a = $tabular_field->{path};
# $sep = $tabular_field->{sep};
# } else {
# $a = $tabular_field;
# }
# #eval {'' . ($dp->get('.' . $a) // '');}; if($@){
# # my $x=5;
# #}
# my $v = $dp->get('.' . $a);
# if ('ARRAY' eq ref $v) {
# if ('HASH' eq ref $v->[0]) {
# $v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v);
# } else {
# $v = join($sep, sort @$v);
# }
# } else {
# $v = '' . ($v // '');
# }
# push(@row,$v);
#}
}
},
transform => sub {
return shift;
# ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) =
# array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences},
# sub { return shift->{attribute}; }, sub { my $p = shift; }, 'group' );
# if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) {
# foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) {
# foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) {
# $voicemail->{recording} = encode_base64($voicemail->{recording},'');
# }
# }
# }
},
});
$dp->filter()->transform();
return $context->{contract};
}
sub _export_customer_tabular_init_context {
@ -335,9 +377,11 @@ sub _get_subscriber_rows {
foreach my $tabular_field (@$tabular_fields) {
my $a;
my $sep = ',';
my $transform;
if ('HASH' eq ref $tabular_field) {
$a = $tabular_field->{path};
$sep = $tabular_field->{sep};
$transform = $tabular_field->{transform};
} else {
$a = $tabular_field;
}
@ -345,6 +389,10 @@ sub _get_subscriber_rows {
# my $x=5;
#}
my $v = $dp->get('.' . $a);
if ('CODE' eq ref $transform) {
my $closure = _closure($transform,_get_closure_context($context));
$v = $closure->($v,$bill_subs);
}
if ('ARRAY' eq ref $v) {
if ('HASH' eq ref $v->[0]
or (blessed($v->[0]) and $v->[0]->isa('NGCP::BulkProcessor::SqlRecord'))) {
@ -376,13 +424,7 @@ sub _load_contract {
my ($context,$record) = @_;
$context->{contract} = run_dao_method('billing::contracts::findby_id', $record->{id}, { %$load_recursive,
#'contracts.voip_subscribers.domain' => 1,
_context => {
_info => \&_info,
_error => \&_error,
_debug => \&_debug,
_warn => \&_warn,
context => $context,
},
_context => _get_closure_context($context),
});
return 1 if $context->{contract};
@ -390,6 +432,28 @@ sub _load_contract {
}
sub _get_closure_context {
my $context = shift;
return {
_info => \&_info,
_error => \&_error,
_debug => \&_debug,
_warn => \&_warn,
context => $context,
};
}
sub _closure {
my ($sub,$context) = @_;
return sub {
foreach my $key (keys %$context) {
no strict "refs"; ## no critic (ProhibitNoStrict)
*{"main::$key"} = $context->{$key} if 'CODE' eq ref $context->{$key};
}
return $sub->(@_,$context);
};
}
sub _error {
my ($context,$message) = @_;

@ -8,7 +8,7 @@ use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../');
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$csv_dir
$sqlite_db_file
);
@ -16,6 +16,7 @@ use NGCP::BulkProcessor::ConnectorPool qw(
get_connectorinstancename
);
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw();
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode);
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
@ -26,6 +27,9 @@ our @EXPORT_OK = qw(
get_sqlite_db
sqlite_db_tableidentifier
get_csv_db
csv_db_tableidentifier
destroy_dbs
destroy_all_dbs
@ -34,14 +38,15 @@ our @EXPORT_OK = qw(
);
my $sqlite_dbs = {};
my $csv_dbs = {};
sub get_sqlite_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name); #threadid(); #shift;
my $name = get_connectorinstancename($instance_name);
if (not defined $sqlite_dbs->{$name}) {
$sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name);
$sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name);
if (not defined $reconnect) {
$reconnect = 1;
}
@ -62,6 +67,31 @@ sub sqlite_db_tableidentifier {
}
sub get_csv_db {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (not defined $csv_dbs->{$name}) {
$csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name);
if (not defined $reconnect) {
$reconnect = 1;
}
}
if ($reconnect) {
$csv_dbs->{$name}->db_connect($csv_dir);
}
return $csv_dbs->{$name};
}
sub csv_db_tableidentifier {
my ($get_target_db,$tablename) = @_;
my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db;
return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir));
}
sub destroy_dbs {
foreach my $name (keys %$sqlite_dbs) {
@ -69,6 +99,12 @@ sub destroy_dbs {
undef $sqlite_dbs->{$name};
delete $sqlite_dbs->{$name};
}
foreach my $name (keys %$csv_dbs) {
cleartableinfo($csv_dbs->{$name});
undef $csv_dbs->{$name};
delete $csv_dbs->{$name};
}
}

@ -12,7 +12,6 @@ use DateTime::TimeZone qw();
use JSON -support_by_pp, -no_export;
*NGCP::BulkProcessor::Serialization::serialize_json = sub {
my $input_ref = shift;
#return JSON::XS::encode_json($input_ref);
return JSON::to_json($input_ref, { allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, as_nonblessed => 1 });
};
@ -40,8 +39,9 @@ use NGCP::BulkProcessor::LoadConfig qw(
split_tuple
parse_regexp
);
use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module);
#format_number
use NGCP::BulkProcessor::Array qw(contains);
require Exporter;
@ -55,16 +55,21 @@ our @EXPORT_OK = qw(
write_sql_file
update_load_recursive
$load_recursive_yml
$load_yml
$load_recursive
update_tabular_fields
$tabular_fields_yml
$tabular_yml
$tabular_fields
$ignore_tabular_unique
$tabular_single_row_txn
$graph_yml
$graph_fields
$graph_fields_mode
update_graph_fields
$sqlite_db_file
$csv_dir
check_dry
@ -75,7 +80,6 @@ our @EXPORT_OK = qw(
$customer_import_filename
$split_customers
$defaultsettings
$defaultconfig
@ -83,11 +87,10 @@ our @EXPORT_OK = qw(
$skip_errors
$force
$export_customers_multithreading
$export_customers_numofthreads
$export_customers_blocksize
$cf_default_priority
$cf_default_timeout
@ -100,37 +103,42 @@ our @EXPORT_OK = qw(
our $defaultconfig = 'config.cfg';
our $defaultsettings = 'settings.cfg';
our $tabular_fields_yml = 'tabular_fields.yml';
our $tabular_yml = 'tabular.yml';
our $tabular_fields = [];
our $ignore_tabular_unique = 0;
our $tabular_single_row_txn = 1;
our $load_recursive_yml = 'load_recursive_yml.yml';
our $graph_yml = 'graph.yml';
our $graph_fields = [];
our $graph_fields_mode = 'whitelist';
my @graph_fields_modes = qw(whitelist blacklist);
our $load_yml = 'load.yml';
our $load_recursive;
our $output_path = $working_path . 'output/';
our $input_path = $working_path . 'input/';
our $csv_dir = 'customer';
our $customer_export_filename_format = undef;
our $customer_import_filename = undef;
our $customer_import_numofthreads = $cpucount;
our $customer_import_multithreading = 1;
our $customer_reseller_name = 'default';
our $customer_billing_profile_name = 'Default Billing Profile';
our $customer_domain = undef;
our $customer_contact_email_format = '%s@example.org';
our $subscriber_contact_email_format = '%s@example.org';
our $split_customers = 0;
#our $customer_import_filename = undef;
#our $customer_import_numofthreads = $cpucount;
#our $customer_import_multithreading = 1;
#our $customer_reseller_name = 'default';
#our $customer_billing_profile_name = 'Default Billing Profile';
#our $customer_domain = undef;
#our $customer_contact_email_format = '%s@example.org';
#our $subscriber_contact_email_format = '%s@example.org';
#our $split_customers = 0;
our $subscriber_timezone = undef;
our $contract_timezone = undef;
our $subscriber_profile_set_name = undef;
our $subscriber_profile_name = undef;
our $webusername_format = '%1$s';
our $subscriber_externalid_format = undef;
#our $subscriber_timezone = undef;
#our $contract_timezone = undef;
#our $subscriber_profile_set_name = undef;
#our $subscriber_profile_name = undef;
#our $webusername_format = '%1$s';
#our $subscriber_externalid_format = undef;
our $force = 0;
our $dry = 0;
@ -141,18 +149,16 @@ my @supported_mr = ('Trunk');
our $sqlite_db_file = 'sqlite';
our $export_customers_multithreading = $enablemultithreading;
our $export_customers_numofthreads = $cpucount;
our $export_customers_blocksize = 1000;
#our $cf_default_priority = 1;
#our $cf_default_timeout = 300;
#our $cft_default_ringtimeout = 20;
our $cf_default_priority = 1;
our $cf_default_timeout = 300;
our $cft_default_ringtimeout = 20;
our $rollback_sql_export_filename_format = undef;
our $rollback_sql_stmt_format = undef;
#our $rollback_sql_export_filename_format = undef;
#our $rollback_sql_stmt_format = undef;
my $file_lock :shared = undef;
@ -171,72 +177,75 @@ sub update_settings {
$customer_export_filename_format = $data->{customer_export_filename} if exists $data->{customer_export_filename};
get_export_filename($data->{customer_export_filename},$configfile);
$rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format};
get_export_filename($data->{rollback_sql_export_filename_format},$configfile);
$rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format};
#$rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format};
#get_export_filename($data->{rollback_sql_export_filename_format},$configfile);
#$rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format};
$sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file};
$customer_import_filename = _get_import_filename($customer_import_filename,$data,'customer_import_filename');
$customer_import_multithreading = $data->{customer_import_multithreading} if exists $data->{customer_import_multithreading};
$customer_import_numofthreads = _get_numofthreads($cpucount,$data,'customer_import_numofthreads');
$customer_reseller_name = $data->{customer_reseller_name} if exists $data->{customer_reseller_name};
$customer_billing_profile_name = $data->{customer_billing_profile_name} if exists $data->{customer_billing_profile_name};
$customer_domain = $data->{customer_domain} if exists $data->{customer_domain};
$customer_contact_email_format = $data->{customer_contact_email_format} if exists $data->{customer_contact_email_format};
$subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format};
$split_customers = $data->{split_customers} if exists $data->{split_customers};
$csv_dir = $data->{csv_dir} if exists $data->{csv_dir};
#$customer_import_filename = _get_import_filename($customer_import_filename,$data,'customer_import_filename');
#$customer_import_multithreading = $data->{customer_import_multithreading} if exists $data->{customer_import_multithreading};
#$customer_import_numofthreads = _get_numofthreads($cpucount,$data,'customer_import_numofthreads');
#$customer_reseller_name = $data->{customer_reseller_name} if exists $data->{customer_reseller_name};
#$customer_billing_profile_name = $data->{customer_billing_profile_name} if exists $data->{customer_billing_profile_name};
#$customer_domain = $data->{customer_domain} if exists $data->{customer_domain};
#$customer_contact_email_format = $data->{customer_contact_email_format} if exists $data->{customer_contact_email_format};
#$subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format};
#$split_customers = $data->{split_customers} if exists $data->{split_customers};
$contract_timezone = $data->{customer_timezone} if exists $data->{customer_timezone};
if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) {
configurationerror($configfile,"invalid customer_timezone '$contract_timezone'");
$result = 0;
}
$subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone};
if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) {
configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'");
$result = 0;
}
#$contract_timezone = $data->{customer_timezone} if exists $data->{customer_timezone};
#if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) {
# configurationerror($configfile,"invalid customer_timezone '$contract_timezone'");
# $result = 0;
#}
#$subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone};
#if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) {
# configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'");
# $result = 0;
#}
$subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name};
$subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name};
if ($subscriber_profile_set_name and not $subscriber_profile_name
or not $subscriber_profile_set_name and $subscriber_profile_name) {
configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required");
$result = 0;
}
$webusername_format = $data->{webusername_format} if exists $data->{webusername_format};
$subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format};
#$subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name};
#$subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name};
#if ($subscriber_profile_set_name and not $subscriber_profile_name
# or not $subscriber_profile_set_name and $subscriber_profile_name) {
# configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required");
# $result = 0;
#}
#$webusername_format = $data->{webusername_format} if exists $data->{webusername_format};
#$subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format};
$dry = $data->{dry} if exists $data->{dry};
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$export_customers_multithreading = $data->{export_customers_multithreading} if exists $data->{export_customers_multithreading};
$export_customers_numofthreads = _get_numofthreads($cpucount,$data,'export_customers_numofthreads');
$export_customers_blocksize = $data->{export_customers_blocksize} if exists $data->{export_customers_blocksize};
$tabular_fields_yml = $data->{tabular_fields_yml} if exists $data->{tabular_fields_yml};
$load_recursive_yml = $data->{load_recursive_yml} if exists $data->{load_recursive_yml};
$tabular_yml = $data->{tabular_yml} if exists $data->{tabular_yml};
$graph_yml = $data->{graph_yml} if exists $data->{graph_yml};
$graph_fields_mode = $data->{graph_fields_mode} if exists $data->{graph_fields_mode};
if (not $graph_fields_mode or not contains($graph_fields_mode,\@graph_fields_modes)) {
configurationerror($configfile,'graph_fields_mode must be one of ' . join(', ', @graph_fields_modes));
$result = 0;
}
$load_yml = $data->{load_yml} if exists $data->{load_yml};
$tabular_single_row_txn = $data->{tabular_single_row_txn} if exists $data->{tabular_single_row_txn};
$ignore_tabular_unique = $data->{ignore_tabular_unique} if exists $data->{ignore_tabular_unique};
$cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority};
$cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout};
$cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout};
#$cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority};
#$cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout};
#$cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout};
$mr = $data->{schema_version};
if (not defined $mr or not contains($mr,\@supported_mr)) {
configurationerror($configfile,'version must be one of ' . join(', ', @supported_mr));
configurationerror($configfile,'schema_version must be one of ' . join(', ', @supported_mr));
$result = 0;
}
return $result;
}
return 0;
@ -290,15 +299,23 @@ sub get_export_filename {
filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__));
$export_filename = undef;
}
my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".sql");
my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".xml",".php",".pl",".db",".csv");
if ($suffix eq '.json') {
$export_format = $NGCP::BulkProcessor::Serialization::format_json;
} elsif ($suffix eq '.yml' or $suffix eq '.yaml') {
$export_format = $NGCP::BulkProcessor::Serialization::format_yaml;
} elsif ($suffix eq '.sql') {
} elsif ($suffix eq '.xml') {
$export_format = $NGCP::BulkProcessor::Serialization::format_xml;
} elsif ($suffix eq '.php') {
$export_format = $NGCP::BulkProcessor::Serialization::format_php;
} elsif ($suffix eq '.pl') {
$export_format = $NGCP::BulkProcessor::Serialization::format_perl;
} elsif ($suffix eq '.db') {
$export_format = 'sqlite';
} elsif ($suffix eq '.csv') {
$export_format = 'csv';
} else {
configurationerror($configfile,"$filename_format: either .json or .yaml export file format required");
configurationerror($configfile,"$filename_format: either .json/.yaml/.xml/.php/.pl or .db/.csv export file format required");
}
}
return ($export_filename,$export_format);
@ -308,6 +325,8 @@ sub write_export_file {
my ($data,$export_filename,$export_format) = @_;
if (defined $export_filename) {
fileerror("invalid extension for output filename $export_filename",getlogger(__PACKAGE__))
unless contains($export_format,\@NGCP::BulkProcessor::Serialization::formats);
# "concatenated json" https://en.wikipedia.org/wiki/JSON_streaming
my $str = '';
if (ref $data eq 'ARRAY') {
@ -382,6 +401,29 @@ sub update_tabular_fields {
}
sub update_graph_fields {
my ($data,$configfile) = @_;
if (defined $data) {
my $result = 1;
eval {
$graph_fields = $data;
};
if ($@ or 'ARRAY' ne ref $graph_fields) {
$graph_fields //= [];
configurationerror($configfile,'invalid graph fields',getlogger(__PACKAGE__));
$result = 0;
}
return $result;
}
return 0;
}
sub update_load_recursive {
my ($data,$configfile) = @_;
@ -405,11 +447,6 @@ sub update_load_recursive {
}
sub _get_sqlite_db_file {
my ($run,$name) = @_;
return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name;
}
sub _get_import_filename {
my ($old_value,$data,$key) = @_;
my $import_filename = $old_value;

@ -0,0 +1,5 @@
# graph.yml: whitelist/blacklist of *contract* fields to export to .json/.yaml/.xml/...
- id
- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.attribute.attribute
- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.value

@ -1,7 +1,10 @@
# load.yml: define which *contract* relations to fetch from db.
#contracts.voip_subscribers: 1
contracts.voip_subscribers:
include: !!perl/code |
{
my ($contract) = @_;
my ($contract,$context) = @_;
#return 0 if $contract->{status} eq 'terminated';
return 1;
}
@ -20,12 +23,12 @@ contracts.voip_subscribers:
return $bill_subs;
}
#contracts.contact: 1
contracts.contact: 1
contracts.voip_subscribers.primary_number: 1
contracts.voip_subscribers.provisioning_voip_subscriber: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_dbaliases: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences: 1
#contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.allowed_ips: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.ncos: 1
contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping: 1
@ -38,4 +41,4 @@ contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_destinations:
{
my ($fax_destinations,$context) = @_;
return [ map { $_->{destination} . ' (' . $_->{filetype} . ')'; } @$fax_destinations ];
}
}

@ -15,10 +15,14 @@ use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
update_settings
update_tabular_fields
$tabular_fields_yml
update_graph_fields
$tabular_yml
$graph_yml
update_load_recursive
$load_recursive_yml
get_export_filename
$customer_export_filename_format
$load_yml
check_dry
$output_path
@ -27,7 +31,6 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw(
$dry
$skip_errors
$force
);
use NGCP::BulkProcessor::Logging qw(
@ -57,17 +60,13 @@ use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir);
use NGCP::BulkProcessor::Mail qw(
cleanupmsgfiles
);
#use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs);
use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs);
use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles);
#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles);
use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(destroy_all_dbs);
use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(destroy_all_dbs get_csv_db get_sqlite_db);
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw();
use NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular qw();
use NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers qw(
export_customers_graph
@ -77,7 +76,7 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers qw(
# import_customers_json
#);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB;
my @TASK_OPTS = ();
@ -89,8 +88,8 @@ push(@TASK_OPTS,$cleanup_task_opt);
my $cleanup_all_task_opt = 'cleanup_all';
push(@TASK_OPTS,$cleanup_all_task_opt);
#my $export_customers_graph_task_opt = 'export_customers_graph';
#push(@TASK_OPTS,$export_customers_graph_task_opt);
my $export_customers_graph_task_opt = 'export_customers_graph';
push(@TASK_OPTS,$export_customers_graph_task_opt);
my $export_customers_tabular_task_opt = 'export_customers_tabular';
push(@TASK_OPTS,$export_customers_tabular_task_opt);
@ -117,15 +116,16 @@ sub init {
"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
);
$tasks = removeduplicates($tasks,1);
my $result = load_config($configfile);
init_log();
$result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE);
$result &= load_config($tabular_fields_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE);
$result &= load_config($load_recursive_yml,\&update_load_recursive,$YAML_CONFIG_TYPE);
$result &= load_config($tabular_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE);
$result &= load_config($graph_yml,\&update_graph_fields,$YAML_CONFIG_TYPE);
$result &= load_config($load_yml,\&update_load_recursive,$YAML_CONFIG_TYPE);
return $result;
}
@ -146,9 +146,9 @@ sub main() {
} elsif (lc($cleanup_all_task_opt) eq lc($task)) {
$result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result);
#} elsif (lc($export_customers_graph_task_opt) eq lc($task)) {
# $result &= export_customers_graph_task(\@messages) if taskinfo($export_customers_graph_task_opt,$result);
# $completion |= 1;
} elsif (lc($export_customers_graph_task_opt) eq lc($task)) {
$result &= export_customers_graph_task(\@messages) if taskinfo($export_customers_graph_task_opt,$result);
$completion |= 1;
} elsif (lc($export_customers_tabular_task_opt) eq lc($task)) {
$result &= export_customers_tabular_task(\@messages) if taskinfo($export_customers_tabular_task_opt,$result);
$completion |= 1;
@ -191,17 +191,15 @@ sub cleanup_task {
my $result = 0;
if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) {
eval {
#cleanupcvsdirs() if $clean_generated;
cleanupdbfiles() if $clean_generated;
cleanupcvsdirs();
cleanupdbfiles();
cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile));
cleanupmsgfiles(\&fileerror,\&filewarn);
#cleanupcertfiles();
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
$result = 1;
};
}
if ($@ or !$result) {
#print $@;
push(@$messages,'working directory cleanup INCOMPLETE');
return 0;
} else {
@ -240,7 +238,7 @@ sub export_customers_graph_task {
} else {
push(@$messages,"exporting customers (graph) completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
destroy_all_dbs();
return $result;
}
@ -255,27 +253,36 @@ sub export_customers_tabular_task {
my $err = $@;
my $stats = ": $warning_count warnings";
eval {
#$stats .= "\n total mta subscriber records: " .
# NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows';
#my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta
#);
#$stats .= "\n new: $added_count rows";
#my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta
#);
#$stats .= "\n existing: $existing_count rows";
#my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta(
# $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta
#);
#$stats .= "\n removed: $deleted_count rows";
$stats .= "\n total subscriber records: " .
NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta() . ' rows';
my $added_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta(
$NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::added_delta
);
$stats .= "\n new: $added_count rows";
my $existing_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta(
$NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::updated_delta
);
$stats .= "\n existing: $existing_count rows";
my $deleted_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta(
$NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::deleted_delta
);
$stats .= "\n removed: $deleted_count rows";
my ($export_filename,$export_format) = get_export_filename($customer_export_filename_format);
if ('sqlite' eq $export_format) {
&get_sqlite_db()->copydbfile($export_filename);
} elsif ('csv' eq $export_format) {
NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::copy_table(\&get_csv_db);
&get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::gettablename(),$export_filename);
} else {
push(@$messages,'invalid extension for output filename $export_filename');
}
};
if ($err or !$result) {
push(@$messages,"exporting customers (tabular) INCOMPLETE$stats");
} else {
push(@$messages,"exporting customers (tabular) completed$stats");
}
destroy_all_dbs(); #every task should leave with closed connections.
destroy_all_dbs();
return $result;
}
@ -302,7 +309,7 @@ sub export_customers_tabular_task {
# } else {
# push(@$messages,"importing customers (json) completed$stats");
# }
# destroy_all_dbs(); #every task should leave with closed connections.
# destroy_all_dbs();
# return $result;
#
#}

@ -8,52 +8,60 @@ export_customers_numofthreads = 4
export_customers_blocksize = 1000
customer_export_filename=customer_%s.json
#customer_import_filename=customer_20210216173615.json
#split_customers = 1
load_yml = load.yml
tabular_yml = tabular.yml
graph_yml = graph.yml
graph_fields_mode = whitelist
#customer_import_multithreading = 1
#customer_import_numofthreads = 4
#customer_reseller_name = default
#customer_billing_profile_name = Default Billing Profile
#customer_domain = test1610072315.example.org
#customer_contact_email_format = DN0%2$s%3$s@example.org
#customer_timezone = Europe/Vienna
#subscriber_profile_set_name = subscriber_profile_1_set_65261
#subscriber_profile_name = subscriber_profile_1_65261
# sip username as webusername:
#webusername_format = %1$s
# webusername = cc+ac+sn:
#webusername_format = %2$s%3$s%4$s
# webusername = 0+ac+sn:
webusername_format = 0%3$s%4$s
# sip username as external_id:
#subscriber_externalid_format = %1$s
# external_id = cc+ac+sn:
#subscriber_externalid_format = %2$s%3$s%4$s
# external_id = 0+ac+sn:
subscriber_externalid_format = 0%3$s%4$s
# subscriber contact will be created, only if one of below is set.
subscriber_contact_email_format = DN0%2$s%3$s@domain.org
subscriber_timezone = Europe/Vienna
sqlite_db_file = sqlite
csv_dir = customer
tabular_single_row_txn = 1
ignore_tabular_unique = 0
tabular_fields_yml = tabular_fields.yml
load_recursive_yml = load_recursive.yml
sqlite_db_file = sqlite
cf_default_priority: 1
cf_default_timeout: 300
cft_default_ringtimeout: 20
#customer_import_filename=customer_20210216173615.json
#split_customers = 1
#customer_import_multithreading = 1
#customer_import_numofthreads = 4
#customer_reseller_name = default
#customer_billing_profile_name = Default Billing Profile
#customer_domain = test1610072315.example.org
#customer_contact_email_format = DN0%2$s%3$s@example.org
#customer_timezone = Europe/Vienna
#subscriber_profile_set_name = subscriber_profile_1_set_65261
#subscriber_profile_name = subscriber_profile_1_65261
## sip username as webusername:
##webusername_format = %1$s
## webusername = cc+ac+sn:
##webusername_format = %2$s%3$s%4$s
## webusername = 0+ac+sn:
#webusername_format = 0%3$s%4$s
## sip username as external_id:
##subscriber_externalid_format = %1$s
## external_id = cc+ac+sn:
##subscriber_externalid_format = %2$s%3$s%4$s
## external_id = 0+ac+sn:
#subscriber_externalid_format = 0%3$s%4$s
## subscriber contact will be created, only if one of below is set.
#subscriber_contact_email_format = DN0%2$s%3$s@domain.org
#subscriber_timezone = Europe/Vienna
#write sql files for legacy db to set/unset the is_external pref of migrated subscribers:
#cf_default_priority: 1
#cf_default_timeout: 300
#cft_default_ringtimeout: 20
rollback_sql_export_filename_format = delete_subscribers_%s.sql
rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit;
##write sql files for legacy db to set/unset the is_external pref of migrated subscribers:
#
#rollback_sql_export_filename_format = delete_subscribers_%s.sql
#rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit;

@ -1,6 +1,12 @@
# tabular.yml: define which *subscriber* columns to add tabular (.db/.csv) exports.
- path: contract.id
- path: id
- path: username
transform: !!perl/code |
{
my ($id,$bill_subs) = @_;
return $id;
}
- path: primary_number.cc
- path: primary_number.ac
- path: primary_number.sn

@ -29,16 +29,24 @@ our @EXPORT_OK = qw(
$format_php
$format_perl
$format_storable_base64
@formats
);
#$format_storable
#$format_storable
#our $format_storable = 0;
our @formats = ();
our $format_xml = 1;
push(@formats,$format_xml);
our $format_yaml = 2;
push(@formats,$format_yaml);
our $format_json = 3;
push(@formats,$format_json);
our $format_php = 4;
push(@formats,$format_php);
our $format_perl = 5;
push(@formats,$format_perl);
our $format_storable_base64 = 6;
push(@formats,$format_storable_base64);
use MIME::Base64 qw(encode_base64 decode_base64);

@ -46,6 +46,8 @@ use HTML::PullParser qw();
use HTML::Entities qw(decode_entities);
use IO::Uncompress::Unzip qw(unzip $UnzipError);
use File::Copy qw();
# no debian package yet:
#use DateTime::Format::Excel;
@ -432,6 +434,21 @@ sub _gettablefilename {
}
sub copytablefile {
my $self = shift;
my $tablename = shift;
my $target = shift;
my $tablefilename = $self->_gettablefilename($tablename);
$self->db_disconnect();
if (File::Copy::copy($tablefilename,$target)) {
dbinfo($self,"$tablefilename copied to $target",getlogger(__PACKAGE__));
} else {
dberror($self,"copy from $tablefilename to $target failed: $!",getlogger(__PACKAGE__));
}
}
sub create_texttable {
my $self = shift;

@ -26,6 +26,8 @@ use DBI 1.608 qw(:sql_types);
use DBD::SQLite 1.29;
use NGCP::BulkProcessor::Array qw(arrayeq contains setcontains);
use File::Copy qw();
use NGCP::BulkProcessor::Utils qw(
tempfilename
timestampdigits
@ -107,6 +109,19 @@ sub _connectidentifier {
}
sub copydbfile {
my $self = shift;
my $target = shift;
$self->db_disconnect();
if (File::Copy::copy($self->{dbfilename},$target)) {
dbinfo($self,"$self->{dbfilename} copied to $target",getlogger(__PACKAGE__));
} else {
dberror($self,"copy from $self->{dbfilename} to $target failed: $!",getlogger(__PACKAGE__));
}
}
sub tableidentifier {
my $self = shift;

@ -60,13 +60,13 @@ sub load_relation {
}
my $include = $load_recursive->{$relation_path};
my $filter;
my $transfrom;
my $transform;
if ('HASH' eq ref $include) {
$filter = $include->{filter};
$transfrom = $include->{transform};
$transform = $include->{transform};
if (exists $include->{include}) {
$include = $include->{include};
} elsif ($transfrom or $filter) {
} elsif ($transform or $filter) {
$include = 1;
}
}
@ -81,8 +81,8 @@ sub load_relation {
my $closure = _closure($filter,$load_recursive->{_context});
$self->{$relation} = [ grep { $closure->($_); } @{$self->{$relation}}];
}
if ('CODE' eq ref $transfrom) {
my $closure = _closure($transfrom,$load_recursive->{_context});
if ('CODE' eq ref $transform) {
my $closure = _closure($transform,$load_recursive->{_context});
$self->{$relation} = $closure->($self->{$relation});
}
$load_recursive->{_relation_path} = $relation_path_backup;

Loading…
Cancel
Save