From 24a23de99be74189eef8c458b8b1cf1ad3ee477f Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Mon, 18 Jan 2021 19:25:29 +0100 Subject: [PATCH] TT#108604 introduce nosql abstraction and redis connector Change-Id: Ia2f7682c6a9b88c42300dc667f1302eaf447bab3 (cherry picked from commit 207a82b621b87e2265e8127e73712855ff45e5e5) --- debian/control | 1 + lib/NGCP/BulkProcessor/LogError.pm | 55 +++++ lib/NGCP/BulkProcessor/Logging.pm | 48 ++++ lib/NGCP/BulkProcessor/NoSqlConnector.pm | 117 +++++++++ .../BulkProcessor/NoSqlConnectors/Redis.pm | 227 ++++++++++++++++++ 5 files changed, 448 insertions(+) create mode 100644 lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm diff --git a/debian/control b/debian/control index 5aecf4b..57fb97f 100644 --- a/debian/control +++ b/debian/control @@ -54,6 +54,7 @@ Depends: libmime-tools-perl, libnet-address-ip-local-perl, libphp-serialization-perl, + libredis-perl, libspreadsheet-parseexcel-perl, libstring-mkpasswd-perl, libsys-cpuaffinity-perl, diff --git a/lib/NGCP/BulkProcessor/LogError.pm b/lib/NGCP/BulkProcessor/LogError.pm index ad17472..c01c3b2 100644 --- a/lib/NGCP/BulkProcessor/LogError.pm +++ b/lib/NGCP/BulkProcessor/LogError.pm @@ -47,6 +47,8 @@ our @EXPORT_OK = qw( faketimeerror dberror dbwarn + nosqlerror + nosqlwarn fieldnamesdiffer transferzerorowcount processzerorowcount @@ -312,6 +314,34 @@ sub dbwarn { } +sub nosqlerror { + + my ($connector, $message, $logger) = @_; + $message = _getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message); + if (defined $logger) { + $logger->error($message); + } + + terminate($message, $logger); + #terminatethreads(); + #die(); + +} + +sub nosqlwarn { + + my ($connector, $message, $logger) = @_; + $message = _getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message); + if (defined $logger) { + $logger->warn($message); + } + + #die(); + warning($message, $logger); + +} + + sub resterror { my ($restapi, $message, $logger) = @_; @@ -794,4 +824,29 @@ sub _getrestconnectidentifiermessage { return $result . $message; } + +sub _getnosqlconnectorinstanceprefix { + my ($connector) = @_; + my $instancestring = $connector->instanceidentifier(); + if (length($instancestring) > 0) { + if ($connector->{tid} != $root_threadid) { + return '[' . $connector->{tid} . '/' . $instancestring . '] '; + } else { + return '[' . $instancestring . '] '; + } + } elsif ($connector->{tid} != $root_threadid) { + return '[' . $connector->{tid} . '] '; + } + return ''; +} + +sub _getnosqlconnectidentifiermessage { + my ($connector,$message) = @_; + my $result = $connector->connectidentifier(); + if (length($result) > 0) { + $result .= ' - '; + } + return $result . $message; +} + 1; diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm index a0f2c49..5b57e9f 100644 --- a/lib/NGCP/BulkProcessor/Logging.pm +++ b/lib/NGCP/BulkProcessor/Logging.pm @@ -33,6 +33,8 @@ our @EXPORT_OK = qw( dbinfo restdebug restinfo + nosqldebug + nosqlinfo attachmentdownloaderdebug attachmentdownloaderinfo @@ -290,6 +292,28 @@ sub dbinfo { } +sub nosqldebug { + + my ($connector, $message, $logger) = @_; + if (defined $logger) { + $logger->debug(_getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message)); + } + + #die(); + +} + +sub nosqlinfo { + + my ($connector, $message, $logger) = @_; + if (defined $logger) { + $logger->info(_getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message)); + } + + #die(); + +} + sub restdebug { my ($restapi, $message, $logger) = @_; @@ -856,6 +880,30 @@ sub _getsqlconnectidentifiermessage { return $result . $message; } +sub _getnosqlconnectorinstanceprefix { + my ($connector) = @_; + my $instancestring = $connector->instanceidentifier(); + if (length($instancestring) > 0) { + if ($connector->{tid} != $root_threadid) { + return '[' . $connector->{tid} . '/' . $instancestring . '] '; + } else { + return '[' . $instancestring . '] '; + } + } elsif ($connector->{tid} != $root_threadid) { + return '[' . $connector->{tid} . '] '; + } + return ''; +} + +sub _getnosqlconnectidentifiermessage { + my ($connector,$message) = @_; + my $result = $connector->connectidentifier(); + if (length($result) > 0) { + $result .= ' - '; + } + return $result . $message; +} + sub _getrestconnectorinstanceprefix { my ($restapi) = @_; my $instancestring = $restapi->instanceidentifier(); diff --git a/lib/NGCP/BulkProcessor/NoSqlConnector.pm b/lib/NGCP/BulkProcessor/NoSqlConnector.pm index a239802..263845e 100644 --- a/lib/NGCP/BulkProcessor/NoSqlConnector.pm +++ b/lib/NGCP/BulkProcessor/NoSqlConnector.pm @@ -3,4 +3,121 @@ use strict; ## no critic +use threads; +use threads::shared; + +use NGCP::BulkProcessor::Logging qw( + getlogger + nosqlinfo + nosqldebug); +use NGCP::BulkProcessor::LogError qw(nosqlerror); + +use NGCP::BulkProcessor::Utils qw(threadid); + + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + _share_scalar + _share_list +); + +sub new { + + my $class = shift; + my $self = bless {}, $class; + my $instanceid = shift; + + $self->{instanceid} = $instanceid; + $self->{tid} = threadid(); + + return $self; + +} + +sub connectidentifier { + + my $self = shift; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); + return undef; + +} + +sub instanceidentifier { + my $self = shift; + + $self->{instanceid} = shift if @_; + return $self->{instanceid}; + +} + +sub connect { + + my $self = shift; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); + +} + +sub disconnect { + + my $self = shift; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); + +} + +sub is_connected { + + my $self = shift; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); + +} + +sub ping { + + my $self = shift; + notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__)); + +} + +sub multithreading_supported { + + my $self = shift; + return 0; + +} + +sub DESTROY { + + my $self = shift; + + # perl threads works like a fork, each thread owns a shalow? copy + # of the entire current context, at the moment it starts. + # due to this, if the thread is finished, perl gc will invoke destructors + # on the thread's scope elements, that potentially contains connectors from + # the main tread. it will actually attempt destroy them (disconnect, etc.) + # this is a problem with destructors that change object state like this one + # + # to avoid this, we perform destruction tasks only if the destructing tid + # is the same as the creating one: + + if ($self->{tid} == threadid()) { + $self->disconnect(); + eval { + nosqldebug($self,(ref $self) . ' connector destroyed',getlogger(__PACKAGE__)); + }; + } + +} + +sub _share_list { + my @args = @_; + my $result = shared_clone(\@args); + return @$result; +} + +sub _share_scalar { + my $result = shared_clone(shift @_); + return $result; +} + 1; diff --git a/lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm b/lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm new file mode 100644 index 0000000..1ee74f3 --- /dev/null +++ b/lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm @@ -0,0 +1,227 @@ +package NGCP::BulkProcessor::NoSqlConnectors::Redis; +use strict; + +## no critic + +use NGCP::BulkProcessor::Globals qw( + $system_abbreviation + $enablemultithreading + $local_fqdn); +use NGCP::BulkProcessor::Logging qw( + getlogger + nosqlinfo + nosqldebug); +use NGCP::BulkProcessor::LogError qw(nosqlerror nosqlwarn); + +use Redis; + +#use NGCP::BulkProcessor::Array qw(arrayeq itemcount contains setcontains); + +use NGCP::BulkProcessor::NoSqlConnector qw( + _share_scalar + _share_list +); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnector); +our @EXPORT_OK = qw(); + +our $AUTOLOAD; + +my $log_operations = 0; + +my $defaulthost = '127.0.0.1'; +my $defaultport = '6379'; +my $defaultsock = undef; +my $defaultpassword = undef; +my $defaultdatabaseindex = '0'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::NoSqlConnector->new(@_); + + $self->{host} = undef; + $self->{port} = undef; + $self->{sock} = undef; + $self->{databaseindex} = undef; + $self->{password} = undef; + + $self->{redis} = undef; + + bless($self,$class); + + nosqldebug($self,__PACKAGE__ . ' connector created',getlogger(__PACKAGE__)); + + return $self; + +} + +sub connectidentifier { + + my $self = shift; + if (defined $self->{databaseindex}) { + if ($self->{sock}) { + return $self->{sock} . '/' . $self->{databaseindex}; + } else { + return $self->{host} . ':' . $self->{port} . '/' . $self->{databaseindex}; + } + } else { + return undef; + } + +} + +sub connect { + + my $self = shift; + + my ($databaseindex,$password,$host,$port,$sock) = @_; + + $self->disconnect(); + + $host = $defaulthost if (not $host); + $port = $defaultport if (not $port); + $sock = $defaultsock if (not $sock); + $databaseindex = $defaultdatabaseindex if (not $databaseindex); + $password = $defaultpassword if (not $password); + + $self->{host} = $host; + $self->{port} = $port; + $self->{sock} = $sock; + $self->{databaseindex} = $databaseindex; + $self->{password} = $password; + + #if (not contains($databasename,$self->getdatabases(),0)) { + # $self->_createdatabase($databasename); + #} + + nosqldebug($self,'connecting',getlogger(__PACKAGE__)); + + my $name = $system_abbreviation; + $name .= '_' . $self->instanceidentifier() if $self->instanceidentifier(); + $name =~ s/[^a-z0-9]+/_/gi; + $name =~ s/(^_+)|(_+$)//g; + if ($enablemultithreading) { + $name .= '_thread_' . $self->{tid}; + } + + my $redis = Redis->new( + ($sock ? + (sock => $sock) : + (server => ($host . ':' . $port)) + ), + (defined $password ? (password => $password) : ()), + name => $name, + no_auto_connect_on_new => 1, + #debug => 1, + ); + + eval { + $redis->connect(); + $redis->select($databaseindex); + #or die($!); + #my $dbsize = $redis->dbsize(); + }; + if ($@) { + nosqlerror($self, 'error connecting: ' . $@, getlogger(__PACKAGE__)); + } else { + $self->{redis} = $redis; + nosqlinfo($self,'connected',getlogger(__PACKAGE__)); + } + +} + +sub disconnect { + + my $self = shift; + + # since this is also called from DESTROY, no die() here! + + if (defined $self->{redis}) { + + nosqldebug($self,'disconnecting',getlogger(__PACKAGE__)); + + #$self->{redis}->wait_all_responses; #already part of quit() + $self->{redis}->quit() or nosqlwarn($self,'error disconnecting: ' . $!,getlogger(__PACKAGE__)); + $self->{redis} = undef; + + nosqlinfo($self,'disconnected',getlogger(__PACKAGE__)); + + } + +} + +sub is_connected { + + my $self = shift; + return (defined $self->{redis}); + +} + +sub ping { + + my $self = shift; + + return $self->{redis}->ping(); + +} + +sub AUTOLOAD { + + my $self = shift; + my @args = @_; + + my $called = $AUTOLOAD; + my $regex = quotemeta(__PACKAGE__ . '::'); + $called =~ s/^$regex//; + my $shared = 0; + if ($called =~ /_shared$/) { + $called = substr($called, 0, length($called) - length('_shared')); + $shared = 1; + if (ref $args[-1] eq 'CODE') { + my $cb = pop @args; + push(@args, sub { + return $cb->(_share_list(@_)); + }); + } + } + + nosqldebug($self, $called . '(' . join(', ', @args) . ')', getlogger(__PACKAGE__)) if $log_operations; + + if (wantarray) { + my @result; + eval { + @result = $self->{redis}->$called(@args); + }; + if ($@) { + nosqlerror($self, $called . '(' . join(', ', @args) . ') error: ' . $@, getlogger(__PACKAGE__)); + } + if ($shared) { + @result = _share_list(@result); + } + return @result; + } else { + my $result; + eval { + $result = $self->{redis}->$called(@args); + }; + if ($@) { + nosqlerror($self, $called . '(' . join(', ', @args) . ') error: ' . $@, getlogger(__PACKAGE__)); + } + if ($shared) { + $result = _share_scalar($result); + } + return $result; + } + +} + +sub multithreading_supported { + + my $self = shift; + return 1; + +} + +1;