TT#108604 introduce nosql abstraction and redis connector

Change-Id: Ia2f7682c6a9b88c42300dc667f1302eaf447bab3
(cherry picked from commit 207a82b621)
mr7.5.4
Rene Krenn 5 years ago
parent 95d9cb5594
commit 24a23de99b

1
debian/control vendored

@ -54,6 +54,7 @@ Depends:
libmime-tools-perl,
libnet-address-ip-local-perl,
libphp-serialization-perl,
libredis-perl,
libspreadsheet-parseexcel-perl,
libstring-mkpasswd-perl,
libsys-cpuaffinity-perl,

@ -47,6 +47,8 @@ our @EXPORT_OK = qw(
faketimeerror
dberror
dbwarn
nosqlerror
nosqlwarn
fieldnamesdiffer
transferzerorowcount
processzerorowcount
@ -312,6 +314,34 @@ sub dbwarn {
}
sub nosqlerror {
my ($connector, $message, $logger) = @_;
$message = _getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message);
if (defined $logger) {
$logger->error($message);
}
terminate($message, $logger);
#terminatethreads();
#die();
}
sub nosqlwarn {
my ($connector, $message, $logger) = @_;
$message = _getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message);
if (defined $logger) {
$logger->warn($message);
}
#die();
warning($message, $logger);
}
sub resterror {
my ($restapi, $message, $logger) = @_;
@ -794,4 +824,29 @@ sub _getrestconnectidentifiermessage {
return $result . $message;
}
sub _getnosqlconnectorinstanceprefix {
my ($connector) = @_;
my $instancestring = $connector->instanceidentifier();
if (length($instancestring) > 0) {
if ($connector->{tid} != $root_threadid) {
return '[' . $connector->{tid} . '/' . $instancestring . '] ';
} else {
return '[' . $instancestring . '] ';
}
} elsif ($connector->{tid} != $root_threadid) {
return '[' . $connector->{tid} . '] ';
}
return '';
}
sub _getnosqlconnectidentifiermessage {
my ($connector,$message) = @_;
my $result = $connector->connectidentifier();
if (length($result) > 0) {
$result .= ' - ';
}
return $result . $message;
}
1;

@ -33,6 +33,8 @@ our @EXPORT_OK = qw(
dbinfo
restdebug
restinfo
nosqldebug
nosqlinfo
attachmentdownloaderdebug
attachmentdownloaderinfo
@ -290,6 +292,28 @@ sub dbinfo {
}
sub nosqldebug {
my ($connector, $message, $logger) = @_;
if (defined $logger) {
$logger->debug(_getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message));
}
#die();
}
sub nosqlinfo {
my ($connector, $message, $logger) = @_;
if (defined $logger) {
$logger->info(_getnosqlconnectorinstanceprefix($connector) . _getnosqlconnectidentifiermessage($connector,$message));
}
#die();
}
sub restdebug {
my ($restapi, $message, $logger) = @_;
@ -856,6 +880,30 @@ sub _getsqlconnectidentifiermessage {
return $result . $message;
}
sub _getnosqlconnectorinstanceprefix {
my ($connector) = @_;
my $instancestring = $connector->instanceidentifier();
if (length($instancestring) > 0) {
if ($connector->{tid} != $root_threadid) {
return '[' . $connector->{tid} . '/' . $instancestring . '] ';
} else {
return '[' . $instancestring . '] ';
}
} elsif ($connector->{tid} != $root_threadid) {
return '[' . $connector->{tid} . '] ';
}
return '';
}
sub _getnosqlconnectidentifiermessage {
my ($connector,$message) = @_;
my $result = $connector->connectidentifier();
if (length($result) > 0) {
$result .= ' - ';
}
return $result . $message;
}
sub _getrestconnectorinstanceprefix {
my ($restapi) = @_;
my $instancestring = $restapi->instanceidentifier();

@ -3,4 +3,121 @@ use strict;
## no critic
use threads;
use threads::shared;
use NGCP::BulkProcessor::Logging qw(
getlogger
nosqlinfo
nosqldebug);
use NGCP::BulkProcessor::LogError qw(nosqlerror);
use NGCP::BulkProcessor::Utils qw(threadid);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
_share_scalar
_share_list
);
sub new {
my $class = shift;
my $self = bless {}, $class;
my $instanceid = shift;
$self->{instanceid} = $instanceid;
$self->{tid} = threadid();
return $self;
}
sub connectidentifier {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
return undef;
}
sub instanceidentifier {
my $self = shift;
$self->{instanceid} = shift if @_;
return $self->{instanceid};
}
sub connect {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub disconnect {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub is_connected {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub ping {
my $self = shift;
notimplementederror((ref $self) . ': ' . (caller(0))[3] . ' not implemented',getlogger(__PACKAGE__));
}
sub multithreading_supported {
my $self = shift;
return 0;
}
sub DESTROY {
my $self = shift;
# perl threads works like a fork, each thread owns a shalow? copy
# of the entire current context, at the moment it starts.
# due to this, if the thread is finished, perl gc will invoke destructors
# on the thread's scope elements, that potentially contains connectors from
# the main tread. it will actually attempt destroy them (disconnect, etc.)
# this is a problem with destructors that change object state like this one
#
# to avoid this, we perform destruction tasks only if the destructing tid
# is the same as the creating one:
if ($self->{tid} == threadid()) {
$self->disconnect();
eval {
nosqldebug($self,(ref $self) . ' connector destroyed',getlogger(__PACKAGE__));
};
}
}
sub _share_list {
my @args = @_;
my $result = shared_clone(\@args);
return @$result;
}
sub _share_scalar {
my $result = shared_clone(shift @_);
return $result;
}
1;

@ -0,0 +1,227 @@
package NGCP::BulkProcessor::NoSqlConnectors::Redis;
use strict;
## no critic
use NGCP::BulkProcessor::Globals qw(
$system_abbreviation
$enablemultithreading
$local_fqdn);
use NGCP::BulkProcessor::Logging qw(
getlogger
nosqlinfo
nosqldebug);
use NGCP::BulkProcessor::LogError qw(nosqlerror nosqlwarn);
use Redis;
#use NGCP::BulkProcessor::Array qw(arrayeq itemcount contains setcontains);
use NGCP::BulkProcessor::NoSqlConnector qw(
_share_scalar
_share_list
);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnector);
our @EXPORT_OK = qw();
our $AUTOLOAD;
my $log_operations = 0;
my $defaulthost = '127.0.0.1';
my $defaultport = '6379';
my $defaultsock = undef;
my $defaultpassword = undef;
my $defaultdatabaseindex = '0';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::NoSqlConnector->new(@_);
$self->{host} = undef;
$self->{port} = undef;
$self->{sock} = undef;
$self->{databaseindex} = undef;
$self->{password} = undef;
$self->{redis} = undef;
bless($self,$class);
nosqldebug($self,__PACKAGE__ . ' connector created',getlogger(__PACKAGE__));
return $self;
}
sub connectidentifier {
my $self = shift;
if (defined $self->{databaseindex}) {
if ($self->{sock}) {
return $self->{sock} . '/' . $self->{databaseindex};
} else {
return $self->{host} . ':' . $self->{port} . '/' . $self->{databaseindex};
}
} else {
return undef;
}
}
sub connect {
my $self = shift;
my ($databaseindex,$password,$host,$port,$sock) = @_;
$self->disconnect();
$host = $defaulthost if (not $host);
$port = $defaultport if (not $port);
$sock = $defaultsock if (not $sock);
$databaseindex = $defaultdatabaseindex if (not $databaseindex);
$password = $defaultpassword if (not $password);
$self->{host} = $host;
$self->{port} = $port;
$self->{sock} = $sock;
$self->{databaseindex} = $databaseindex;
$self->{password} = $password;
#if (not contains($databasename,$self->getdatabases(),0)) {
# $self->_createdatabase($databasename);
#}
nosqldebug($self,'connecting',getlogger(__PACKAGE__));
my $name = $system_abbreviation;
$name .= '_' . $self->instanceidentifier() if $self->instanceidentifier();
$name =~ s/[^a-z0-9]+/_/gi;
$name =~ s/(^_+)|(_+$)//g;
if ($enablemultithreading) {
$name .= '_thread_' . $self->{tid};
}
my $redis = Redis->new(
($sock ?
(sock => $sock) :
(server => ($host . ':' . $port))
),
(defined $password ? (password => $password) : ()),
name => $name,
no_auto_connect_on_new => 1,
#debug => 1,
);
eval {
$redis->connect();
$redis->select($databaseindex);
#or die($!);
#my $dbsize = $redis->dbsize();
};
if ($@) {
nosqlerror($self, 'error connecting: ' . $@, getlogger(__PACKAGE__));
} else {
$self->{redis} = $redis;
nosqlinfo($self,'connected',getlogger(__PACKAGE__));
}
}
sub disconnect {
my $self = shift;
# since this is also called from DESTROY, no die() here!
if (defined $self->{redis}) {
nosqldebug($self,'disconnecting',getlogger(__PACKAGE__));
#$self->{redis}->wait_all_responses; #already part of quit()
$self->{redis}->quit() or nosqlwarn($self,'error disconnecting: ' . $!,getlogger(__PACKAGE__));
$self->{redis} = undef;
nosqlinfo($self,'disconnected',getlogger(__PACKAGE__));
}
}
sub is_connected {
my $self = shift;
return (defined $self->{redis});
}
sub ping {
my $self = shift;
return $self->{redis}->ping();
}
sub AUTOLOAD {
my $self = shift;
my @args = @_;
my $called = $AUTOLOAD;
my $regex = quotemeta(__PACKAGE__ . '::');
$called =~ s/^$regex//;
my $shared = 0;
if ($called =~ /_shared$/) {
$called = substr($called, 0, length($called) - length('_shared'));
$shared = 1;
if (ref $args[-1] eq 'CODE') {
my $cb = pop @args;
push(@args, sub {
return $cb->(_share_list(@_));
});
}
}
nosqldebug($self, $called . '(' . join(', ', @args) . ')', getlogger(__PACKAGE__)) if $log_operations;
if (wantarray) {
my @result;
eval {
@result = $self->{redis}->$called(@args);
};
if ($@) {
nosqlerror($self, $called . '(' . join(', ', @args) . ') error: ' . $@, getlogger(__PACKAGE__));
}
if ($shared) {
@result = _share_list(@result);
}
return @result;
} else {
my $result;
eval {
$result = $self->{redis}->$called(@args);
};
if ($@) {
nosqlerror($self, $called . '(' . join(', ', @args) . ') error: ' . $@, getlogger(__PACKAGE__));
}
if ($shared) {
$result = _share_scalar($result);
}
return $result;
}
}
sub multithreading_supported {
my $self = shift;
return 1;
}
1;
Loading…
Cancel
Save