From d812c82aa65c289d3489b08fc5273a9599dcec49 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Thu, 13 Jan 2022 13:30:30 +0100 Subject: [PATCH] TT#155901 LGI: exporter for EDR transition data Change-Id: Ie3b6aef065d236c1d80589b3efc8c0545dea4d6a --- .../Dao/Trunk/accounting/events.pm | 287 ++++++++++++++++++ .../Projects/ETL/EDR/Dao/PeriodEvents.pm | 157 ++++++++++ .../Projects/ETL/EDR/ExportEvents.pm | 271 +++++++++++++++++ .../Projects/ETL/EDR/ProjectConnectorPool.pm | 120 ++++++++ .../Projects/ETL/EDR/Settings.pm | 235 ++++++++++++++ .../BulkProcessor/Projects/ETL/EDR/config.cfg | 61 ++++ .../Projects/ETL/EDR/config.debug.cfg | 61 ++++ .../BulkProcessor/Projects/ETL/EDR/process.pl | 214 +++++++++++++ .../Projects/ETL/EDR/settings.cfg | 21 ++ 9 files changed, 1427 insertions(+) create mode 100644 lib/NGCP/BulkProcessor/Dao/Trunk/accounting/events.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/Dao/PeriodEvents.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/ExportEvents.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/ProjectConnectorPool.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/Settings.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.debug.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/process.pl create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/EDR/settings.cfg diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/events.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/events.pm new file mode 100644 index 0000000..de89abb --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/events.pm @@ -0,0 +1,287 @@ +package NGCP::BulkProcessor::Dao::Trunk::accounting::events; +use strict; + +#use Tie::IxHash; +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowsdeleted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_accounting_db + destroy_dbs +); + +use 
NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + + copy_row + + process_table +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + update_row + insert_row + + process_subscribers + process_events + findby_subscriberid +); + +my $tablename = 'events'; +my $get_db = \&get_accounting_db; + +my $expected_fieldnames = [ + 'id', + 'type', + 'subscriber_id', + 'reseller_id', + 'old_status', + 'new_status', + 'timestamp', + 'export_status', + 'exported_at', +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub findby_subscriberid { + + my ($xa_db,$subscriber_id,$joins,$conditions,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my @conditions = @{$conditions // []}; + push(@conditions,{ $table . '.subscriber_id' => { '=' => '?' } }); + my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' . + _get_export_stmt($db,$joins,\@conditions) . + ' ORDER BY ' . $table . 
'.id ASC'; + my @params = ($subscriber_id); + my $rows = $xa_db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub process_subscribers { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $blocksize, + $joins, + $conditions, + #$sort, + $limit) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + blocksize + joins + conditions + limit + /}; + #sort + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $select_stmt; + my $count_stmt; + my $select_format = 'SELECT ' . $table . '.' . $db->columnidentifier('subscriber_id') . ' %s GROUP BY ' . $table . '.' . $db->columnidentifier('subscriber_id'); + my $count_format = 'SELECT COUNT(1) FROM (%s) AS __cnt'; + + $select_stmt = sprintf($select_format,_get_export_stmt_part($db,$joins,$conditions)); + $count_stmt = sprintf($count_format,$db->paginate_sort_query('SELECT 1 ' . 
_get_export_stmt_part($db,$joins,$conditions),0,$limit,undef)); + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,$rowblock,$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + blocksize => $blocksize, + select => $select_stmt, + selectcount => $count_stmt, + ); +} + +sub process_events { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $blocksize, + $joins, + $conditions, + $load_recursive, + $limit) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + blocksize + joins + conditions + load_recursive + limit + /}; + #sort + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $select_stmt; + my $count_stmt; + my $select_format = 'SELECT * %s'; + my $count_format = 'SELECT COUNT(1) FROM (%s) AS __cnt'; + + $select_stmt = $db->paginate_sort_query(sprintf($select_format,_get_export_stmt_part($db,$joins,$conditions)),undef,undef,[ + { numeric => 1, + dir => 1, #-1, + memberchain => [ 'id' ], + } + ]); + $count_stmt = sprintf($count_format,$db->paginate_sort_query('SELECT 1 ' . 
_get_export_stmt_part($db,$joins,$conditions),0,$limit,undef)); + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,buildrecords_fromrows($rowblock,$load_recursive),$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + blocksize => $blocksize, + select => $select_stmt, + selectcount => $count_stmt, + ); +} + +sub _get_export_stmt { + + my ($db,$joins,$conditions) = @_; + return _get_export_stmt_part($db,undef,$joins,$conditions); + +} + +sub _get_export_stmt_part { + + my ($db,$static_context,$joins,$conditions) = @_; + + my $table = $db->tableidentifier($tablename); + + my $stmt = "FROM " . $table; + my @intjoins = (); + if (defined $joins and (scalar @$joins) > 0) { + foreach my $f (@$joins) { + my ($table, $keys) = %{ $f }; + my ($foreign_key, $own_key) = %{ $keys }; + push @intjoins, "LEFT JOIN $table ON $foreign_key = $own_key"; + } + } + + my @conds = (); + $stmt .= " " . join(" ", @intjoins) if (scalar @intjoins) > 0; + if (defined $conditions and (scalar @$conditions) > 0) { + foreach my $f (@$conditions) { + my ($field, $match) = %{ $f }; + my ($op, $val) = %{ $match }; + push @conds, "$field $op $val"; + } + } + $stmt .= " WHERE " . join(" AND ", @conds) if (scalar @conds) > 0; + return $stmt; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... 
+ + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/Dao/PeriodEvents.pm b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/Dao/PeriodEvents.pm new file mode 100644 index 0000000..3edaaca --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/Dao/PeriodEvents.pm @@ -0,0 +1,157 @@ +package NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + insert_stmt + transfer_table +); + +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + + copy_table +); + +my $tablename = 'period_events'; +my $get_db = \&get_sqlite_db; + +my $fieldnames; +my $expected_fieldnames = [ + 'subscriber_id', + 'profile_id', + 'start_profile', + 'update_profile', + 'stop_profile', +]; + +my $primarykey_fieldnames = []; +my $indexes = { + $tablename . 
'_suscriber_id' => [ 'subscriber_id(11)' ], +}; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,1,undef); + +} + +sub findby_domainusername { + + my ($domain,$username,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless (defined $domain and defined $username); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . $table . + ' WHERE ' . $db->columnidentifier('domain') . ' = ?' . + ' AND ' . $db->columnidentifier('username') . ' = ?' + , $domain, $username); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub copy_table { + + my ($get_target_db) = @_; + + check_table(); + #checktableinfo($get_target_db, + # __PACKAGE__,$tablename, + # get_fieldnames(1), + # $indexes); + + return transfer_table( + get_db => $get_db, + class => __PACKAGE__, + get_target_db => $get_target_db, + targetclass => __PACKAGE__, + targettablename => $tablename, + ); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... 
+ + push @records,$record; + } + } + + return \@records; + +} + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/ExportEvents.pm b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/ExportEvents.pm new file mode 100644 index 0000000..a4b17a8 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/ExportEvents.pm @@ -0,0 +1,271 @@ +package NGCP::BulkProcessor::Projects::ETL::EDR::ExportEvents; +use strict; + +## no critic + +use threads::shared qw(); + +use Tie::IxHash; + +#use NGCP::BulkProcessor::Serialization qw(); +#use Scalar::Util qw(blessed); +#use MIME::Base64 qw(encode_base64); + +use NGCP::BulkProcessor::Projects::ETL::EDR::Settings qw( + $dry + $skip_errors + + $export_subscriber_profiles_multithreading + $export_subscriber_profiles_numofthreads + $export_subscriber_profiles_blocksize + $export_subscriber_profiles_joins + $export_subscriber_profiles_conditions + $export_subscriber_profiles_limit + + $period_events_single_row_txn + $ignore_period_events_unique + +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn + fileerror +); + +use NGCP::BulkProcessor::Dao::Trunk::accounting::events qw(); + +use NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents qw(); + +use NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs + ping_all_dbs +); + +use NGCP::BulkProcessor::Utils qw(create_uuid threadid timestamp stringtobool trim); #check_ipnet +use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); +use NGCP::BulkProcessor::Array qw(contains); +use NGCP::BulkProcessor::Calendar 
qw(from_epoch datetime_to_string); +#use NGCP::BulkProcessor::DSPath qw(); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + export_subscriber_profiles +); + +sub export_subscriber_profiles { + + my $result = NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::create_table(1); + + my $static_context = {}; + + destroy_all_dbs(); + my $warning_count :shared = 0; + return ($result && NGCP::BulkProcessor::Dao::Trunk::accounting::events::process_subscribers( + static_context => $static_context, + process_code => sub { + my ($context,$records,$row_offset) = @_; + ping_all_dbs(); + my @period_event_rows = (); + foreach my $subscriber_id (map { $_->[0]; } @$records) { + if ($subscriber_id == 202) { + my $x=1; + print "blah"; + } + next unless _export_subscriber_profiles_init_context($context,$subscriber_id); + push(@period_event_rows, _get_period_event_rows($context)); + + + + if ($period_events_single_row_txn and (scalar @period_event_rows) > 0) { + while (defined (my $period_event_row = shift @period_event_rows)) { + if ($skip_errors) { + eval { _insert_period_events_rows($context,[$period_event_row]); }; + _warn($context,$@) if $@; + } else { + _insert_period_events_rows($context,[$period_event_row]); + } + } + } + } + + if (not $period_events_single_row_txn and (scalar @period_event_rows) > 0) { + if ($skip_errors) { + eval { insert_period_events_rows($context,\@period_event_rows); }; + _warn($context,$@) if $@; + } else { + insert_period_events_rows($context,\@period_event_rows); + } + } + + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_sqlite_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + destroy_reader_dbs_code => \&destroy_all_dbs, + blocksize => 
$export_subscriber_profiles_blocksize, + multithreading => $export_subscriber_profiles_multithreading, + numofthreads => $export_subscriber_profiles_numofthreads, + joins => $export_subscriber_profiles_joins, + conditions => $export_subscriber_profiles_conditions, + #sort => [{ column => 'id', numeric => 1, dir => 1 }], + #limit => $export_subscriber_profiles_limit, + ),$warning_count,); + +} + +sub _export_subscriber_profiles_init_context { + + my ($context,$subscriber_id) = @_; + + my $result = 1; + + $context->{events} = NGCP::BulkProcessor::Dao::Trunk::accounting::events::findby_subscriberid( + undef,$subscriber_id,$export_subscriber_profiles_joins,$export_subscriber_profiles_conditions); + + $context->{subscriber_id} = $subscriber_id; + + return $result; + +} + +sub _get_period_event_rows { + + my ($context) = @_; + + my $profile_events = { + start => undef, + update => [], + stop => undef, + }; + my $last_event; + + my %subscriber_profiles = (); + tie(%subscriber_profiles, 'Tie::IxHash'); + + foreach my $event (@{sort_by_configs([ grep { contains($_->{type},[ qw(start_profile update_profile end_profile) ]); } @{$context->{events}} ],[ + { numeric => 1, + dir => 1, #-1, + memberchain => [ 'id' ], + } + ])}) { + if ($event->{type} eq 'start_profile') { + if (not defined $last_event or $last_event->{type} eq 'end_profile') { + $profile_events->{start} = $event; + $last_event = $event; + $subscriber_profiles{$event->{new_status}} = $profile_events; + } else { + + } + } elsif ($event->{type} eq 'update_profile') { + if (defined $last_event and contains($last_event->{type},[ qw(start_profile update_profile) ])) { + push(@{$profile_events->{update}},$event); + $last_event = $event; + } else { + + } + } elsif ($event->{type} eq 'end_profile') { + if (defined $last_event and contains($last_event->{type},[ qw(start_profile update_profile) ])) { + $profile_events->{stop} = $event; + $last_event = $event; + $profile_events = { + start => undef, + update => [], + stop => 
undef, + }; + } else { + + } + } + } + + my @period_event_rows = (); + foreach my $profile_id (keys %subscriber_profiles) { + $profile_events = $subscriber_profiles{$profile_id}; + push(@period_event_rows,[ + $context->{subscriber_id}, + $profile_id, + datetime_to_string(from_epoch($profile_events->{start}->{timestamp})), + join(",",map { datetime_to_string(from_epoch($_)); } @{$profile_events->{update}}), + (defined $profile_events->{stop} ? datetime_to_string(from_epoch($profile_events->{stop}->{timestamp})) : undef), + ]); + } + + return @period_event_rows; + +} + +sub _insert_period_events_rows { + my ($context,$subscriber_rows) = @_; + $context->{db}->db_do_begin( + NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::getinsertstatement($ignore_period_events_unique), + ); + eval { + $context->{db}->db_do_rowblock($subscriber_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } + +} + + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } +} + +sub _debug { + + my ($context,$message,$debug) = @_; + processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/ProjectConnectorPool.pm new file mode 100644 index 0000000..987d7d3 --- /dev/null +++ 
b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/ProjectConnectorPool.pm @@ -0,0 +1,120 @@ +package NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool; +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); + +use NGCP::BulkProcessor::Projects::ETL::EDR::Settings qw( + $csv_dir + $sqlite_db_file +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_connectorinstancename +); + +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); + +use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + + get_sqlite_db + sqlite_db_tableidentifier + + get_csv_db + csv_db_tableidentifier + + destroy_dbs + destroy_all_dbs + ping_all_dbs + +); + +my $sqlite_dbs = {}; +my $csv_dbs = {}; + +sub get_sqlite_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); + + if (not defined $sqlite_dbs->{$name}) { + $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file); + } + + return $sqlite_dbs->{$name}; + +} + +sub sqlite_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? 
&$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file)); + +} + +sub get_csv_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); + if (not defined $csv_dbs->{$name}) { + $csv_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::CSVDB->new($instance_name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $csv_dbs->{$name}->db_connect($csv_dir); + } + return $csv_dbs->{$name}; + +} + +sub csv_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::CSVDB::get_tableidentifier($tablename,$csv_dir)); + +} + +sub destroy_dbs { + + foreach my $name (keys %$sqlite_dbs) { + cleartableinfo($sqlite_dbs->{$name}); + undef $sqlite_dbs->{$name}; + delete $sqlite_dbs->{$name}; + } + + foreach my $name (keys %$csv_dbs) { + cleartableinfo($csv_dbs->{$name}); + undef $csv_dbs->{$name}; + delete $csv_dbs->{$name}; + } + +} + +sub destroy_all_dbs() { + destroy_dbs(); + NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); +} + +sub ping_all_dbs() { + NGCP::BulkProcessor::ConnectorPool::ping_dbs(); +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/Settings.pm new file mode 100644 index 0000000..809020e --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/Settings.pm @@ -0,0 +1,235 @@ +package NGCP::BulkProcessor::Projects::ETL::EDR::Settings; +use strict; + +## no critic + +use File::Basename qw(fileparse); + +use NGCP::BulkProcessor::Globals qw( + $working_path + $enablemultithreading + $cpucount + create_path +); + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + 
fileerror + filewarn + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); + +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid load_module); + +use NGCP::BulkProcessor::Array qw(contains); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + + get_export_filename + + $ignore_period_events_unique + $period_events_single_row_txn + + $sqlite_db_file + $csv_dir + + check_dry + + $output_path + $input_path + + $subscriber_profiles_export_filename_format + + $defaultsettings + $defaultconfig + + $dry + $skip_errors + $force + + $export_subscriber_profiles_multithreading + $export_subscriber_profiles_numofthreads + $export_subscriber_profiles_blocksize + + $export_subscriber_profiles_joins + $export_subscriber_profiles_conditions + $export_subscriber_profiles_limit + +); + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $ignore_period_events_unique = 0; +our $period_events_single_row_txn = 1; + +our $output_path = $working_path . 'output/'; +our $input_path = $working_path . 
'input/'; +our $csv_dir = 'events'; + +our $subscriber_profiles_export_filename_format = undef; + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +our $sqlite_db_file = 'sqlite'; + +our $export_subscriber_profiles_multithreading = $enablemultithreading; +our $export_subscriber_profiles_numofthreads = $cpucount; +our $export_subscriber_profiles_blocksize = 1000; + +our $export_subscriber_profiles_joins = []; +our $export_subscriber_profiles_conditions = []; +our $export_subscriber_profiles_limit = undef; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + my $regexp_result; + + #&$configurationinfocode("testinfomessage",$configlogger); + + $result &= _prepare_working_paths(1); + + $subscriber_profiles_export_filename_format = $data->{subscriber_profiles_export_filename} if exists $data->{subscriber_profiles_export_filename}; + get_export_filename($data->{subscriber_profiles_export_filename},$configfile); + + $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; + $csv_dir = $data->{csv_dir} if exists $data->{csv_dir}; + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + my $parse_result; + ($parse_result,$export_subscriber_profiles_joins) = _parse_export_joins($data->{export_subscriber_profiles_joins},$configfile); + $result &= $parse_result; + ($parse_result,$export_subscriber_profiles_conditions) = _parse_export_conditions($data->{export_subscriber_profiles_conditions},$configfile); + $result &= $parse_result; + + $export_subscriber_profiles_limit = $data->{export_subscriber_profiles_limit} if exists $data->{export_subscriber_profiles_limit}; + + $export_subscriber_profiles_multithreading = $data->{export_subscriber_profiles_multithreading} if exists $data->{export_subscriber_profiles_multithreading}; + $export_subscriber_profiles_numofthreads = 
_get_numofthreads($cpucount,$data,'export_subscriber_profiles_numofthreads'); + $export_subscriber_profiles_blocksize = $data->{export_subscriber_profiles_blocksize} if exists $data->{export_subscriber_profiles_blocksize}; + + $period_events_single_row_txn = $data->{period_events_single_row_txn} if exists $data->{period_events_single_row_txn}; + $ignore_period_events_unique = $data->{ignore_period_events_unique} if exists $data->{ignore_period_events_unique}; + + return $result; + } + return 0; + +} + +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $numofthreads = $default_value; + $numofthreads = $data->{$key} if exists $data->{$key}; + $numofthreads = $cpucount if $numofthreads > $cpucount; + return $numofthreads; +} + +sub get_export_filename { + my ($filename_format,$configfile) = @_; + my $export_filename; + my $export_format; + if ($filename_format) { + $export_filename = sprintf($filename_format,timestampdigits(),threadid()); + unless ($export_filename =~ /^\//) { + $export_filename = $output_path . $export_filename; + } + if (-e $export_filename and (unlink $export_filename) == 0) { + filewarn('cannot remove ' . $export_filename . ': ' . 
$!,getlogger(__PACKAGE__)); + $export_filename = undef; + } + my ($name,$path,$suffix) = fileparse($export_filename,".csv"); + if ($suffix eq '.csv') { + $export_format = 'csv'; + } else { + configurationerror($configfile,"$filename_format: .csv export file format required"); + } + } + return ($export_filename,$export_format); +} + +sub _parse_export_joins { + my ($token,$file) = @_; + my @joins = (); + if (defined $token and length($token) > 0) { + foreach my $f (_split(\$token)) { + next unless($f); + $f =~ s/^\s*\{?\s*//; + $f =~ s/\}\s*\}\s*$/}/; + my ($a, $b) = split(/\s*=>\s*{\s*/, $f); + $a =~ s/^\s*\'//; + $a =~ s/\'$//g; + $b =~ s/\s*\}\s*$//; + my ($c, $d) = split(/\s*=>\s*/, $b); + $c =~ s/^\s*\'//g; + $c =~ s/\'\s*//; + $d =~ s/^\s*\'//g; + $d =~ s/\'\s*//; + push @joins, { $a => { $c => $d } }; + } + } + return (1,\@joins); +} + +sub _parse_export_conditions { + my ($token,$file) = @_; + my @conditions = (); + if (defined $token and length($token) > 0) { + foreach my $f (_split(\$token)) { + next unless($f); + $f =~ s/^\s*\{?\s*//; + $f =~ s/\}\s*\}\s*$/}/; + my ($a, $b) = split(/\s*=>\s*{\s*/, $f); + $a =~ s/^\s*\'//; + $a =~ s/\'$//g; + $b =~ s/\s*\}\s*$//; + my ($c, $d) = split(/\s*=>\s*/, $b); + $c =~ s/^\s*\'//g; + $c =~ s/\'\s*//; + $d =~ s/^\s*\'//g; + $d =~ s/\'\s*//; + push @conditions, { $a => { $c => $d } }; + } + } + return (1,\@conditions); +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.cfg new file mode 100644 index 0000000..442b428 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.cfg @@ -0,0 +1,61 @@ +##general settings: +working_path = /var/sipwise +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = db01 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP 
MySQL connectivity - "billing" db: +billing_host = db01 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = db01 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = db01 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = db01 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.debug.cfg new file mode 100644 index 0000000..504dc89 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/config.debug.cfg @@ -0,0 +1,61 @@ +##general settings: +working_path = /home/rkrenn/temp/customer_exporter +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.96 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.96 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: 
+provisioning_host = 192.168.0.96 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.96 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.96 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/process.pl new file mode 100644 index 0000000..21c59da --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/process.pl @@ -0,0 +1,214 @@ +use strict; + +## no critic + +our $VERSION = "0.0"; + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . 
'/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::ETL::EDR::Settings qw( + update_settings + + get_export_filename + $subscriber_profiles_export_filename_format + + check_dry + $output_path + $defaultsettings + $defaultconfig + $dry + $skip_errors + $force +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); + +use NGCP::BulkProcessor::SqlConnectors::CSVDB qw(cleanupcvsdirs); +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); + +use NGCP::BulkProcessor::Projects::ETL::EDR::ProjectConnectorPool qw(destroy_all_dbs get_csv_db get_sqlite_db); + +use NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents qw(); + +use NGCP::BulkProcessor::Projects::ETL::EDR::ExportEvents qw( + export_subscriber_profiles +); + +scripterror(getscriptpath() . 
' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; + +my @TASK_OPTS = (); + +my $tasks = []; + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); + +my $cleanup_all_task_opt = 'cleanup_all'; +push(@TASK_OPTS,$cleanup_all_task_opt); + +my $export_subscriber_profiles_task_opt = 'export_subscriber_profiles'; +push(@TASK_OPTS,$export_subscriber_profiles_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); + } elsif (lc($cleanup_all_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); + + } elsif (lc($export_subscriber_profiles_task_opt) eq lc($task)) { + $result &= export_subscriber_profiles_task(\@messages) if taskinfo($export_subscriber_profiles_task_opt,$result); + $completion |= 1; + + } else { + $result = 0; + scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. 
supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + cleanupcvsdirs(); + cleanupdbfiles(); + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + push(@$messages,'working directory cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } +} + +sub export_subscriber_profiles_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = export_subscriber_profiles(); + }; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + my ($export_filename,$export_format) = get_export_filename($subscriber_profiles_export_filename_format); + if ('sqlite' eq $export_format) { + &get_sqlite_db()->copydbfile($export_filename); + } elsif ('csv' eq $export_format) { + NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::copy_table(\&get_csv_db); + &get_csv_db()->copytablefile(NGCP::BulkProcessor::Projects::ETL::EDR::Dao::PeriodEvents::gettablename(),$export_filename); + } else { + push(@$messages,"invalid extension for output filename $export_filename"); + } + }; + if ($err or !$result) { + 
push(@$messages,"exporting subscriber profiles INCOMPLETE$stats"); + } else { + push(@$messages,"exporting subscriber profiles completed$stats"); + } + destroy_all_dbs(); + return $result; + +} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/EDR/settings.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/settings.cfg new file mode 100644 index 0000000..2d7b235 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/EDR/settings.cfg @@ -0,0 +1,21 @@ +#dry=0 +#skip_errors=0 + +export_subscriber_profiles_multithreading = 1 +export_subscriber_profiles_numofthreads = 2 +export_subscriber_profiles_blocksize = 1000 +export_subscriber_profiles_limit = 10000 + +#export_cdr_conditions = { 'accounting.cdr.destination_domain' => { 'IN' => '("80.110.2.164","ccs.upc.at")' } } +#export_cdr_conditions = { 'accounting.cdr.destination_domain' => { '=' => '"ccs.upc.at"' } } +#, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } +#{ 'accounting.cdr.call_status' => { '=' => '"ok"' } } +#export_cdr_joins = { 'accounting.cdr_export_status_data esd' => { 'esd.cdr_id' => 'accounting.cdr.id' } }, { 'accounting.cdr_export_status es' => { 'es.id' => 'esd.status_id' } } +export_cdr_conditions = { 'accounting.cdr.id' => { 'IN' => '(51,53, 87,89, 55, 79, 65,67,69, 81,83,85, 111, 113)' } } + +subscriber_profiles_export_filename=subscriber_profiles_%s.csv + +sqlite_db_file = sqlite +csv_dir = events +period_events_single_row_txn = 1 +ignore_period_events_unique = 0