diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index 69be201..f121b13 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -550,6 +550,15 @@ sub buildrecords_fromrows { $record->load_relation($load_recursive,'cdr_groups','NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_group::findby_cdrid',undef,$record->{id},$load_recursive); $record->load_relation($load_recursive,'cdr_tags','NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_tag_data::findby_cdrid',undef,$record->{id},$load_recursive); + + #$record->load_relation($load_recursive,'cdr_cash_balance + #$record->load_relation($load_recursive,'cdr_export_status + #$record->load_relation($load_recursive,'cdr_mos_data + #$record->load_relation($load_recursive,'cdr_relation + #$record->load_relation($load_recursive,'cdr_time_balance + + + push @records,$record; } } diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm index d0d2dce..c699ff9 100644 --- a/lib/NGCP/BulkProcessor/Logging.pm +++ b/lib/NGCP/BulkProcessor/Logging.pm @@ -801,7 +801,7 @@ sub fetching_entries { my ($store,$scan_pattern,$start,$blocksize,$logger) = @_; if (defined $logger) { - $logger->info(_getnosqlconnectorinstanceprefix($store) . _getnosqlconnectidentifiermessage($store,'fetching ' . $scan_pattern . ' entries: ' . ($start + 1) . '-' . ($start + $blocksize))); + $logger->info(_getnosqlconnectorinstanceprefix($store) . _getnosqlconnectidentifiermessage($store,'fetching ' . ($scan_pattern ? "$scan_pattern " : '') . 'entries: ' . ($start + 1) . '-' . 
($start + $blocksize))); } } diff --git a/lib/NGCP/BulkProcessor/NoSqlConnector.pm b/lib/NGCP/BulkProcessor/NoSqlConnector.pm index f563a20..b5b6fdf 100644 --- a/lib/NGCP/BulkProcessor/NoSqlConnector.pm +++ b/lib/NGCP/BulkProcessor/NoSqlConnector.pm @@ -10,7 +10,7 @@ use NGCP::BulkProcessor::Logging qw( getlogger nosqlinfo nosqldebug); -use NGCP::BulkProcessor::LogError qw(nosqlerror); +use NGCP::BulkProcessor::LogError qw(nosqlerror notimplementederror); use NGCP::BulkProcessor::Utils qw(threadid); diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/Dao/Tabular.pm b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/Dao/Tabular.pm new file mode 100644 index 0000000..ada8853 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/Dao/Tabular.pm @@ -0,0 +1,279 @@ +package NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::ETL::CDR::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs +); + +use NGCP::BulkProcessor::Projects::ETL::CDR::Settings qw( + $tabular_fields + $csv_all_expected_fields +); + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + insert_stmt + transfer_table +); + +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + getupsertstatement + + get_fieldnames + + update_delta + findby_delta + countby_delta + + $deleted_delta + $updated_delta + $added_delta + + copy_table +); + +my $tablename = 'tabular'; +my $get_db = \&get_sqlite_db; + +my $fieldnames; +my $expected_fieldnames; +sub get_fieldnames { + my $expected = shift; + unless (defined $fieldnames and defined $expected_fieldnames) { + $fieldnames = [ map { + local $_ = (ref $_ ? (exists $_->{colname} ? 
$_->{colname} : $_->{path}) : $_); + $_ =~ s/\./_/g; + $_ =~ s/\[(\d+)\]/_$1/g; + $_; + } @$tabular_fields ]; + $expected_fieldnames = [ @$fieldnames ]; + push(@$expected_fieldnames,'id') unless grep { 'id' eq $_; } @$expected_fieldnames; + push(@$expected_fieldnames,'delta'); + } + return $fieldnames unless $expected; + return $expected_fieldnames; +} + +my $primarykey_fieldnames = [ 'id' ]; +my $indexes = { + $tablename . '_delta' => [ 'delta(7)' ], +}; + +our $deleted_delta = 'DELETED'; +our $updated_delta = 'UPDATED'; +our $added_delta = 'ADDED'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,get_fieldnames(1),$indexes); + + copy_row($self,shift,get_fieldnames(1)); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,get_fieldnames(1),$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); + +} + +sub findby_delta { + + my ($delta,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless defined $delta; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('delta') . ' = ?' + , $delta); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_domainusername { + + my ($domain,$username,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless (defined $domain and defined $username); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . $table . + ' WHERE ' . $db->columnidentifier('domain') . ' = ?' . + ' AND ' . $db->columnidentifier('username') . ' = ?' 
+ , $domain, $username); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub update_delta { + + my ($id,$delta) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; + my @params = (); + push(@params,$delta); + if (defined $id) { + $stmt .= ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + push(@params, $id); + } + + return $db->db_do($stmt,@params); + +} + +sub countby_delta { + + my ($deltas) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1'; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + + return $db->db_get_value($stmt,@params); + +} + +sub copy_table { + + my ($get_target_db) = @_; + + if ($csv_all_expected_fields) { + check_table(); + } else { + checktableinfo($get_db, + __PACKAGE__,$tablename, + get_fieldnames(0), + $indexes); + } + + return transfer_table( + get_db => $get_db, + class => __PACKAGE__, + get_target_db => $get_target_db, + targetclass => __PACKAGE__, + targettablename => $tablename, + ); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... 
+ + push @records,$record; + } + } + + return \@records; + +} + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub getupsertstatement { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . + join(', ', map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @{get_fieldnames(1)}) . ')'; + my @values = (); + foreach my $fieldname (@{get_fieldnames(1)}) { + if ('delta' eq $fieldname) { + my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); + } else { + push(@values,'?'); + } + } + $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; + return $upsert_stmt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + get_fieldnames(1), + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/ExportCDR.pm b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/ExportCDR.pm new file mode 100644 index 0000000..6dc6fa1 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/ExportCDR.pm @@ -0,0 +1,490 @@ +package NGCP::BulkProcessor::Projects::ETL::CDR::ExportCDR; +use strict; + +## no critic + +use threads::shared qw(); + +use Tie::IxHash; + +use NGCP::BulkProcessor::Serialization qw(); +use Scalar::Util qw(blessed); +use MIME::Base64 qw(encode_base64); + +use NGCP::BulkProcessor::Projects::ETL::CDR::Settings qw( + $dry + $skip_errors + + $export_cdr_multithreading + $export_cdr_numofthreads + $export_cdr_blocksize + + run_dao_method + get_dao_var + get_export_filename + + write_export_file + $cdr_export_filename_format + + $tabular_fields + $load_recursive + $tabular_single_row_txn + $ignore_tabular_unique + $graph_fields + $graph_fields_mode +); + +use 
NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn + fileerror +); + +use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); + +use NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular qw(); + +use NGCP::BulkProcessor::Projects::ETL::CDR::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs + ping_all_dbs +); + +use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool trim); #check_ipnet +#use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); +#use NGCP::BulkProcessor::Table qw(get_rowhash); +use NGCP::BulkProcessor::Array qw(array_to_map); +use NGCP::BulkProcessor::DSPath qw(); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + export_cdr_graph + export_cdr_tabular +); + +sub _init_graph_field_map { + my $context = shift; + my %graph_field_map = (); + my %graph_field_globs = (); + tie(%graph_field_globs, 'Tie::IxHash'); + foreach my $graph_field (@$graph_fields) { + my ($c,$a); + if ('HASH' eq ref $graph_field) { + $a = $graph_field->{path}; + $c = $graph_field; + } else { + $a = $graph_field; + $c = 1; + } + #my @a = (); + #my $b = ''; + #foreach my $c (split(/\./,$a)) { + # + # foreach my () { + # $b .= $_ + # push() + # } + #} + if ($a =~ /\*/) { + $a = quotemeta($a); + $a =~ s/(\\\*)+/[^.]+/g; + $a = '^' . $a . 
'$'; + $graph_field_globs{$a} = $c unless exists $graph_field_globs{$a}; + } else { + $graph_field_map{$a} = $c unless exists $graph_field_map{$a}; + } + } + $context->{graph_field_map} = \%graph_field_map; + $context->{graph_field_globs} = \%graph_field_globs; +} + +sub export_cdr_graph { + + my $static_context = { + + }; + _init_graph_field_map($static_context); + ($static_context->{export_filename},$static_context->{export_format}) = get_export_filename($cdr_export_filename_format); + + my $result = 1; #_copy_cdr_checks($static_context); + + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && run_dao_method('accounting::cdr::process_fromto', + #source_dbs => $static_context->{source_dbs}, + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + ping_all_dbs(); + my @data = (); + foreach my $record (@$records) { + next unless _export_cdr_graph_init_context($context,$record); + push(@data,_get_contract_graph($context)); + } + write_export_file(\@data,$context->{export_filename},$context->{export_format}); + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + destroy_reader_dbs_code => \&destroy_all_dbs, + blocksize => $export_cdr_blocksize, + multithreading => $export_cdr_multithreading, + numofthreads => $export_cdr_numofthreads, + ),$warning_count,); + +} + +sub export_cdr_tabular { + + my $result = NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::create_table(0); + + my $static_context = { + upsert => _tabular_rows_reset_delta(), + }; + + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && run_dao_method('billing::contracts::process_records', + static_context => $static_context, + process_code => sub { + my 
($context,$records,$row_offset) = @_; +            ping_all_dbs(); +            my @subscriber_rows = (); +            foreach my $record (@$records) { +                next unless _export_cdr_tabular_init_context($context,$record); +                push(@subscriber_rows, _get_subscriber_rows($context)); + +                if ($tabular_single_row_txn and (scalar @subscriber_rows) > 0) { +                    while (defined (my $subscriber_row = shift @subscriber_rows)) { +                        if ($skip_errors) { +                            eval { _insert_tabular_rows($context,[$subscriber_row]); }; +                            _warn($context,$@) if $@; +                        } else { +                            _insert_tabular_rows($context,[$subscriber_row]); +                        } +                    } +                } +            } + +            if (not $tabular_single_row_txn and (scalar @subscriber_rows) > 0) { +                if ($skip_errors) { +                    eval { _insert_tabular_rows($context,\@subscriber_rows); }; +                    _warn($context,$@) if $@; +                } else { +                    _insert_tabular_rows($context,\@subscriber_rows); +                } +            } + +            return 1; +        }, +        init_process_context_code => sub { +            my ($context)= @_; +            $context->{db} = &get_sqlite_db(); +            $context->{error_count} = 0; +            $context->{warning_count} = 0; +        }, +        uninit_process_context_code => sub { +            my ($context)= @_; +            undef $context->{db}; +            destroy_all_dbs(); +            { +                lock $warning_count; +                $warning_count += $context->{warning_count}; +            } +        }, +        destroy_reader_dbs_code => \&destroy_all_dbs, +        blocksize => $export_cdr_blocksize, +        multithreading => $export_cdr_multithreading, +        numofthreads => $export_cdr_numofthreads, +    ),$warning_count,); + +} + +sub _tabular_rows_reset_delta { +    my $upsert = 0; +    if (NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::countby_delta() > 0) { +        processing_info(threadid(),'resetting delta of ' . +            NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::update_delta(undef, +            $NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::deleted_delta) . +            ' records',getlogger(__PACKAGE__)); +        $upsert |= 1; +    } +    return $upsert; +} + +sub _insert_tabular_rows { +    my ($context,$subscriber_rows) = @_; +    $context->{db}->db_do_begin( +        ($context->{upsert} ? 
+ NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::getupsertstatement() + : NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::getinsertstatement($ignore_tabular_unique)), + ); + eval { + $context->{db}->db_do_rowblock($subscriber_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } + +} + +sub _export_cdr_graph_init_context { + + my ($context,$record) = @_; + + my $result = 1; + + return 0 unless _load_contract($context,$record); + + return $result; + +} + +sub _get_contract_graph { + my ($context) = @_; + + my $dp = NGCP::BulkProcessor::DSPath->new($context->{contract}, { + filter => sub { + my $path = shift; + if ('whitelist' eq $graph_fields_mode) { + my $include; + if (exists $context->{graph_field_map}->{$path}) { + $include = $context->{graph_field_map}->{$path}; + } else { + foreach my $glob (keys %{$context->{graph_field_globs}}) { + if ($path =~ /$glob/) { + $include = $context->{graph_field_globs}->{$glob}; + last; + } + } + } + if ('HASH' eq ref $include) { + if (exists $include->{include}) { + return $include->{include}; + } + return 1; + } else { + return $include; + } + } elsif ('blacklist' eq $graph_fields_mode) { + my $exclude; + if (exists $context->{graph_field_map}->{$path}) { + $exclude = $context->{graph_field_map}->{$path}; + } else { + foreach my $glob (keys %{$context->{graph_field_globs}}) { + if ($path =~ /$glob/) { + $exclude = $context->{graph_field_globs}->{$glob}; + last; + } + } + } + if ('HASH' eq ref $exclude) { + if (exists $exclude->{exclude}) { + return not $exclude->{exclude}; + } elsif ($exclude->{transform}) { + return 1; + } + return 0; + } else { + return not $exclude; + } + } + }, + transform => sub { + return shift; + # ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = + # array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, + # sub { return shift->{attribute}; }, 
sub { my $p = shift; }, 'group' ); + # if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { + # foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { + # foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { + # $voicemail->{recording} = encode_base64($voicemail->{recording},''); + # } + # } + # } + }, + }); + + $dp->filter()->transform(); + + return $context->{contract}; + +} + +sub _export_cdr_tabular_init_context { + + my ($context,$record) = @_; + + my $result = 1; + + return 0 unless _load_contract($context,$record); + + if (defined $context->{contract}->{voip_subscribers} + and not scalar @{$context->{contract}->{voip_subscribers}}) { + _info($context,"contract ID $record->{id} has no subscribers, skipping",1); + $result = 0; + } + + return $result; + +} + +sub _get_subscriber_rows { + + my ($context) = @_; + + my @rows = (); + foreach my $bill_subs (@{$context->{contract}->{voip_subscribers}}) { + my @row = (); + $bill_subs->{contract} = NGCP::BulkProcessor::Dao::Trunk::billing::contracts->new($context->{contract}); #no circular ref + ($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, my $as, my $vs) = + array_to_map($bill_subs->{provisioning_voip_subscriber}->{voip_usr_preferences}, + sub { return shift->{_attribute}; }, sub { my $p = shift; }, 'group' ); + if (my $prov_subscriber = $bill_subs->{provisioning_voip_subscriber}) { + foreach my $voicemail_user (@{$prov_subscriber->{voicemail_users}}) { + foreach my $voicemail (@{$voicemail_user->{voicemail_spool}}) { + $voicemail->{recording} = encode_base64($voicemail->{recording},''); + } + } + } + my $dp = NGCP::BulkProcessor::DSPath->new($bill_subs, { + retrieve_key_from_non_hash => sub {}, + key_does_not_exist => sub {}, + index_does_not_exist => sub {}, + }); + foreach my $tabular_field (@$tabular_fields) { + my $a; + my $sep = ','; + my $transform; + if ('HASH' eq ref $tabular_field) { + $a = $tabular_field->{path}; + $sep = 
$tabular_field->{sep}; + $transform = $tabular_field->{transform}; + } else { + $a = $tabular_field; + } + #eval {'' . ($dp->get('.' . $a) // '');}; if($@){ + # my $x=5; + #} + my $v = $dp->get('.' . $a); + if ('CODE' eq ref $transform) { + my $closure = _closure($transform,_get_closure_context($context)); + $v = $closure->($v,$bill_subs); + } + if ('ARRAY' eq ref $v) { + if ('HASH' eq ref $v->[0] + or (blessed($v->[0]) and $v->[0]->isa('NGCP::BulkProcessor::SqlRecord'))) { + $v = join($sep, sort map { $_->{$tabular_field->{field}}; } @$v); + } else { + $v = join($sep, sort @$v); + } + } else { + $v = '' . ($v // ''); + } + push(@row,$v); + } + push(@row,$bill_subs->{uuid}) unless grep { 'uuid' eq $_; } @{NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::get_fieldnames()}; + if ($context->{upsert}) { + push(@row,$bill_subs->{uuid}); + } else { + push(@row,$NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::added_delta); + } + + push(@rows,\@row); + } + + return @rows; + +} + +sub _load_contract { + + my ($context,$record) = @_; + $context->{contract} = run_dao_method('billing::contracts::findby_id', $record->{id}, { %$load_recursive, + #'contracts.voip_subscribers.domain' => 1, + _context => _get_closure_context($context), + }); + + return 1 if $context->{contract}; + return 0; + +} + +sub _get_closure_context { + my $context = shift; + return { + _info => \&_info, + _error => \&_error, + _debug => \&_debug, + _warn => \&_warn, + context => $context, + }; +} + +sub _closure { + my ($sub,$context) = @_; + return sub { + foreach my $key (keys %$context) { + no strict "refs"; ## no critic (ProhibitNoStrict) + *{"main::$key"} = $context->{$key} if 'CODE' eq ref $context->{$key}; + } + return $sub->(@_,$context); + }; +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; 
+ $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } +} + +sub _debug { + + my ($context,$message,$debug) = @_; + processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/ProjectConnectorPool.pm new file mode 100644 index 0000000..2986c3d --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/ProjectConnectorPool.pm @@ -0,0 +1,120 @@ +package NGCP::BulkProcessor::Projects::ETL::CDR::ProjectConnectorPool; +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . 
'/../../../'); + +use NGCP::BulkProcessor::Projects::ETL::CDR::Settings qw( + $csv_dir + $sqlite_db_file +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_connectorinstancename +); + +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); + +use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + + get_sqlite_db + sqlite_db_tableidentifier + + get_csv_db + csv_db_tableidentifier + + destroy_dbs + destroy_all_dbs + ping_all_dbs + +); + +my $sqlite_dbs = {}; +my $csv_dbs = {}; + +sub get_sqlite_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); + + if (not defined $sqlite_dbs->{$name}) { + $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file); + } + + return $sqlite_dbs->{$name}; + +} + +sub sqlite_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file)); + +} + +sub get_csv_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); + if (not defined $csv_dbs->{$name}) { + $csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $csv_dbs->{$name}->db_connect($csv_dir); + } + return $csv_dbs->{$name}; + +} + +sub csv_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? 
&$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir)); + +} + +sub destroy_dbs { + + foreach my $name (keys %$sqlite_dbs) { + cleartableinfo($sqlite_dbs->{$name}); + undef $sqlite_dbs->{$name}; + delete $sqlite_dbs->{$name}; + } + + foreach my $name (keys %$csv_dbs) { + cleartableinfo($csv_dbs->{$name}); + undef $csv_dbs->{$name}; + delete $csv_dbs->{$name}; + } + +} + +sub destroy_all_dbs() { + destroy_dbs(); + NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); +} + +sub ping_all_dbs() { + NGCP::BulkProcessor::ConnectorPool::ping_dbs(); +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/Settings.pm new file mode 100644 index 0000000..3c770d4 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/Settings.pm @@ -0,0 +1,484 @@ +package NGCP::BulkProcessor::Projects::ETL::CDR::Settings; +use strict; + +## no critic + +use threads::shared qw(); + +use File::Basename qw(fileparse); +use NGCP::BulkProcessor::Serialization qw(); +use DateTime::TimeZone qw(); + +use JSON -support_by_pp, -no_export; +*NGCP::BulkProcessor::Serialization::serialize_json = sub { + my $input_ref = shift; + return JSON::to_json($input_ref, { allow_nonref => 1, allow_blessed => 1, convert_blessed => 1, pretty => 1, as_nonblessed => 1 }); +}; + +use NGCP::BulkProcessor::Globals qw( + $working_path + $enablemultithreading + $cpucount + create_path +); + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + filewarn + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); + +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module); + +use NGCP::BulkProcessor::Array qw(contains); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + 
update_settings + run_dao_method + get_dao_var + get_export_filename + write_export_file + write_sql_file + + update_load_recursive + $load_yml + $load_recursive + + update_tabular_fields + $tabular_yml + $tabular_fields + $ignore_tabular_unique + $tabular_single_row_txn + $graph_yml + $graph_fields + $graph_fields_mode + update_graph_fields + + $sqlite_db_file + $csv_dir + + check_dry + + $output_path + $input_path + + $cdr_export_filename_format + $cdr_import_filename + + $defaultsettings + $defaultconfig + + $dry + $skip_errors + $force + + $export_cdr_multithreading + $export_cdr_numofthreads + $export_cdr_blocksize + + $csv_all_expected_fields +); +#$cf_default_priority +#$cf_default_timeout +#$cft_default_ringtimeout + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $tabular_yml = 'tabular.yml'; +our $tabular_fields = []; +our $ignore_tabular_unique = 0; +our $tabular_single_row_txn = 1; + +our $graph_yml = 'graph.yml'; +our $graph_fields = []; +our $graph_fields_mode = 'whitelist'; +my @graph_fields_modes = qw(whitelist blacklist); + +our $load_yml = 'load.yml'; +our $load_recursive; + +our $output_path = $working_path . 'output/'; +our $input_path = $working_path . 
'input/'; +our $csv_dir = 'cdr'; + +our $cdr_export_filename_format = undef; + +our $csv_all_expected_fields = 1; + +#our $cdr_import_filename = undef; +#our $cdr_import_numofthreads = $cpucount; +#our $cdr_import_multithreading = 1; +#our $cdr_reseller_name = 'default'; +#our $cdr_billing_profile_name = 'Default Billing Profile'; +#our $cdr_domain = undef; +#our $cdr_contact_email_format = '%s@example.org'; +#our $subscriber_contact_email_format = '%s@example.org'; +#our $split_cdrs = 0; + +#our $subscriber_timezone = undef; +#our $contract_timezone = undef; + +#our $subscriber_profile_set_name = undef; +#our $subscriber_profile_name = undef; +#our $webusername_format = '%1$s'; +#our $subscriber_externalid_format = undef; + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +my $mr = 'Trunk'; +my @supported_mr = ('Trunk'); + +our $sqlite_db_file = 'sqlite'; + +our $export_cdr_multithreading = $enablemultithreading; +our $export_cdr_numofthreads = $cpucount; +our $export_cdr_blocksize = 1000; + +#our $cf_default_priority = 1; +#our $cf_default_timeout = 300; +#our $cft_default_ringtimeout = 20; + +#our $rollback_sql_export_filename_format = undef; +#our $rollback_sql_stmt_format = undef; + +my $file_lock :shared = undef; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + my $regexp_result; + + #&$configurationinfocode("testinfomessage",$configlogger); + + $result &= _prepare_working_paths(1); + + $cdr_export_filename_format = $data->{cdr_export_filename} if exists $data->{cdr_export_filename}; + get_export_filename($data->{cdr_export_filename},$configfile); + + #$rollback_sql_export_filename_format = $data->{rollback_sql_export_filename_format} if exists $data->{rollback_sql_export_filename_format}; + #get_export_filename($data->{rollback_sql_export_filename_format},$configfile); + #$rollback_sql_stmt_format = $data->{rollback_sql_stmt_format} if exists $data->{rollback_sql_stmt_format}; + + 
$sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; + $csv_dir = $data->{csv_dir} if exists $data->{csv_dir}; + + #$cdr_import_filename = _get_import_filename($cdr_import_filename,$data,'cdr_import_filename'); + #$cdr_import_multithreading = $data->{cdr_import_multithreading} if exists $data->{cdr_import_multithreading}; + #$cdr_import_numofthreads = _get_numofthreads($cpucount,$data,'cdr_import_numofthreads'); + #$cdr_reseller_name = $data->{cdr_reseller_name} if exists $data->{cdr_reseller_name}; + #$cdr_billing_profile_name = $data->{cdr_billing_profile_name} if exists $data->{cdr_billing_profile_name}; + #$cdr_domain = $data->{cdr_domain} if exists $data->{cdr_domain}; + #$cdr_contact_email_format = $data->{cdr_contact_email_format} if exists $data->{cdr_contact_email_format}; + #$subscriber_contact_email_format = $data->{subscriber_contact_email_format} if exists $data->{subscriber_contact_email_format}; + #$split_cdrs = $data->{split_cdrs} if exists $data->{split_cdrs}; + + #$contract_timezone = $data->{cdr_timezone} if exists $data->{cdr_timezone}; + #if ($contract_timezone and not DateTime::TimeZone->is_valid_name($contract_timezone)) { + # configurationerror($configfile,"invalid cdr_timezone '$contract_timezone'"); + # $result = 0; + #} + + #$subscriber_timezone = $data->{subscriber_timezone} if exists $data->{subscriber_timezone}; + #if ($subscriber_timezone and not DateTime::TimeZone->is_valid_name($subscriber_timezone)) { + # configurationerror($configfile,"invalid subscriber_timezone '$subscriber_timezone'"); + # $result = 0; + #} + + #$subscriber_profile_set_name = $data->{subscriber_profile_set_name} if exists $data->{subscriber_profile_set_name}; + #$subscriber_profile_name = $data->{subscriber_profile_name} if exists $data->{subscriber_profile_name}; + #if ($subscriber_profile_set_name and not $subscriber_profile_name + # or not $subscriber_profile_set_name and $subscriber_profile_name) { + # 
configurationerror($configfile,"both subscriber_profile_set_name and subscriber_profile_name required"); + # $result = 0; + #} + #$webusername_format = $data->{webusername_format} if exists $data->{webusername_format}; + #$subscriber_externalid_format = $data->{subscriber_externalid_format} if exists $data->{subscriber_externalid_format}; + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + $export_cdr_multithreading = $data->{export_cdr_multithreading} if exists $data->{export_cdr_multithreading}; + $export_cdr_numofthreads = _get_numofthreads($cpucount,$data,'export_cdr_numofthreads'); + $export_cdr_blocksize = $data->{export_cdr_blocksize} if exists $data->{export_cdr_blocksize}; + + $tabular_yml = $data->{tabular_yml} if exists $data->{tabular_yml}; + $graph_yml = $data->{graph_yml} if exists $data->{graph_yml}; + $graph_fields_mode = $data->{graph_fields_mode} if exists $data->{graph_fields_mode}; + if (not $graph_fields_mode or not contains($graph_fields_mode,\@graph_fields_modes)) { + configurationerror($configfile,'graph_fields_mode must be one of ' . 
join(', ', @graph_fields_modes)); + $result = 0; + } + $load_yml = $data->{load_yml} if exists $data->{load_yml}; + $tabular_single_row_txn = $data->{tabular_single_row_txn} if exists $data->{tabular_single_row_txn}; + $ignore_tabular_unique = $data->{ignore_tabular_unique} if exists $data->{ignore_tabular_unique}; + + #$cf_default_priority = $data->{cf_default_priority} if exists $data->{cf_default_priority}; + #$cf_default_timeout = $data->{cf_default_timeout} if exists $data->{cf_default_timeout}; + #$cft_default_ringtimeout = $data->{cft_default_ringtimeout} if exists $data->{cft_default_ringtimeout}; + + $csv_all_expected_fields = $data->{csv_all_expected_fields} if exists $data->{csv_all_expected_fields}; + + $mr = $data->{schema_version}; + if (not defined $mr or not contains($mr,\@supported_mr)) { + configurationerror($configfile,'schema_version must be one of ' . join(', ', @supported_mr)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub run_dao_method { + my $method_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift; + load_module($method_name); + no strict 'refs'; + return $method_name->(@_); +} + +sub get_dao_var { + my $var_name = 'NGCP::BulkProcessor::Dao::' . $mr . '::' . shift; + load_module($var_name); + no strict 'refs'; + return @{$var_name} if wantarray; + return ${$var_name}; +} + +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$output_path) = create_path($working_path . 
'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $numofthreads = $default_value; + $numofthreads = $data->{$key} if exists $data->{$key}; + $numofthreads = $cpucount if $numofthreads > $cpucount; + return $numofthreads; +} + +sub get_export_filename { + my ($filename_format,$configfile) = @_; + my $export_filename; + my $export_format; + if ($filename_format) { + $export_filename = sprintf($filename_format,timestampdigits(),threadid()); + unless ($export_filename =~ /^\//) { + $export_filename = $output_path . $export_filename; + } + if (-e $export_filename and (unlink $export_filename) == 0) { + filewarn('cannot remove ' . $export_filename . ': ' . $!,getlogger(__PACKAGE__)); + $export_filename = undef; + } + my ($name,$path,$suffix) = fileparse($export_filename,".json",".yml",".yaml",".xml",".php",".pl",".db",".csv"); + if ($suffix eq '.json') { + $export_format = $NGCP::BulkProcessor::Serialization::format_json; + } elsif ($suffix eq '.yml' or $suffix eq '.yaml') { + $export_format = $NGCP::BulkProcessor::Serialization::format_yaml; + } elsif ($suffix eq '.xml') { + $export_format = $NGCP::BulkProcessor::Serialization::format_xml; + } elsif ($suffix eq '.php') { + $export_format = $NGCP::BulkProcessor::Serialization::format_php; + } elsif ($suffix eq '.pl') { + $export_format = $NGCP::BulkProcessor::Serialization::format_perl; + } elsif ($suffix eq '.db') { + $export_format = 'sqlite'; + } elsif ($suffix eq '.csv') { + $export_format = 'csv'; + } else { + configurationerror($configfile,"$filename_format: either .json/.yaml/.xml/.php/.pl or .db/.csv export file format required"); + } + } + return ($export_filename,$export_format); +} + +sub write_export_file { + + my ($data,$export_filename,$export_format) = @_; + if (defined $export_filename) { + fileerror("invalid extension for output filename 
$export_filename",getlogger(__PACKAGE__)) + unless contains($export_format,\@NGCP::BulkProcessor::Serialization::formats); + # "concatenated json" https://en.wikipedia.org/wiki/JSON_streaming + my $str = ''; + if (ref $data eq 'ARRAY') { + foreach my $obj (@$data) { + #$str .= "\n" if length($str); + $str .= NGCP::BulkProcessor::Serialization::serialize($obj,$export_format); + } + } else { + $str = NGCP::BulkProcessor::Serialization::serialize($data,$export_format); + } + _write_file($str,$export_filename); + } + +} + +sub write_sql_file { + + my ($data,$export_filename,$stmt_format) = @_; + if (defined $export_filename and $stmt_format) { + my $str = ''; + if (ref $data eq 'ARRAY') { + foreach my $obj (@$data) { + $str .= "\n" if length($str); + if (ref $obj eq 'ARRAY') { + $str .= sprintf($stmt_format,@$obj); + } else { + $str .= sprintf($stmt_format,$str); + } + } + } else { + $str = sprintf($stmt_format,$data); + } + $str .= "\n"; + _write_file($str,$export_filename); + } + +} + +sub _write_file { + + my ($str,$export_filename) = @_; + if (defined $export_filename) { + lock $file_lock; + open(my $fh, '>>', $export_filename) or fileerror('cannot open file ' . $export_filename . ': ' . 
$!,getlogger(__PACKAGE__)); + binmode($fh); + print $fh $str; + close $fh; + } + +} + +sub update_tabular_fields { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $tabular_fields = $data; + }; + if ($@ or 'ARRAY' ne ref $tabular_fields) { + $tabular_fields //= []; + configurationerror($configfile,'invalid tabular fields',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub update_graph_fields { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $graph_fields = $data; + }; + if ($@ or 'ARRAY' ne ref $graph_fields) { + $graph_fields //= []; + configurationerror($configfile,'invalid graph fields',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub update_load_recursive { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + + eval { + $load_recursive = $data; + }; + if ($@ or 'HASH' ne ref $load_recursive) { + undef $load_recursive; + configurationerror($configfile,'invalid load recursive def',getlogger(__PACKAGE__)); + $result = 0; + } + + return $result; + } + return 0; + +} + +sub _get_import_filename { + my ($old_value,$data,$key) = @_; + my $import_filename = $old_value; + $import_filename = $data->{$key} if exists $data->{$key}; + if (defined $import_filename and length($import_filename) > 0) { + $import_filename = $input_path . 
$import_filename unless -e $import_filename; + } + return $import_filename; +} + +sub check_dry { + + if ($dry) { + scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); + return 1; + } else { + scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); + if (!$force) { + if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { + return 1; + } else { + return 0; + } + } else { + scriptinfo('force option applied',getlogger(__PACKAGE__)); + return 1; + } + } + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/config.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/config.cfg new file mode 100644 index 0000000..442b428 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/config.cfg @@ -0,0 +1,61 @@ +##general settings: +working_path = /var/sipwise +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = db01 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = db01 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = db01 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = db01 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = db01 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator 
+ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/config.debug.cfg new file mode 100644 index 0000000..200aede --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/config.debug.cfg @@ -0,0 +1,61 @@ +##general settings: +working_path = /home/rkrenn/temp/customer_exporter +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.178 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.178 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.178 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.178 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.178 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com 
+doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/graph.yml b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/graph.yml new file mode 100644 index 0000000..01f088a --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/graph.yml @@ -0,0 +1,5 @@ +# graph.yml: whitelist/blacklist of *contract* fields to export to .json/.yaml/.xml/... + +- id +- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.attribute.attribute +- voip_subscribers*.provisioning_voip_subscriber.voip_usr_preferences*.value diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/load.yml b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/load.yml new file mode 100644 index 0000000..8c14467 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/load.yml @@ -0,0 +1,44 @@ +# load.yml: define which *contract* relations to fetch from db. + +#contracts.voip_subscribers: 1 +contracts.voip_subscribers: + include: !!perl/code | + { + my ($contract,$context) = @_; + #return 0 if $contract->{status} eq 'terminated'; + return 1; + } + + filter: !!perl/code | + { + my ($bill_subs,$context) = @_; + #_debug($context,"skipping terminated subscriber $bill_subs->{username}") if $bill_subs->{status} eq 'terminated'; + #return 0 if $bill_subs->{status} eq 'terminated'; + return 1; + } + + transform: !!perl/code | + { + my ($bill_subs,$context) = @_; + return $bill_subs; + } + +contracts.contact: 1 +contracts.voip_subscribers.primary_number: 1 +contracts.voip_subscribers.provisioning_voip_subscriber: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_dbaliases: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.attribute: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.allowed_ips: 1 
+contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.ncos: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_usr_preferences.cf_mapping.destinations: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users: 1 +#contracts.voip_subscribers.provisioning_voip_subscriber.voicemail_users.voicemail_spool: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_preferences: 1 +contracts.voip_subscribers.provisioning_voip_subscriber.voip_fax_destinations: + transform: !!perl/code | + { + my ($fax_destinations,$context) = @_; + return [ map { $_->{destination} . ' (' . $_->{filetype} . ')'; } @$fax_destinations ]; + } \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/process.pl new file mode 100644 index 0000000..aea2662 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/process.pl @@ -0,0 +1,319 @@ +use strict; + +## no critic + +our $VERSION = "0.0"; + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . 
'/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::ETL::CDR::Settings qw( + update_settings + update_tabular_fields + update_graph_fields + $tabular_yml + $graph_yml + + update_load_recursive + get_export_filename + $cdr_export_filename_format + $load_yml + + check_dry + $output_path + $defaultsettings + $defaultconfig + $dry + $skip_errors + $force +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); + +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); + +use NGCP::BulkProcessor::Projects::ETL::CDR::ProjectConnectorPool qw(destroy_all_dbs get_csv_db get_sqlite_db); + +use NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular qw(); + +use NGCP::BulkProcessor::Projects::ETL::CDR::ExportCDR qw( + export_cdr_graph + export_cdr_tabular +); +#use NGCP::BulkProcessor::Projects::ETL::Cdr::ImportCdr qw( +# import_cdr_json +#); + +scripterror(getscriptpath() . 
' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; + +my @TASK_OPTS = (); + +my $tasks = []; + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); + +my $cleanup_all_task_opt = 'cleanup_all'; +push(@TASK_OPTS,$cleanup_all_task_opt); + +my $export_cdr_graph_task_opt = 'export_cdr_graph'; +push(@TASK_OPTS,$export_cdr_graph_task_opt); + +my $export_cdr_tabular_task_opt = 'export_cdr_tabular'; +push(@TASK_OPTS,$export_cdr_tabular_task_opt); + +#my $import_cdr_json_task_opt = 'import_cdr_json'; +#push(@TASK_OPTS,$import_cdr_json_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + "dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + $result &= load_config($tabular_yml,\&update_tabular_fields,$YAML_CONFIG_TYPE); + $result &= load_config($graph_yml,\&update_graph_fields,$YAML_CONFIG_TYPE); + $result &= load_config($load_yml,\&update_load_recursive,$YAML_CONFIG_TYPE); + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); + } elsif (lc($cleanup_all_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); + + } elsif 
(lc($export_cdr_graph_task_opt) eq lc($task)) { + $result &= export_cdr_graph_task(\@messages) if taskinfo($export_cdr_graph_task_opt,$result); + $completion |= 1; + } elsif (lc($export_cdr_tabular_task_opt) eq lc($task)) { + $result &= export_cdr_tabular_task(\@messages) if taskinfo($export_cdr_tabular_task_opt,$result); + $completion |= 1; + #} elsif (lc($import_cdr_json_task_opt) eq lc($task)) { + # if (taskinfo($import_cdr_json_task_opt,$result,1)) { + # next unless check_dry(); + # $result &= import_cdr_json_task(\@messages); + # $completion |= 1; + # } + + } else { + $result = 0; + scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? 
"starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + cleanupcvsdirs(); + cleanupdbfiles(); + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + push(@$messages,'working directory cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } +} + +sub export_cdr_graph_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = export_cdr_graph(); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + #$stats .= "\n total mta subscriber records: " . + # NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_ccacsn() . 
' rows'; + #my $added_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::added_delta + #); + #$stats .= "\n new: $added_count rows"; + #my $existing_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::updated_delta + #); + #$stats .= "\n existing: $existing_count rows"; + #my $deleted_count = NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::countby_delta( + # $NGCP::BulkProcessor::Projects::Migration::UPCAT::Dao::import::MtaSubscriber::deleted_delta + #); + #$stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"exporting cdr (graph) INCOMPLETE$stats"); + } else { + push(@$messages,"exporting cdr (graph) completed$stats"); + } + destroy_all_dbs(); + return $result; + +} + +sub export_cdr_tabular_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = export_cdr_tabular(); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total subscriber records: " . + NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::countby_delta() . 
' rows'; + my $added_count = NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::added_delta + ); + $stats .= "\n new: $added_count rows"; + my $existing_count = NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::updated_delta + ); + $stats .= "\n existing: $existing_count rows"; + my $deleted_count = NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::deleted_delta + ); + $stats .= "\n removed: $deleted_count rows"; + my ($export_filename,$export_format) = get_export_filename($cdr_export_filename_format); + if ('sqlite' eq $export_format) { + &get_sqlite_db()->copydbfile($export_filename); + } elsif ('csv' eq $export_format) { + NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::copy_table(\&get_csv_db); + &get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::ETL::CDR::Dao::Tabular::gettablename(),$export_filename); + } else { + push(@$messages,'invalid extension for output filename $export_filename'); + } + }; + if ($err or !$result) { + push(@$messages,"exporting cdr (tabular) INCOMPLETE$stats"); + } else { + push(@$messages,"exporting cdr (tabular) completed$stats"); + } + destroy_all_dbs(); + return $result; + +} + +#sub import_cdr_json_task { +# +# my ($messages) = @_; +# my ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = (0,0,0,0,0,0,0,0); +# eval { +# ($result,$warning_count,$contract_read_count,$subscriber_read_count,$contract_created_count,$subscriber_created_count,$contract_failed_count,$subscriber_failed_count) = import_cdr_json(); +# }; +# my $err = $@; +# my $stats = ": $warning_count warnings"; +# eval { +# $stats .= "\n contracts read: " . 
$contract_read_count; +# $stats .= "\n contracts created: " . $contract_created_count; +# $stats .= "\n contracts failed: " . $contract_failed_count; +# $stats .= "\n subscribers read: " . $subscriber_read_count; +# $stats .= "\n subscribers created: " . $subscriber_created_count; +# $stats .= "\n subscribers failed: " . $subscriber_failed_count; +# }; +# if ($err or !$result) { +# push(@$messages,"importing cdr (json) INCOMPLETE$stats"); +# } else { +# push(@$messages,"importing cdr (json) completed$stats"); +# } +# destroy_all_dbs(); +# return $result; +# +#} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/settings.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/settings.cfg new file mode 100644 index 0000000..1f381df --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/settings.cfg @@ -0,0 +1,58 @@ +#dry=0 +#skip_errors=0 + +schema_version = Trunk + +export_cdr_multithreading = 1 +export_cdr_numofthreads = 4 +export_cdr_blocksize = 1000 + +cdr_export_filename=cdr_%s.csv + +load_yml = load.yml +tabular_yml = tabular.yml +graph_yml = graph.yml +graph_fields_mode = whitelist + +csv_all_expected_fields = 0 + +sqlite_db_file = sqlite +csv_dir = cdr +tabular_single_row_txn = 1 +ignore_tabular_unique = 0 + +#cdr_import_filename=cdr_20210216173615.json +#split_cdr = 1 +#cdr_import_multithreading = 1 +#cdr_import_numofthreads = 4 +#cdr_reseller_name = default +#cdr_billing_profile_name = Default Billing Profile +#cdr_domain = test1610072315.example.org +#cdr_contact_email_format = DN0%2$s%3$s@example.org +#cdr_timezone = Europe/Vienna +#subscriber_profile_set_name = subscriber_profile_1_set_65261 +#subscriber_profile_name = subscriber_profile_1_65261 +## sip username as webusername: +##webusername_format = %1$s +## webusername = cc+ac+sn: +##webusername_format = %2$s%3$s%4$s +## webusername = 0+ac+sn: +#webusername_format = 0%3$s%4$s +## sip 
username as external_id: +##subscriber_externalid_format = %1$s +## external_id = cc+ac+sn: +##subscriber_externalid_format = %2$s%3$s%4$s +## external_id = 0+ac+sn: +#subscriber_externalid_format = 0%3$s%4$s +## subscriber contact will be created, only if one of below is set. +#subscriber_contact_email_format = DN0%2$s%3$s@domain.org +#subscriber_timezone = Europe/Vienna + +#cf_default_priority: 1 +#cf_default_timeout: 300 +#cft_default_ringtimeout: 20 + +##write sql files for legacy db to set/unset the is_external pref of migrated subscribers: +# +#rollback_sql_export_filename_format = delete_subscribers_%s.sql +#rollback_sql_stmt_format = start transaction;call billing.remove_subscriber("%1$s",%2$s);commit; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/CDR/tabular.yml b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/tabular.yml new file mode 100644 index 0000000..7ea0796 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/CDR/tabular.yml @@ -0,0 +1,70 @@ +# tabular.yml: define which *subscriber* columns to add tabular (.db/.csv) exports. 
+ +- path: contract.id + transform: !!perl/code | + { + my ($id,$bill_subs) = @_; + return $id; + } + +- path: primary_number.cc +- path: primary_number.ac +- path: primary_number.sn +- path: provisioning_voip_subscriber.voicemail_users[0].attach +- path: provisioning_voip_subscriber.voicemail_users[0].delete +- path: provisioning_voip_subscriber.voicemail_users[0].email +- path: provisioning_voip_subscriber.voicemail_users[0].password +- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_clis + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.allowed_ips_grp[0].allowed_ips + sep: ',' + field: 'ipnet' +- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.block_out_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.block_in_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_in_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_list + sep: ',' + field: 'value' +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_block_out_mode[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.ncos_id[0].ncos.level +- path: provisioning_voip_subscriber.voip_usr_preferences.adm_ncos_id[0].ncos.level +- path: provisioning_voip_subscriber.voip_usr_preferences.cfb[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfna[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfo[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: 
provisioning_voip_subscriber.voip_usr_preferences.cfr[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfs[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cft[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_usr_preferences.cfu[0].cf_mapping.destinations + sep: ',' + field: 'destination' +- path: provisioning_voip_subscriber.voip_fax_preferences.active +- path: provisioning_voip_subscriber.voip_fax_preferences.ecm +- path: provisioning_voip_subscriber.voip_fax_preferences.name +- path: provisioning_voip_subscriber.voip_fax_preferences.t38 +- path: provisioning_voip_subscriber.voip_fax_destinations + sep: ',' +- path: provisioning_voip_subscriber.voip_usr_preferences.force_inbound_calls_to_peer[0].value +- path: provisioning_voip_subscriber.voip_usr_preferences.lnp_for_local_sub[0].value + diff --git a/lib/NGCP/BulkProcessor/SqlProcessor.pm b/lib/NGCP/BulkProcessor/SqlProcessor.pm index 3542f96..8553591 100644 --- a/lib/NGCP/BulkProcessor/SqlProcessor.pm +++ b/lib/NGCP/BulkProcessor/SqlProcessor.pm @@ -813,7 +813,7 @@ sub transfer_table { if (length($select) > 0) { $selectstatement = $select; } else { - $selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename) + $selectstatement = 'SELECT ' . join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @fieldnames) . ' FROM ' . $db->tableidentifier($tablename); } my $insertstatement = 'INSERT INTO ' . $target_db->tableidentifier($targettablename) . ' (' . join(', ',map { local $_ = $_; $_ = $target_db->columnidentifier($_); $_; } @fieldnames) . ') VALUES (' . $valueplaceholders . ')';