diff --git a/lib/NGCP/BulkProcessor/DSPath.pm b/lib/NGCP/BulkProcessor/DSPath.pm index 8431f41..2b37786 100644 --- a/lib/NGCP/BulkProcessor/DSPath.pm +++ b/lib/NGCP/BulkProcessor/DSPath.pm @@ -36,8 +36,16 @@ sub new { }, not_a_coderef_or_method => $callbacks->{not_a_coderef_or_method} // sub { my ($data, $key, $index, $value, $rest) = @_; - die "tried to retrieve from a non-existent coderef or method: $key in $data"; - } + die "tried to retrieve from a non-existent coderef or method: $key in $data\n"; + }, + filter => $callbacks->{filter} // sub { + my ($item_path) = @_; + die "no filter callback method provided\n"; + }, + transform => $callbacks->{transform} // sub { + my ($item_path) = @_; + die "no transform callback method provided\n"; + }, }, }; return bless $self,$class; @@ -101,4 +109,85 @@ sub get { return $value; } +# filter graph (in-place) +sub filter { + my ($self,$data,$path) = @_; + + # set data to + $data //= $self->{data}; + + $self->_filter($data,$path); + + return $self; #support chaining +} + +sub _filter { + my ($self,$data,$path) = @_; + + my $reftype = reftype $data; + return unless $reftype; + my $include = 0; + if ("ARRAY" eq $reftype) { + my ($i,$j) = (0, 0); + foreach (@$data) { + my $item_path = (length($path) ? $path : '') . '[' . $j . ']'; + if ($self->{callbacks}->{filter}->($item_path) + or $self->_filter($data->[$i],$item_path)) { + $include = 1; + $i++; + } else { + splice(@$data, $i, 1); + } + $j++; + } + } elsif ("HASH" eq $reftype) { + foreach my $key (keys %$data) { + my $item_path = (length($path) ? $path . '.' : '') . $key; + if ($self->{callbacks}->{filter}->($item_path) + or $self->_filter($data->{$key},$item_path)) { + $include = 1; + } else { + delete $data->{$key}; + } + } + } + return $include; + +} + +# transform graph (in-place) +sub transform { + my ($self,$data,$path) = @_; + + # set data to + $data //= $self->{data}; + + $self->_transform($data,$path); + + return $self; #support chaining +} + +sub _transform { + my ($self,$data,$path) = @_; + + my $reftype = reftype $data; + return unless $reftype; + if ("ARRAY" eq $reftype) { + my $i = 0; + foreach (@$data) { + my $item_path = (length($path) ? $path : '') . '[' . $i . ']'; + $self->_transform($data->[$i],$item_path); + $data->[$i] = $self->{callbacks}->{transform}->($data->[$i],$item_path); + $i++; + } + } elsif ("HASH" eq $reftype) { + foreach my $key (keys %$data) { + my $item_path = (length($path) ? $path . '.' : '') . $key; + $self->_transform($data->{$key},$item_path); + $data->{$key} = $self->{callbacks}->{transform}->($data->{$key},$item_path); + } + } + +} + 1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm index a5e4856..cd7d692 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Dao/Tabular.pm @@ -7,7 +7,6 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw( get_sqlite_db destroy_all_dbs ); -#import_db_tableidentifier use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( $tabular_fields @@ -18,14 +17,11 @@ use NGCP::BulkProcessor::SqlProcessor qw( create_targettable checktableinfo copy_row - insert_stmt - + transfer_table ); -#process_table -use NGCP::BulkProcessor::SqlRecord qw(); -#use NGCP::BulkProcessor::Projects::Migration::Teletek::Dao::import::Subscriber qw(); +use NGCP::BulkProcessor::SqlRecord qw(); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); @@ -38,7 +34,6 @@ our @EXPORT_OK = qw( get_fieldnames - update_delta findby_delta countby_delta @@ -46,16 +41,12 @@ our @EXPORT_OK = qw( $deleted_delta $updated_delta $added_delta - + + copy_table ); -#@fieldnames -#findby_sipusername -#findby_ccacsn -#countby_ccacsn my $tablename = 'tabular'; my $get_db = \&get_sqlite_db; -#my $get_tablename = \&import_db_tableidentifier; my $fieldnames; my $expected_fieldnames; @@ -76,13 +67,10 @@ sub get_fieldnames { return $expected_fieldnames; } -# table creation: my $primarykey_fieldnames = [ 'uuid' ]; my $indexes = { - #$tablename . '_username_domain' => [ 'username', 'domain' ], $tablename . '_delta' => [ 'delta(7)' ], }; -#my $fixtable_statements = []; our $deleted_delta = 'DELETED'; our $updated_delta = 'UPDATED'; @@ -198,6 +186,26 @@ sub countby_delta { } +sub copy_table { + + my ($get_target_db) = @_; + + check_table(); + #checktableinfo($get_target_db, + # __PACKAGE__,$tablename, + # get_fieldnames(1), + # $indexes); + + return transfer_table( + get_db => $get_db, + class => __PACKAGE__, + get_target_db => $get_target_db, + targetclass => __PACKAGE__, + targettablename => $tablename, + ); + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; @@ -219,47 +227,6 @@ sub buildrecords_fromrows { } -#sub process_records { -# -# my %params = @_; -# my ($process_code, -# $static_context, -# $init_process_context_code, -# $uninit_process_context_code, -# $multithreading, -# $numofthreads) = @params{qw/ -# process_code -# static_context -# init_process_context_code -# uninit_process_context_code -# multithreading -# numofthreads -# /}; -# -# check_table(); -# my $db = &$get_db(); -# my $table = $db->tableidentifier($tablename); -# -# my @cols = map { $db->columnidentifier($_); } qw/domain sip_username/; -# -# return process_table( -# get_db => $get_db, -# class => __PACKAGE__, -# process_code => sub { -# my ($context,$rowblock,$row_offset) = @_; -# return &$process_code($context,$rowblock,$row_offset); -# }, -# static_context => $static_context, -# init_process_context_code => $init_process_context_code, -# uninit_process_context_code => $uninit_process_context_code, -# destroy_reader_dbs_code => \&destroy_all_dbs, -# multithreading => $multithreading, -# tableprocessing_threads => $numofthreads, -# 'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols), -# 'selectcount' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table, -# ); -#} - sub getinsertstatement { my ($insert_ignore) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm index 4fcabae..11fe980 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ExportCustomers.pm @@ -5,8 +5,10 @@ use strict; use threads::shared qw(); +use Tie::IxHash; + use NGCP::BulkProcessor::Serialization qw(); -use Scalar::Util 'blessed'; +use Scalar::Util qw(blessed); use MIME::Base64 qw(encode_base64); use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( @@ -16,7 +18,6 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( $export_customers_multithreading $export_customers_numofthreads $export_customers_blocksize - run_dao_method get_dao_var @@ -29,6 +30,8 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( $load_recursive $tabular_single_row_txn $ignore_tabular_unique + $graph_fields + $graph_fields_mode ); use NGCP::BulkProcessor::Logging qw ( @@ -65,11 +68,50 @@ our @EXPORT_OK = qw( export_customers_tabular ); +sub _init_graph_field_map { + my $context = shift; + my %graph_field_map = (); + my %graph_field_globs = (); + tie(%graph_field_globs, 'Tie::IxHash'); + foreach my $graph_field (@$graph_fields) { + my ($c,$a); + if ('HASH' eq ref $graph_field) { + $a = $graph_field->{path}; + $c = $graph_field; + } else { + $a = $graph_field; + $c = 1; + } + #my @a = (); + #my $b = ''; + #foreach my $c (split(/\./,$a)) { + # + # foreach my () { + # $b .= $_ + # push() + # } + #} + if ($a =~ /\*/) { + $a = quotemeta($a); + $a =~ s/(\\\*)+/[^.]+/g; + $a = '^' . $a . '$'; + $graph_field_globs{$a} = $c unless exists $graph_field_globs{$a}; + } else { + $graph_field_map{$a} = $c unless exists $graph_field_map{$a}; + } + } + $context->{graph_field_map} = \%graph_field_map; + $context->{graph_field_globs} = \%graph_field_globs; +} + sub export_customers_graph { my $static_context = { }; + _init_graph_field_map($static_context); + ($static_context->{export_filename},$static_context->{export_format}) = get_export_filename($customer_export_filename_format); + my $result = 1; #_copy_customers_checks($static_context); destroy_all_dbs(); @@ -83,21 +125,18 @@ sub export_customers_graph { my @data = (); foreach my $record (@$records) { next unless _export_customer_graph_init_context($context,$record); - push(@data,_get_contract_graph($context->{contract})); + push(@data,_get_contract_graph($context)); } write_export_file(\@data,$context->{export_filename},$context->{export_format}); return 1; }, init_process_context_code => sub { my ($context)= @_; - #$context->{db} = &get_xa_db(); $context->{error_count} = 0; $context->{warning_count} = 0; - ($context->{export_filename},$context->{export_format}) = get_export_filename($customer_export_filename_format); }, uninit_process_context_code => sub { my ($context)= @_; - #undef $context->{db}; destroy_all_dbs(); { lock $warning_count; @@ -226,69 +265,72 @@ sub _export_customer_graph_init_context { sub _get_contract_graph { my ($context) = @_; - #sub unshare { - # - # my ($obj,) = @_; - # my $ref = ref $obj; - # if ("ARRAY" eq $ref) { - # my @array = (); - # my $i = 0; - # foreach my $value (@$obj) { - # push(@array, unshare($value)) if xx; - # $i++; - # } - # return \@array; - # } elsif ($ref eq "HASH") { - # my %hash = (); - # foreach my $key (keys %$obj) { - # $hash{$key} = unshare($obj->{$key}) if xx; - # } - # return \%hash; - # } - # - #} - - foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) { - ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = - array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, - sub { return shift->{attribute}; }, sub { my $p = shift; }, 'group' ); - if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { - foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { - foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { - $voicemail->{recording} = encode_base64($voicemail->{recording},''); + my $dp = NGCP::BulkProcessor::DSPath->new($context->{contract}, { + filter => sub { + my $path = shift; + if ('whitelist' eq $graph_fields_mode) { + my $include; + if (exists $context->{graph_field_map}->{$path}) { + $include = $context->{graph_field_map}->{$path}; + } else { + foreach my $glob (keys %{$context->{graph_field_globs}}) { + if ($path =~ /$glob/) { + $include = $context->{graph_field_globs}->{$glob}; + last; + } + } + } + if ('HASH' eq ref $include) { + if (exists $include->{include}) { + return $include->{include}; + } + return 1; + } else { + return $include; + } + } elsif ('blacklist' eq $graph_fields_mode) { + my $exclude; + if (exists $context->{graph_field_map}->{$path}) { + $exclude = $context->{graph_field_map}->{$path}; + } else { + foreach my $glob (keys %{$context->{graph_field_globs}}) { + if ($path =~ /$glob/) { + $exclude = $context->{graph_field_globs}->{$glob}; + last; + } + } + } + if ('HASH' eq ref $exclude) { + if (exists $exclude->{exclude}) { + return not $exclude->{exclude}; + } elsif ($exclude->{transform}) { + return 1; + } + return 0; + } else { + return not $exclude; } } - } - my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, { - retrieve_key_from_non_hash => sub {}, - key_does_not_exist => sub {}, - index_does_not_exist => sub {}, - }); - #foreach my $graph_field (@$graph_fields) { - # my $a; - # my $sep = ','; - # if ('HASH' eq ref $tabular_field) { - # $a = $tabular_field->{path}; - # $sep = $tabular_field->{sep}; - # } else { - # $a = $tabular_field; - # } - # #eval {'' . ($dp->get('.' . $a) // '');}; if($@){ - # # my $x=5; - # #} - # my $v = $dp->get('.' . $a); - # if ('ARRAY' eq ref $v) { - # if ('HASH' eq ref $v->[0]) { - # $v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v); - # } else { - # $v = join($sep, sort @$v); - # } - # } else { - # $v = '' . ($v // ''); - # } - # push(@row,$v); - #} - } + }, + transform => sub { + return shift; + # ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = + # array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, + # sub { return shift->{attribute}; }, sub { my $p = shift; }, 'group' ); + # if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { + # foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { + # foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { + # $voicemail->{recording} = encode_base64($voicemail->{recording},''); + # } + # } + # } + }, + }); + + $dp->filter()->transform(); + + return $context->{contract}; + } sub _export_customer_tabular_init_context { @@ -335,9 +377,11 @@ sub _get_subscriber_rows { foreach my $tabular_field (@$tabular_fields) { my $a; my $sep = ','; + my $transform; if ('HASH' eq ref $tabular_field) { $a = $tabular_field->{path}; $sep = $tabular_field->{sep}; + $transform = $tabular_field->{transform}; } else { $a = $tabular_field; } @@ -345,6 +389,10 @@ sub _get_subscriber_rows { # my $x=5; #} my $v = $dp->get('.' . $a); + if ('CODE' eq ref $transform) { + my $closure = _closure($transform,_get_closure_context($context)); + $v = $closure->($v,$bill_subs); + } if ('ARRAY' eq ref $v) { if ('HASH' eq ref $v->[0] or (blessed($v->[0]) and $v->[0]->isa('NGCP::BulkProcessor::SqlRecord'))) { @@ -376,13 +424,7 @@ sub _load_contract { my ($context,$record) = @_; $context->{contract} = run_dao_method('billing::contracts::findby_id', $record->{id}, { %$load_recursive, #'contracts.voip_subscribers.domain' => 1, - _context => { - _info => \&_info, - _error => \&_error, - _debug => \&_debug, - _warn => \&_warn, - context => $context, - }, + _context => _get_closure_context($context), }); return 1 if $context->{contract}; @@ -390,6 +432,28 @@ sub _load_contract { } +sub _get_closure_context { + my $context = shift; + return { + _info => \&_info, + _error => \&_error, + _debug => \&_debug, + _warn => \&_warn, + context => $context, + }; +} + +sub _closure { + my ($sub,$context) = @_; + return sub { + foreach my $key (keys %$context) { + no strict "refs"; ## no critic (ProhibitNoStrict) + *{"main::$key"} = $context->{$key} if 'CODE' eq ref $context->{$key}; + } + return $sub->(@_,$context); + }; +} + sub _error { my ($context,$message) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm index b5fed1a..31e8d56 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/ProjectConnectorPool.pm @@ -8,7 +8,7 @@ use Cwd; use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( - + $csv_dir $sqlite_db_file ); @@ -16,6 +16,7 @@ use NGCP::BulkProcessor::ConnectorPool qw( get_connectorinstancename ); +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(); use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); @@ -26,6 +27,9 @@ our @EXPORT_OK = qw( get_sqlite_db sqlite_db_tableidentifier + + get_csv_db + csv_db_tableidentifier destroy_dbs destroy_all_dbs @@ -34,14 +38,15 @@ our @EXPORT_OK = qw( ); my $sqlite_dbs = {}; +my $csv_dbs = {}; sub get_sqlite_db { my ($instance_name,$reconnect) = @_; - my $name = get_connectorinstancename($instance_name); #threadid(); #shift; + my $name = get_connectorinstancename($instance_name); if (not defined $sqlite_dbs->{$name}) { - $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name); + $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); if (not defined $reconnect) { $reconnect = 1; } @@ -62,6 +67,31 @@ sub sqlite_db_tableidentifier { } +sub get_csv_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); + if (not defined $csv_dbs->{$name}) { + $csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $csv_dbs->{$name}->db_connect($csv_dir); + } + return $csv_dbs->{$name}; + +} + +sub csv_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir)); + +} + sub destroy_dbs { foreach my $name (keys %$sqlite_dbs) { @@ -69,6 +99,12 @@ sub destroy_dbs { undef $sqlite_dbs->{$name}; delete $sqlite_dbs->{$name}; } + + foreach my $name (keys %$csv_dbs) { + cleartableinfo($csv_dbs->{$name}); + undef $csv_dbs->{$name}; + delete $csv_dbs->{$name}; + } } diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm index 801cc7d..512424f 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/Settings.pm @@ -12,7 +12,6 @@ use DateTime::TimeZone qw(); use JSON -support_by_pp, -no_export; *NGCP::BulkProcessor::Serialization::serialize_json = sub { my $input_ref = shift; - #return JSON::XS::encode_json($input_ref); return JSON::to_json($input_ref, { allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, as_nonblessed => 1 }); }; @@ -40,8 +39,9 @@ use NGCP::BulkProcessor::LoadConfig qw( split_tuple parse_regexp ); + use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module); -#format_number + use NGCP::BulkProcessor::Array qw(contains); require Exporter; @@ -55,16 +55,21 @@ our @EXPORT_OK = qw( write_sql_file update_load_recursive - $load_recursive_yml + $load_yml $load_recursive update_tabular_fields - $tabular_fields_yml + $tabular_yml $tabular_fields $ignore_tabular_unique $tabular_single_row_txn + $graph_yml + $graph_fields + $graph_fields_mode + update_graph_fields $sqlite_db_file + $csv_dir check_dry @@ -75,7 +80,6 @@ our @EXPORT_OK = qw( $customer_import_filename $split_customers - $defaultsettings $defaultconfig @@ -83,11 +87,10 @@ our @EXPORT_OK = qw( $skip_errors $force - $export_customers_multithreading $export_customers_numofthreads $export_customers_blocksize - + $cf_default_priority $cf_default_timeout @@ -100,37 +103,42 @@ our @EXPORT_OK = qw( our $defaultconfig = 'config.cfg'; our $defaultsettings = 'settings.cfg'; -our $tabular_fields_yml = 'tabular_fields.yml'; +our $tabular_yml = 'tabular.yml'; our $tabular_fields = []; our $ignore_tabular_unique = 0; our $tabular_single_row_txn = 1; -our $load_recursive_yml = 'load_recursive_yml.yml'; +our $graph_yml = 'graph.yml'; +our $graph_fields = []; +our $graph_fields_mode = 'whitelist'; +my @graph_fields_modes = qw(whitelist blacklist); + +our $load_yml = 'load.yml'; our $load_recursive; our $output_path = $working_path . 'output/'; our $input_path = $working_path . 'input/'; +our $csv_dir = 'customer'; our $customer_export_filename_format = undef; -our $customer_import_filename = undef; -our $customer_import_numofthreads = $cpucount; -our $customer_import_multithreading = 1; -our $customer_reseller_name = 'default'; -our $customer_billing_profile_name = 'Default Billing Profile'; -our $customer_domain = undef; -our $customer_contact_email_format = '%s@example.org'; -our $subscriber_contact_email_format = '%s@example.org'; -our $split_customers = 0; +#our $customer_import_filename = undef; +#our $customer_import_numofthreads = $cpucount; +#our $customer_import_multithreading = 1; +#our $customer_reseller_name = 'default'; +#our $customer_billing_profile_name = 'Default Billing Profile'; +#our $customer_domain = undef; +#our $customer_contact_email_format = '%s@example.org'; +#our $subscriber_contact_email_format = '%s@example.org'; +#our $split_customers = 0; -our $subscriber_timezone = undef; -our $contract_timezone = undef; - -our $subscriber_profile_set_name = undef; -our $subscriber_profile_name = undef; -our $webusername_format = '%1$s'; -our $subscriber_externalid_format = undef; +#our $subscriber_timezone = undef; +#our $contract_timezone = undef; +#our $subscriber_profile_set_name = undef; +#our $subscriber_profile_name = undef; +#our $webusername_format = '%1$s'; +#our $subscriber_externalid_format = undef; our $force = 0; our $dry = 0; @@ -141,18 +149,16 @@ my @supported_mr = ('Trunk'); our $sqlite_db_file = 'sqlite'; - our $export_customers_multithreading = $enablemultithreading; our $export_customers_numofthreads = $cpucount; our $export_customers_blocksize = 1000; +#our $cf_default_priority = 1; +#our $cf_default_timeout = 300; +#our $cft_default_ringtimeout = 20; -our $cf_default_priority = 1; -our $cf_default_timeout = 300; -our $cft_default_ringtimeout = 20; - -our $rollback_sql_export_filename_format = undef; -our $rollback_sql_stmt_format = undef; +#our $rollback_sql_export_filename_format = undef; +#our $rollback_sql_stmt_format = undef; my $file_lock :shared = undef; @@ -171,72 +177,75 @@ sub update_settings { $customer_export_filename_format = $data->{customer_export_filename} if exists $data->{customer_export_filename}; get_export_filename($data->{customer_export_filename},$configfile); - - - $rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format}; - get_export_filename($data->{rollback_sql_export_filename_format},$configfile); - $rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format}; + + #$rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format}; + #get_export_filename($data->{rollback_sql_export_filename_format},$configfile); + #$rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format}; $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; - - $customer_import_filename = _get_import_filename($customer_import_filename,$data,'customer_import_filename'); - $customer_import_multithreading = $data->{customer_import_multithreading} if exists $data->{customer_import_multithreading}; - $customer_import_numofthreads = _get_numofthreads($cpucount,$data,'customer_import_numofthreads'); - $customer_reseller_name = $data->{customer_reseller_name} if exists $data->{customer_reseller_name}; - $customer_billing_profile_name = $data->{customer_billing_profile_name} if exists $data->{customer_billing_profile_name}; - $customer_domain = $data->{customer_domain} if exists $data->{customer_domain}; - $customer_contact_email_format = $data->{customer_contact_email_format} if exists $data->{customer_contact_email_format}; - $subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format}; - $split_customers = $data->{split_customers} if exists $data->{split_customers}; + $csv_dir = $data->{csv_dir} if exists $data->{csv_dir}; + + #$customer_import_filename = _get_import_filename($customer_import_filename,$data,'customer_import_filename'); + #$customer_import_multithreading = $data->{customer_import_multithreading} if exists $data->{customer_import_multithreading}; + #$customer_import_numofthreads = _get_numofthreads($cpucount,$data,'customer_import_numofthreads'); + #$customer_reseller_name = $data->{customer_reseller_name} if exists $data->{customer_reseller_name}; + #$customer_billing_profile_name = $data->{customer_billing_profile_name} if exists $data->{customer_billing_profile_name}; + #$customer_domain = $data->{customer_domain} if exists $data->{customer_domain}; + #$customer_contact_email_format = $data->{customer_contact_email_format} if exists $data->{customer_contact_email_format}; + #$subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format}; + #$split_customers = $data->{split_customers} if exists $data->{split_customers}; - $contract_timezone = $data->{customer_timezone} if exists $data->{customer_timezone}; - if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) { - configurationerror($configfile,"invalid customer_timezone '$contract_timezone'"); - $result = 0; - } - - $subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone}; - if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) { - configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'"); - $result = 0; - } + #$contract_timezone = $data->{customer_timezone} if exists $data->{customer_timezone}; + #if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) { + # configurationerror($configfile,"invalid customer_timezone '$contract_timezone'"); + # $result = 0; + #} + + #$subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone}; + #if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) { + # configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'"); + # $result = 0; + #} - $subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name}; - $subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name}; - if ($subscriber_profile_set_name and not $subscriber_profile_name - or not $subscriber_profile_set_name and $subscriber_profile_name) { - configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required"); - $result = 0; - } - $webusername_format = $data->{webusername_format} if exists $data->{webusername_format}; - $subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format}; + #$subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name}; + #$subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name}; + #if ($subscriber_profile_set_name and not $subscriber_profile_name + # or not $subscriber_profile_set_name and $subscriber_profile_name) { + # configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required"); + # $result = 0; + #} + #$webusername_format = $data->{webusername_format} if exists $data->{webusername_format}; + #$subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format}; $dry = $data->{dry} if exists $data->{dry}; $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; - $export_customers_multithreading = $data->{export_customers_multithreading} if exists $data->{export_customers_multithreading}; $export_customers_numofthreads = _get_numofthreads($cpucount,$data,'export_customers_numofthreads'); $export_customers_blocksize = $data->{export_customers_blocksize} if exists $data->{export_customers_blocksize}; - $tabular_fields_yml = $data->{tabular_fields_yml} if exists $data->{tabular_fields_yml}; - $load_recursive_yml = $data->{load_recursive_yml} if exists $data->{load_recursive_yml}; + $tabular_yml = $data->{tabular_yml} if exists $data->{tabular_yml}; + $graph_yml = $data->{graph_yml} if exists $data->{graph_yml}; + $graph_fields_mode = $data->{graph_fields_mode} if exists $data->{graph_fields_mode}; + if (not $graph_fields_mode or not contains($graph_fields_mode,\@graph_fields_modes)) { + configurationerror($configfile,'graph_fields_mode must be one of ' . join(', ', @graph_fields_modes)); + $result = 0; + } + $load_yml = $data->{load_yml} if exists $data->{load_yml}; $tabular_single_row_txn = $data->{tabular_single_row_txn} if exists $data->{tabular_single_row_txn}; $ignore_tabular_unique = $data->{ignore_tabular_unique} if exists $data->{ignore_tabular_unique}; - $cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority}; - $cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout}; - $cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout}; + #$cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority}; + #$cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout}; + #$cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout}; $mr = $data->{schema_version}; if (not defined $mr or not contains($mr,\@supported_mr)) { - configurationerror($configfile,'version must be one of ' . join(', ', @supported_mr)); + configurationerror($configfile,'schema_version must be one of ' . join(', ', @supported_mr)); $result = 0; } - return $result; - } return 0; @@ -290,15 +299,23 @@ sub get_export_filename { filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__)); $export_filename = undef; } - my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".sql"); + my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".xml",".php",".pl",".db",".csv"); if ($suffix eq '.json') { $export_format = $NGCP::BulkProcessor::Serialization::format_json; } elsif ($suffix eq '.yml' or $suffix eq '.yaml') { $export_format = $NGCP::BulkProcessor::Serialization::format_yaml; - } elsif ($suffix eq '.sql') { - + } elsif ($suffix eq '.xml') { + $export_format = $NGCP::BulkProcessor::Serialization::format_xml; + } elsif ($suffix eq '.php') { + $export_format = $NGCP::BulkProcessor::Serialization::format_php; + } elsif ($suffix eq '.pl') { + $export_format = $NGCP::BulkProcessor::Serialization::format_perl; + } elsif ($suffix eq '.db') { + $export_format = 'sqlite'; + } elsif ($suffix eq '.csv') { + $export_format = 'csv'; } else { - configurationerror($configfile,"$filename_format: either .json or .yaml export file format required"); + configurationerror($configfile,"$filename_format: either .json/.yaml/.xml/.php/.pl or .db/.csv export file format required"); } } return ($export_filename,$export_format); @@ -308,6 +325,8 @@ sub write_export_file { my ($data,$export_filename,$export_format) = @_; if (defined $export_filename) { + fileerror("invalid extension for output filename $export_filename",getlogger(__PACKAGE__)) + unless contains($export_format,\@NGCP::BulkProcessor::Serialization::formats); # "concatenated json" https://en.wikipedia.org/wiki/JSON_streaming my $str = ''; if (ref $data eq 'ARRAY') { @@ -382,6 +401,29 @@ sub update_tabular_fields { } +sub update_graph_fields { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $graph_fields = $data; + }; + if ($@ or 'ARRAY' ne ref $graph_fields) { + $graph_fields //= []; + configurationerror($configfile,'invalid graph fields',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + sub update_load_recursive { my ($data,$configfile) = @_; @@ -405,11 +447,6 @@ sub update_load_recursive { } -sub _get_sqlite_db_file { - my ($run,$name) = @_; - return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name; -} - sub _get_import_filename { my ($old_value,$data,$key) = @_; my $import_filename = $old_value; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/graph.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/graph.yml new file mode 100644 index 0000000..01f088a --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/graph.yml @@ -0,0 +1,5 @@ +# graph.yml: whitelist/blacklist of *contract* fields to export to .json/.yaml/.xml/... + +- id +- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.attribute.attribute +- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.value diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load_recursive.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load.yml similarity index 86% rename from lib/NGCP/BulkProcessor/Projects/ETL/Customer/load_recursive.yml rename to lib/NGCP/BulkProcessor/Projects/ETL/Customer/load.yml index 218b9a0..8c14467 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load_recursive.yml +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/load.yml @@ -1,7 +1,10 @@ +# load.yml: define which *contract* relations to fetch from db. + +#contracts.voip_subscribers: 1 contracts.voip_subscribers: include: !!perl/code | { - my ($contract) = @_; + my ($contract,$context) = @_; #return 0 if $contract->{status} eq 'terminated'; return 1; } @@ -20,12 +23,12 @@ contracts.voip_subscribers: return $bill_subs; } -#contracts.contact: 1 +contracts.contact: 1 contracts.voip_subscribers.primary_number: 1 contracts.voip_subscribers.provisioning_voip_subscriber: 1 contracts.voip_subscribers.provisioning_voip_subscriber.voip_dbaliases: 1 contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences: 1 -#contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1 contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.allowed_ips: 1 contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.ncos: 1 contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping: 1 @@ -38,4 +41,4 @@ contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_destinations: { my ($fax_destinations,$context) = @_; return [ map { $_->{destination} . ' (' . $_->{filetype} . ')'; } @$fax_destinations ]; - } + } \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl index 59e96fe..a298f4d 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/process.pl @@ -15,10 +15,14 @@ use NGCP::BulkProcessor::Globals qw(); use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( update_settings update_tabular_fields - $tabular_fields_yml + update_graph_fields + $tabular_yml + $graph_yml update_load_recursive - $load_recursive_yml + get_export_filename + $customer_export_filename_format + $load_yml check_dry $output_path @@ -27,7 +31,6 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::Settings qw( $dry $skip_errors $force - ); use NGCP::BulkProcessor::Logging qw( @@ -57,17 +60,13 @@ use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); use NGCP::BulkProcessor::Mail qw( cleanupmsgfiles ); -#use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); + +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); -#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi qw(cleanupcertfiles); -use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(destroy_all_dbs); +use NGCP::BulkProcessor::Projects::ETL::Customer::ProjectConnectorPool qw(destroy_all_dbs get_csv_db get_sqlite_db); -#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); -#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_dbaliases qw(); -#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_cf_mappings qw(); -#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_trusted_sources qw(); +use NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular qw(); use NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers qw( export_customers_graph @@ -77,7 +76,7 @@ use NGCP::BulkProcessor::Projects::ETL::Customer::ExportCustomers qw( # import_customers_json #); -scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet +scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; my @TASK_OPTS = (); @@ -89,8 +88,8 @@ push(@TASK_OPTS,$cleanup_task_opt); my $cleanup_all_task_opt = 'cleanup_all'; push(@TASK_OPTS,$cleanup_all_task_opt); -#my $export_customers_graph_task_opt = 'export_customers_graph'; -#push(@TASK_OPTS,$export_customers_graph_task_opt); +my $export_customers_graph_task_opt = 'export_customers_graph'; +push(@TASK_OPTS,$export_customers_graph_task_opt); my $export_customers_tabular_task_opt = 'export_customers_tabular'; push(@TASK_OPTS,$export_customers_tabular_task_opt); @@ -117,15 +116,16 @@ sub init { "dry" => \$dry, "skip-errors" => \$skip_errors, "force" => \$force, - ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + ); $tasks = removeduplicates($tasks,1); my $result = load_config($configfile); init_log(); $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); - $result &= load_config($tabular_fields_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE); - $result &= load_config($load_recursive_yml,\&update_load_recursive,$YAML_CONFIG_TYPE); + $result &= load_config($tabular_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE); + $result &= load_config($graph_yml,\&update_graph_fields,$YAML_CONFIG_TYPE); + $result &= load_config($load_yml,\&update_load_recursive,$YAML_CONFIG_TYPE); return $result; } @@ -146,9 +146,9 @@ sub main() { } elsif (lc($cleanup_all_task_opt) eq lc($task)) { $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); - #} elsif (lc($export_customers_graph_task_opt) eq lc($task)) { - # $result &= export_customers_graph_task(\@messages) if taskinfo($export_customers_graph_task_opt,$result); - # $completion |= 1; + } elsif (lc($export_customers_graph_task_opt) eq lc($task)) { + $result &= export_customers_graph_task(\@messages) if taskinfo($export_customers_graph_task_opt,$result); + $completion |= 1; } elsif (lc($export_customers_tabular_task_opt) eq lc($task)) { $result &= export_customers_tabular_task(\@messages) if taskinfo($export_customers_tabular_task_opt,$result); $completion |= 1; @@ -191,17 +191,15 @@ sub cleanup_task { my $result = 0; if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { eval { - #cleanupcvsdirs() if $clean_generated; - cleanupdbfiles() if $clean_generated; + cleanupcvsdirs(); + cleanupdbfiles(); cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); cleanupmsgfiles(\&fileerror,\&filewarn); - #cleanupcertfiles(); cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; $result = 1; }; } if ($@ or !$result) { - #print $@; push(@$messages,'working directory cleanup INCOMPLETE'); return 0; } else { @@ -240,7 +238,7 @@ sub export_customers_graph_task { } else { push(@$messages,"exporting customers (graph) completed$stats"); } - destroy_all_dbs(); #every task should leave with closed connections. + destroy_all_dbs(); return $result; } @@ -255,27 +253,36 @@ sub export_customers_tabular_task { my $err = $@; my $stats = ": $warning_count warnings"; eval { - #$stats .= "\n total mta subscriber records: " . - # NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . ' rows'; - #my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( - # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta - #); - #$stats .= "\n new: $added_count rows"; - #my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( - # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta - #); - #$stats .= "\n existing: $existing_count rows"; - #my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( - # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta - #); - #$stats .= "\n removed: $deleted_count rows"; + $stats .= "\n total subscriber records: " . + NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta() . ' rows'; + my $added_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::added_delta + ); + $stats .= "\n new: $added_count rows"; + my $existing_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::updated_delta + ); + $stats .= "\n existing: $existing_count rows"; + my $deleted_count = NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::deleted_delta + ); + $stats .= "\n removed: $deleted_count rows"; + my ($export_filename,$export_format) = get_export_filename($customer_export_filename_format); + if ('sqlite' eq $export_format) { + &get_sqlite_db()->copydbfile($export_filename); + } elsif ('csv' eq $export_format) { + NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::copy_table(\&get_csv_db); + &get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::ETL::Customer::Dao::Tabular::gettablename(),$export_filename); + } else { + push(@$messages,'invalid extension for output filename $export_filename'); + } }; if ($err or !$result) { push(@$messages,"exporting customers (tabular) INCOMPLETE$stats"); } else { push(@$messages,"exporting customers (tabular) completed$stats"); } - destroy_all_dbs(); #every task should leave with closed connections. + destroy_all_dbs(); return $result; } @@ -302,7 +309,7 @@ sub export_customers_tabular_task { # } else { # push(@$messages,"importing customers (json) completed$stats"); # } -# destroy_all_dbs(); #every task should leave with closed connections. +# destroy_all_dbs(); # return $result; # #} diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg index 0c95b84..42b6cd8 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/settings.cfg @@ -8,52 +8,60 @@ export_customers_numofthreads = 4 export_customers_blocksize = 1000 customer_export_filename=customer_%s.json -#customer_import_filename=customer_20210216173615.json -#split_customers = 1 +load_yml = load.yml +tabular_yml = tabular.yml +graph_yml = graph.yml +graph_fields_mode = whitelist + + + -#customer_import_multithreading = 1 -#customer_import_numofthreads = 4 -#customer_reseller_name = default -#customer_billing_profile_name = Default Billing Profile -#customer_domain = test1610072315.example.org -#customer_contact_email_format = DN0%2$s%3$s@example.org -#customer_timezone = Europe/Vienna -#subscriber_profile_set_name = subscriber_profile_1_set_65261 -#subscriber_profile_name = subscriber_profile_1_65261 -# sip username as webusername: -#webusername_format = %1$s -# webusername = cc+ac+sn: -#webusername_format = %2$s%3$s%4$s -# webusername = 0+ac+sn: -webusername_format = 0%3$s%4$s -# sip username as external_id: -#subscriber_externalid_format = %1$s -# external_id = cc+ac+sn: -#subscriber_externalid_format = %2$s%3$s%4$s -# external_id = 0+ac+sn: -subscriber_externalid_format = 0%3$s%4$s -# subscriber contact will be created, only if one of below is set. -subscriber_contact_email_format = DN0%2$s%3$s@domain.org -subscriber_timezone = Europe/Vienna + +sqlite_db_file = sqlite +csv_dir = customer tabular_single_row_txn = 1 ignore_tabular_unique = 0 -tabular_fields_yml = tabular_fields.yml -load_recursive_yml = load_recursive.yml -sqlite_db_file = sqlite -cf_default_priority: 1 -cf_default_timeout: 300 -cft_default_ringtimeout: 20 +#customer_import_filename=customer_20210216173615.json +#split_customers = 1 +#customer_import_multithreading = 1 +#customer_import_numofthreads = 4 +#customer_reseller_name = default +#customer_billing_profile_name = Default Billing Profile +#customer_domain = test1610072315.example.org +#customer_contact_email_format = DN0%2$s%3$s@example.org +#customer_timezone = Europe/Vienna +#subscriber_profile_set_name = subscriber_profile_1_set_65261 +#subscriber_profile_name = subscriber_profile_1_65261 +## sip username as webusername: +##webusername_format = %1$s +## webusername = cc+ac+sn: +##webusername_format = %2$s%3$s%4$s +## webusername = 0+ac+sn: +#webusername_format = 0%3$s%4$s +## sip username as external_id: +##subscriber_externalid_format = %1$s +## external_id = cc+ac+sn: +##subscriber_externalid_format = %2$s%3$s%4$s +## external_id = 0+ac+sn: +#subscriber_externalid_format = 0%3$s%4$s +## subscriber contact will be created, only if one of below is set. +#subscriber_contact_email_format = DN0%2$s%3$s@domain.org +#subscriber_timezone = Europe/Vienna -#write sql files for legacy db to set/unset the is_external pref of migrated subscribers: +#cf_default_priority: 1 +#cf_default_timeout: 300 +#cft_default_ringtimeout: 20 -rollback_sql_export_filename_format = delete_subscribers_%s.sql -rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit; +##write sql files for legacy db to set/unset the is_external pref of migrated subscribers: +# +#rollback_sql_export_filename_format = delete_subscribers_%s.sql +#rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular_fields.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular.yml similarity index 94% rename from lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular_fields.yml rename to lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular.yml index 0c94704..7ea0796 100644 --- a/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular_fields.yml +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Customer/tabular.yml @@ -1,6 +1,12 @@ +# tabular.yml: define which *subscriber* columns to add tabular (.db/.csv) exports. + - path: contract.id -- path: id -- path: username + transform: !!perl/code | + { + my ($id,$bill_subs) = @_; + return $id; + } + - path: primary_number.cc - path: primary_number.ac - path: primary_number.sn diff --git a/lib/NGCP/BulkProcessor/Serialization.pm b/lib/NGCP/BulkProcessor/Serialization.pm index 0adf784..ab4266f 100644 --- a/lib/NGCP/BulkProcessor/Serialization.pm +++ b/lib/NGCP/BulkProcessor/Serialization.pm @@ -29,16 +29,24 @@ our @EXPORT_OK = qw( $format_php $format_perl $format_storable_base64 + @formats ); - #$format_storable +#$format_storable #our $format_storable = 0; +our @formats = (); our $format_xml = 1; +push(@formats,$format_xml); our $format_yaml = 2; +push(@formats,$format_yaml); our $format_json = 3; +push(@formats,$format_json); our $format_php = 4; +push(@formats,$format_php); our $format_perl = 5; +push(@formats,$format_perl); our $format_storable_base64 = 6; +push(@formats,$format_storable_base64); use MIME::Base64 qw(encode_base64 decode_base64); diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm index a1ed125..6a24d99 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/CSVDB.pm @@ -46,6 +46,8 @@ use HTML::PullParser qw(); use HTML::Entities qw(decode_entities); use IO::Uncompress::Unzip qw(unzip $UnzipError); +use File::Copy qw(); + # no debian package yet: #use DateTime::Format::Excel; @@ -432,6 +434,21 @@ sub _gettablefilename { } +sub copytablefile { + + my $self = shift; + my $tablename = shift; + my $target = shift; + my $tablefilename = $self->_gettablefilename($tablename); + $self->db_disconnect(); + if (File::Copy::copy($tablefilename,$target)) { + dbinfo($self,"$tablefilename copied to $target",getlogger(__PACKAGE__)); + } else { + dberror($self,"copy from $tablefilename to $target failed: $!",getlogger(__PACKAGE__)); + } + +} + sub create_texttable { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm index f1c4efa..7b714aa 100644 --- a/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm +++ b/lib/NGCP/BulkProcessor/SqlConnectors/SQLiteDB.pm @@ -26,6 +26,8 @@ use DBI 1.608 qw(:sql_types); use DBD::SQLite 1.29; use NGCP::BulkProcessor::Array qw(arrayeq contains setcontains); +use File::Copy qw(); + use NGCP::BulkProcessor::Utils qw( tempfilename timestampdigits @@ -107,6 +109,19 @@ sub _connectidentifier { } +sub copydbfile { + + my $self = shift; + my $target = shift; + $self->db_disconnect(); + if (File::Copy::copy($self->{dbfilename},$target)) { + dbinfo($self,"$self->{dbfilename} copied to $target",getlogger(__PACKAGE__)); + } else { + dberror($self,"copy from $self->{dbfilename} to $target failed: $!",getlogger(__PACKAGE__)); + } + +} + sub tableidentifier { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/SqlRecord.pm b/lib/NGCP/BulkProcessor/SqlRecord.pm index ac13349..1685d7f 100644 --- a/lib/NGCP/BulkProcessor/SqlRecord.pm +++ b/lib/NGCP/BulkProcessor/SqlRecord.pm @@ -60,13 +60,13 @@ sub load_relation { } my $include = $load_recursive->{$relation_path}; my $filter; - my $transfrom; + my $transform; if ('HASH' eq ref $include) { $filter = $include->{filter}; - $transfrom = $include->{transform}; + $transform = $include->{transform}; if (exists $include->{include}) { $include = $include->{include}; - } elsif ($transfrom or $filter) { + } elsif ($transform or $filter) { $include = 1; } } @@ -81,8 +81,8 @@ sub load_relation { my $closure = _closure($filter,$load_recursive->{_context}); $self->{$relation} = [ grep { $closure->($_); } @{$self->{$relation}}]; } - if ('CODE' eq ref $transfrom) { - my $closure = _closure($transfrom,$load_recursive->{_context}); + if ('CODE' eq ref $transform) { + my $closure = _closure($transform,$load_recursive->{_context}); $self->{$relation} = $closure->($self->{$relation}); } $load_recursive->{_relation_path} = $relation_path_backup;