From fbdccef6e97b8f22f9c8140276947be58ffd360a Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Mon, 8 Feb 2021 16:09:49 +0100 Subject: [PATCH] TT#110203 redis registration monitoring to SQLite PoC Change-Id: I3c7bcb14f3bff3de04258938d5e9a1bed2c493ab (cherry picked from commit c2a2b7c64de480a033f46b9322d55289a4050810) --- .../NoSqlConnectors/RedisProcessor.pm | 2 +- .../RegistrationMonitoring/Dao/Location.pm | 330 ++++++++++++++++++ .../Massive/RegistrationMonitoring/Process.pm | 214 ++++++++++++ .../ProjectConnectorPool.pm | 80 +++++ .../Massive/RegistrationMonitoring/Redis.t | 9 +- .../RegistrationMonitoring/Settings.pm | 147 ++++++++ .../Massive/RegistrationMonitoring/config.cfg | 69 ++++ .../RegistrationMonitoring/config.debug.cfg | 69 ++++ .../Massive/RegistrationMonitoring/process.pl | 222 +++++++++++- .../RegistrationMonitoring/settings.cfg | 10 + .../RegistrationMonitoring/settings.debug.cfg | 10 + lib/NGCP/BulkProcessor/Utils.pm | 35 ++ 12 files changed, 1193 insertions(+), 4 deletions(-) create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.debug.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg diff --git a/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm index b16867d..043effc 100644 --- a/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm +++ b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm @@ -155,7 +155,7 @@ sub process_entries { } else { - my $store = &$get_store(); #$reader_connection_name); + my $store = &$get_store(undef,1); #$reader_connection_name); $blocksize //= $store->get_defaultblockcount(); my $context = _create_process_context($static_context,{ tid => $tid }); diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm new file mode 100644 index 0000000..40f69bb --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Dao/Location.pm @@ -0,0 +1,330 @@ +package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + + insert_stmt + +); +#process_table +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + getupsertstatement + + findby_domainusername + findby_ruid + + update_delta + findby_delta + countby_delta + + $deleted_delta + $updated_delta + $added_delta + + @fieldnames + +); + +my $tablename = 'location'; +my $get_db = \&get_sqlite_db; + +our @fieldnames = ( + 'instance', + 'domain', + 'cseq', + 'partition', + 'ruid', + 'connection_id', + 'username', + 'keepalive', + 'path', + 'reg_id', + 'contact', + 'flags', + 'received', + 'callid', + 'socket', + 'cflags', + 'expires', + 'methods', + 'user_agent', + 'q', + 'last_modified', + 'server_id', +); + +my $expected_fieldnames = [ + @fieldnames, + 'delta', +]; + +# table creation: +my $primarykey_fieldnames = [ 'ruid' ]; +my $indexes = { + #$tablename . '_number' => [ 'number(32)' ], + #$tablename . '_rownum' => [ 'rownum(11)' ], + $tablename . '_domain_username' => [ 'domain', 'username' ], + $tablename . '_delta' => [ 'delta(7)' ], +}; +#my $fixtable_statements = []; + +our $deleted_delta = 'DELETED'; +our $updated_delta = 'UPDATED'; +our $added_delta = 'ADDED'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); + +} + +sub findby_delta { + + my ($delta,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless defined $delta; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . + $table . + ' WHERE ' . + $db->columnidentifier('delta') . ' = ?' + , $delta); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub findby_domainusername { + + my ($domain,$username,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return [] unless (defined $domain and defined $username); + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . $table . + ' WHERE ' . $db->columnidentifier('domain') . ' = ?' . + ' AND ' . $db->columnidentifier('username') . ' = ?' + , $domain, $username); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub findby_ruid { + + my ($ruid,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + return undef unless defined $ruid; + + my $rows = $db->db_get_all_arrayref( + 'SELECT * FROM ' . $table . + ' WHERE ' . $db->columnidentifier('ruid') . ' = ?' + , $ruid); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub update_delta { + + my ($ruid,$delta) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; + my @params = (); + push(@params,$delta); + if (defined $ruid) { + $stmt .= ' WHERE ' . + $db->columnidentifier('ruid') . ' = ?'; + push(@params, $ruid); + } + + return $db->db_do($stmt,@params); + +} + +sub countby_delta { + + my ($deltas) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table . ' WHERE 1=1'; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt .= ' AND ' . $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + + return $db->db_get_value($stmt,@params); + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +#sub process_records { +# +# my %params = @_; +# my ($process_code, +# $static_context, +# $init_process_context_code, +# $uninit_process_context_code, +# $multithreading, +# $numofthreads) = @params{qw/ +# process_code +# static_context +# init_process_context_code +# uninit_process_context_code +# multithreading +# numofthreads +# /}; +# +# check_table(); +# my $db = &$get_db(); +# my $table = $db->tableidentifier($tablename); +# +# my @cols = map { $db->columnidentifier($_); } qw/domain sip_username/; +# +# return process_table( +# get_db => $get_db, +# class => __PACKAGE__, +# process_code => sub { +# my ($context,$rowblock,$row_offset) = @_; +# return &$process_code($context,$rowblock,$row_offset); +# }, +# static_context => $static_context, +# init_process_context_code => $init_process_context_code, +# uninit_process_context_code => $uninit_process_context_code, +# destroy_reader_dbs_code => \&destroy_all_dbs, +# multithreading => $multithreading, +# tableprocessing_threads => $numofthreads, +# 'select' => 'SELECT ' . join(',',@cols) . ' FROM ' . $table . ' GROUP BY ' . join(',',@cols), +# 'selectcount' => 'SELECT COUNT(DISTINCT(' . join(',',@cols) . ')) FROM ' . $table, +# ); +#} + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub getupsertstatement { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . + join(', ', map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')'; + my @values = (); + foreach my $fieldname (@$expected_fieldnames) { + if ('delta' eq $fieldname) { + my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . + $db->columnidentifier('ruid') . ' = ?'; + push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); + } else { + push(@values,'?'); + } + } + $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; + return $upsert_stmt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm new file mode 100644 index 0000000..2dee47b --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Process.pm @@ -0,0 +1,214 @@ +package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Process; +use strict; + +## no critic + +use threads::shared qw(); + +#use Encode qw(); + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw( + + $usernames_filename + $usernames_rownum_start + $load_registrations_multithreading + $load_registrations_numofthreads + $ignore_location_unique + $location_single_row_txn + + $skip_errors + +); +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + fileprocessingwarn + fileprocessingerror +); + +use NGCP::BulkProcessor::FileProcessors::CSVFileSimple qw(); +#use NGCP::BulkProcessor::FileProcessors::XslxFileSimple qw(); + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs +); +use NGCP::BulkProcessor::ConnectorPool qw( + destroy_stores +); + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw(); + +use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw(); + +use NGCP::BulkProcessor::Utils qw(threadid trim); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + load_registrations +); + +sub load_registrations { + + my ($file) = @_; + $file //= $usernames_filename; + _error({ filename => '', },'no file specified') unless $file; + + my $result = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::create_table(0); + + my $importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($load_registrations_numofthreads,undef,';'); + + my $upsert = _locations_reset_delta(); + + destroy_all_dbs(); #close all db connections before forking.. + destroy_stores(); + my $warning_count :shared = 0; + return ($result && $importer->process( + file => $file, + process_code => sub { + my ($context,$records,$row_offset) = @_; + my @data = (); + my @location_rows = (); + foreach my $record (@$records) { + if (_load_registrations_init_context($context,$record->[0],$record->[1])) { + foreach my $registration (@{$context->{registrations}}) { + my %r = %{$registration->getvalue}; my @row_ext = @r{@NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::fieldnames}; + if ($context->{upsert}) { + push(@row_ext,$registration->getvalue()->{ruid}); + } else { + push(@row_ext,$NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::added_delta); + } + push(@location_rows,\@row_ext); + } + if ($location_single_row_txn and (scalar @location_rows) > 0) { + while (defined (my $location_row = shift @location_rows)) { + if ($skip_errors) { + eval { _insert_location_rows($context,[$location_row]); }; + _warn($context,$@) if $@; + } else { + _insert_location_rows($context,[$location_row]); + } + } + } + } + } + if (not $location_single_row_txn and (scalar @location_rows) > 0) { + if ($skip_errors) { + eval { _insert_location_rows($context,\@location_rows); }; + _warn($context,$@) if $@; + } else { + _insert_location_rows($context,\@location_rows); + } + } + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_sqlite_db(); + #$context->{redis} = &get_location_store(); + $context->{upsert} = $upsert; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + #undef $context->{redis}; + destroy_all_dbs(); + destroy_stores(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $load_registrations_multithreading, + numofthreads => $load_registrations_numofthreads, + ),$warning_count); + +} + +sub _locations_reset_delta { + my $upsert = 0; + if (NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta() > 0) { + processing_info(threadid(),'resetting delta of ' . + NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::update_delta(undef, + $NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::deleted_delta) . + ' location records',getlogger(__PACKAGE__)); + $upsert |= 1; + } + return $upsert; +} + +sub _insert_location_rows { + my ($context,$location_rows) = @_; + $context->{db}->db_do_begin( + ($context->{upsert} ? + NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::getupsertstatement() + : NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::getinsertstatement($ignore_location_unique)), + ); + eval { + $context->{db}->db_do_rowblock($location_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } +} + +sub _load_registrations_init_context() { + + my ($context,$username,$domain) = @_; + $context->{username} = $username; + $context->{domain} = $domain; + my @registrations = (); + my $result = 1; + $context->{usrdom} = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::get_usrdom_by_username_domain($username,$domain,{ _entries => 1, }); + if ($context->{usrdom}) { + foreach my $entry (@{$context->{usrdom}->{_entries}}) { + push(@registrations,$entry); # if expiry > now + } + } + $result = 0 unless scalar @registrations; + $context->{registrations} = \@registrations; + return $result; + +} + + + + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + fileprocessingerror($context->{filename},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + fileprocessingwarn($context->{filename},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm new file mode 100644 index 0000000..cecdc51 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/ProjectConnectorPool.pm @@ -0,0 +1,80 @@ +package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool; +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw( + + $sqlite_db_file +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_connectorinstancename + +); + +#use NGCP::BulkProcessor::SqlConnectors::MySQLDB; +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); + +use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + + get_sqlite_db + + destroy_dbs + destroy_all_dbs + +); + +my $sqlite_dbs = {}; + +sub get_sqlite_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); #threadid(); #shift; + + if (not defined $sqlite_dbs->{$name}) { + $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file); + } + + return $sqlite_dbs->{$name}; + +} + +sub sqlite_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file)); + +} + +sub destroy_dbs { + + foreach my $name (keys %$sqlite_dbs) { + cleartableinfo($sqlite_dbs->{$name}); + undef $sqlite_dbs->{$name}; + delete $sqlite_dbs->{$name}; + } + +} + +sub destroy_all_dbs() { + destroy_dbs(); + NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t index dfb2064..995d2a5 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t @@ -80,8 +80,13 @@ SKIP: process_code => sub { my ($context,$records,$row_offset) = @_; #die(); - print @$records . " done\n"; - return 0; + print join("\n", map { + my $key = $_->getkey(); + $key =~ s/location\:usrdom\:\://; + $key =~ s/\:/;/; + $key; + } @$records); + return 1; }, static_context => $static_context, blocksize => 10000, diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm new file mode 100644 index 0000000..48d1dd4 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Settings.pm @@ -0,0 +1,147 @@ +package NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings; +use strict; + +## no critic + +use threads::shared qw(); + +use File::Basename qw(fileparse); +use DateTime::TimeZone qw(); + +use NGCP::BulkProcessor::Globals qw( + $working_path + $enablemultithreading + $cpucount + create_path +); + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + filewarn + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); +use NGCP::BulkProcessor::Utils qw(prompt timestampdigits threadid); +#format_number check_ipnet +use NGCP::BulkProcessor::Array qw(contains); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + + $sqlite_db_file + + $output_path + $input_path + + $defaultsettings + $defaultconfig + + $usernames_filename + $usernames_rownum_start + $load_registrations_numofthreads + $load_registrations_multithreading + $ignore_location_unique + $location_single_row_txn + + $skip_errors + $force + +); + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.cfg'; + +our $output_path = $working_path . 'output/'; +our $input_path = $working_path . 'input/'; + +our $usernames_filename = undef; +our $usernames_rownum_start = 0; +our $load_registrations_multithreading = $enablemultithreading; +our $load_registrations_numofthreads = $cpucount; +our $ignore_location_unique = 0; +our $location_single_row_txn = 0; + +our $skip_errors = 0; +our $force = 0; + +our $sqlite_db_file = 'sqlite'; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + my $regexp_result; + + #&$configurationinfocode("testinfomessage",$configlogger); + + $result &= _prepare_working_paths(1); + + $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; + + $load_registrations_multithreading = $data->{load_registrations_multithreading} if exists $data->{load_registrations_multithreading}; + $usernames_filename = _get_import_filename($usernames_filename,$data,'usernames_filename'); + $usernames_rownum_start = $data->{usernames_rownum_start} if exists $data->{usernames_rownum_start}; + $load_registrations_numofthreads = _get_numofthreads($cpucount,$data,'load_registrations_numofthreads'); + $ignore_location_unique = $data->{ignore_location_unique} if exists $data->{ignore_location_unique}; + $location_single_row_txn = $data->{location_single_row_txn} if exists $data->{location_single_row_txn}; + + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + $force = $data->{force} if exists $data->{force}; + + return $result; + + } + return 0; + +} + +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + ($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $numofthreads = $default_value; + $numofthreads = $data->{$key} if exists $data->{$key}; + $numofthreads = $cpucount if $numofthreads > $cpucount; + return $numofthreads; +} + +sub _get_import_filename { + my ($old_value,$data,$key) = @_; + my $import_filename = $old_value; + $import_filename = $data->{$key} if exists $data->{$key}; + if (defined $import_filename and length($import_filename) > 0) { + $import_filename = $input_path . $import_filename unless -e $import_filename; + } + return $import_filename; +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.cfg new file mode 100644 index 0000000..e0a5701 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.cfg @@ -0,0 +1,69 @@ +##general settings: +working_path = /var/sipwise +#cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = localhost +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = localhost +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = localhost +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = localhost +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = localhost +xa_port = 3306 +xa_databasename = billing +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://10.0.2.15:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##NGCP Redis connectivity - "location" store: +location_databaseindex = 20 +#location_password = +location_host = 127.0.0.1 +location_port = 6379 +#location_sock = + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +#INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.debug.cfg new file mode 100644 index 0000000..9850a3b --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/config.debug.cfg @@ -0,0 +1,69 @@ +##general settings: +working_path = /home/rkrenn/temp/massive +#cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.146 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.146 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.146 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.146 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.146 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##NGCP Redis connectivity - "location" store: +location_databaseindex = 20 +#location_password = +location_host = 192.168.0.146 +location_port = 6379 +#location_sock = + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +#INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl index 684f6d8..9e42a54 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl @@ -1,3 +1,223 @@ +use strict; +## no critic -get_location_store \ No newline at end of file +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Settings qw( + update_settings + + $output_path + $defaultsettings + $defaultconfig + + $skip_errors + $force +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); + +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location qw(); + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::ProjectConnectorPool qw( + destroy_all_dbs +); +use NGCP::BulkProcessor::ConnectorPool qw( + destroy_stores +); + +use NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Process qw( + load_registrations +); + +scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet + +my @TASK_OPTS = (); + +my $tasks = []; +my $file = undef; + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); + +my $cleanup_all_task_opt = 'cleanup_all'; +push(@TASK_OPTS,$cleanup_all_task_opt); + +my $load_registrations_task_opt = 'load_registrations'; +push(@TASK_OPTS,$load_registrations_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + #"dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + "file=s" => \$file, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$SIMPLE_CONFIG_TYPE); + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,0) if taskinfo($cleanup_task_opt,$result); + } elsif (lc($cleanup_all_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_all_task_opt,$result); + + } elsif (lc($load_registrations_task_opt) eq lc($task)) { + $result &= load_registrations_task(\@messages) if taskinfo($load_registrations_task_opt,$result); + + } else { + $result = 0; + scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + #cleanupcvsdirs() if $clean_generated; + cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + #cleanupcertfiles(); + cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + #print $@; + push(@$messages,'working directory cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } +} + +sub load_registrations_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = load_registrations($file); + }; + #print $@; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total loaded registrations: " . + NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta() . ' rows'; + my $added_count = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta( + $NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::added_delta + ); + $stats .= "\n new: $added_count rows"; + my $existing_count = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta( + $NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::updated_delta + ); + $stats .= "\n existing: $existing_count rows"; + my $deleted_count = NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::countby_delta( + $NGCP::BulkProcessor::Projects::Massive::RegistrationMonitoring::Dao::Location::deleted_delta + ); + $stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"loading registrations INCOMPLETE$stats"); + } else { + push(@$messages,"loading registrations completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + destroy_stores(); + return $result; + +} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg new file mode 100644 index 0000000..1496ca3 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.cfg @@ -0,0 +1,10 @@ + +#force=0 +#skip_errors=0 + +load_registrations_multithreading = 1 +#load_registrations_numofthreads = 4 +usernames_filename = +usernames_rownum_start = 1 +#ignore_location_unique = 0 +location_single_row_txn = 0 \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg new file mode 100644 index 0000000..defe69a --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/settings.debug.cfg @@ -0,0 +1,10 @@ + +#force=0 +#skip_errors=0 + +load_registrations_multithreading = 1 +#load_registrations_numofthreads = 4 +usernames_filename = /home/rkrenn/temp/massive/input/test.csv +usernames_rownum_start = 1 +#ignore_location_unique = 0 +location_single_row_txn = 0 \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Utils.pm b/lib/NGCP/BulkProcessor/Utils.pm index a5f302d..ed4af0b 100644 --- a/lib/NGCP/BulkProcessor/Utils.pm +++ b/lib/NGCP/BulkProcessor/Utils.pm @@ -111,6 +111,8 @@ our @EXPORT_OK = qw( prompt check_int check_ipnet + + unshare ); our $chmod_umask = 0777; @@ -1025,4 +1027,37 @@ sub check_int { return 0; } +sub unshare { + + # PP deep-copy without tie-ing, to un-share shared datastructures, + # so they can be manipulated without errors + my ($obj) = @_; + return undef if not defined $obj; # terminal for: undefined + my $ref = ref $obj; + if (not $ref) { # terminal for: scalar + return $obj; + } elsif ("SCALAR" eq $ref) { # terminal for: scalar ref + $obj = $$obj; + return \$obj; + } elsif ("ARRAY" eq $ref) { # terminal for: array + my @array = (); + foreach my $value (@$obj) { + push(@array, unshare($value)); + } + return \@array; + } elsif ($ref eq "HASH") { # terminal for: hash + my %hash = (); + foreach my $key (keys %$obj) { + $hash{$key} = unshare($obj->{$key}); + } + return \%hash; + } elsif ("REF" eq $ref) { # terminal for: ref of scalar ref, array, hash etc. + $obj = unshare($$obj); + return \$obj; + } else { + die("unsharing $ref not supported\n"); + } + +} + 1;