TT#108605 redis "location" store schema #2

Change-Id: Id1195d5c0aa57015a0680f849df5c92af40dbd85
(cherry picked from commit 20b5920a7a)
mr7.5.5
Rene Krenn 5 years ago
parent 1673e11728
commit 0a285e1665

@ -346,11 +346,16 @@ sub nosqlwarn {
sub nosqlprocessingfailed {
my ($store,$scan_pattern,$logger) = @_;
my $message = 'keystore processing failed: [' . $store->connectidentifier() . '] ' . $scan_pattern;
my $msg = 'keystore processing failed: ';
my $connectidentifier = $store->connectidentifier();
if ($connectidentifier) {
$msg .= '[' . $connectidentifier . '] ';
}
$msg .= $scan_pattern;
if (defined $logger) {
$logger->error($message);
$logger->error($msg);
}
terminate($message, $logger);
terminate($msg, $logger);
}

@ -823,7 +823,14 @@ sub nosqlprocessingstarted {
my ($store,$scan_pattern,$logger) = @_;
if (defined $logger) {
$logger->info('keystore processing started: [' . $store->connectidentifier() . '] ' . $scan_pattern);
my $msg = 'keystore processing started: ';
my $connectidentifier = $store->connectidentifier();
if ($connectidentifier) {
$msg .= '[' . $connectidentifier . '] ';
}
$msg .= $scan_pattern;
$logger->info($msg);
}
}
@ -832,7 +839,14 @@ sub nosqlprocessingdone {
my ($store,$scan_pattern,$logger) = @_;
if (defined $logger) {
$logger->info('keystore processing done: [' . $store->connectidentifier() . '] ' . $scan_pattern);
my $msg = 'keystore processing done: ';
my $connectidentifier = $store->connectidentifier();
if ($connectidentifier) {
$msg .= '[' . $connectidentifier . '] ';
}
$msg .= $scan_pattern;
$logger->info($msg);
}
}

@ -5,11 +5,11 @@ use strict;
use Tie::IxHash;
use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw(init_entry);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
copy_value
$HASH_TYPE
$SET_TYPE
$LIST_TYPE
@ -30,26 +30,32 @@ sub new {
my $class = shift;
my $type = shift;
my $self = bless {}, $class;
$type = 'default' unless $type;
$self->{key} = shift;
$type = '' unless $type;
$type = lc($type);
my $value;
if ($type eq 'set') {
if ($type eq $SET_TYPE) {
# a redis "set" is a perl hash with undetermined values.
$value = {};
} elsif ($type eq 'list') {
} elsif ($type eq $LIST_TYPE) {
# a redis "list" is an perl array.
$value = [];
} elsif ($type eq 'zset') {
} elsif ($type eq $ZSET_TYPE) {
# a redis "zset" is a perl hash with ordered keys and undetermined values.
my %value = ();
tie(%value, 'Tie::IxHash');
$value = \%value;
} elsif ($type eq 'hash') {
} elsif ($type eq $HASH_TYPE) {
# a redis "hash" is a perl hash.
$value = {};
} else { #($type eq 'string') {
$type = 'string';
} else {
# a redis "string" is a perl scalar.
$type = $STRING_TYPE;
$value = undef;
}
$self->{type} = $type;
$self->{value} = $value;
return init_entry($self,@_);
return _init_entry($self,@_);
}
@ -64,16 +70,21 @@ sub gettype {
return $self->{type};
}
sub getkey {
my $self = shift;
return $self->{key};
}
sub gethash {
my $self = shift;
my $fieldvalues;
if ($self->{type} eq 'set') {
if ($self->{type} eq $SET_TYPE) {
$fieldvalues = [ sort keys %{$self->{value}} ];
} elsif ($self->{type} eq 'list') {
} elsif ($self->{type} eq $LIST_TYPE) {
$fieldvalues = $self->{value};
} elsif ($self->{type} eq 'zset') {
} elsif ($self->{type} eq $ZSET_TYPE) {
$fieldvalues = [ keys %{$self->{value}} ];
} elsif ($self->{type} eq 'hash') {
} elsif ($self->{type} eq $HASH_TYPE) {
$fieldvalues = [ map { $self->{value}->{$_}; } sort keys %{$self->{value}} ];
} else { #($type eq 'string') {
$fieldvalues = [ $self->{value} ];
@ -81,4 +92,115 @@ sub gethash {
return get_rowhash($fieldvalues);
}
sub _init_entry {
my ($entry,$fieldnames) = @_;
if (defined $fieldnames) {
# if there are fieldnames defined, we make a member variable for each and set it to undef
foreach my $fieldname (@$fieldnames) {
$entry->{value}->{$fieldname} = undef;
}
}
return $entry;
}
sub copy_value {
my ($entry,$value,$fieldnames) = @_;
if (defined $entry) {
if (defined $value) {
if ($entry->{type} eq $SET_TYPE) {
if (ref $value eq 'ARRAY') {
%{$entry->{value}} = map { $_ => undef; } @$value;
} elsif (ref $value eq 'HASH') {
%{$entry->{value}} = map { $_ => undef; } %$value;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
%{$entry->{value}} = %{$value->{value}};
} else {
$entry->{value} = { $value => undef, };
}
} elsif ($entry->{type} eq $LIST_TYPE) {
if (ref $value eq 'ARRAY') {
@{$entry->{value}} = @$value;
} elsif (ref $value eq 'HASH') {
@{$entry->{value}} = %$value;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
@{$entry->{value}} = @{$value->{value}};
} else {
$entry->{value} = [ $value, ];
}
} elsif ($entry->{type} eq $ZSET_TYPE) {
my %value = ();
tie(%value, 'Tie::IxHash');
$entry->{value} = \%value;
if (ref $value eq 'ARRAY') {
map { $entry->{value}->Push($_ => undef); } @$value;
} elsif (ref $value eq 'HASH') {
map { $entry->{value}->Push($_ => undef); } %$value;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
map { $entry->{value}->Push($_ => undef); } keys %{$value->{value}};
} else {
$entry->{value}->Push($value => undef);
}
} elsif ($entry->{type} eq $HASH_TYPE) {
my $i;
if (ref $value eq 'ARRAY') {
$i = 0;
} elsif (ref $value eq 'HASH') {
$i = -1;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
$i = -2;
} else {
$i = -3;
}
foreach my $fieldname (@$fieldnames) {
if ($i >= 0) {
$entry->{value}->{$fieldname} = $value->[$i];
$i++;
} elsif ($i == -1) {
if (exists $value->{$fieldname}) {
$entry->{value}->{$fieldname} = $value->{$fieldname};
} elsif (exists $value->{uc($fieldname)}) {
$entry->{value}->{$fieldname} = $value->{uc($fieldname)};
} else {
$entry->{value}->{$fieldname} = undef;
}
} elsif ($i == -2) {
if (exists $value->{value}->{$fieldname}) {
$entry->{value}->{$fieldname} = $value->{value}->{$fieldname};
} elsif (exists $entry->{value}->{uc($fieldname)}) {
$entry->{value}->{$fieldname} = $value->{value}->{uc($fieldname)};
} else {
$entry->{value}->{$fieldname} = undef;
}
} else {
$entry->{value}->{$fieldname} = $value; #scalar
last;
}
}
} else { #($type eq 'string') {
if (ref $value eq 'ARRAY') {
$entry->{value} = $value->[0];
} elsif (ref $value eq 'HASH') {
my @keys = keys %$value; #Experimental shift on scalar is now forbidden at..
$entry->{value} = $value->{shift @keys};
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
$entry->{value} = $value->{value};
} else {
$entry->{value} = $value;
}
}
}
}
return $entry;
}
1;

@ -36,15 +36,14 @@ use NGCP::BulkProcessor::NoSqlConnectors::Redis qw(get_scan_args);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
init_entry
copy_value
process_entries
);
my $nosqlprocessing_threadqueuelength = 10;
my $reader_connection_name = 'reader';
#my $reader_connection_name = 'reader';
my $thread_sleep_secs = 0.1;
@ -52,118 +51,6 @@ my $RUNNING = 1;
my $COMPLETED = 2;
my $ERROR = 4;
sub init_entry {
my ($entry,$fieldnames) = @_;
if (defined $fieldnames) {
# if there are fieldnames defined, we make a member variable for each and set it to undef
foreach my $fieldname (@$fieldnames) {
$entry->{value}->{$fieldname} = undef;
}
}
return $entry;
}
sub copy_value {
my ($entry,$value,$fieldnames) = @_;
if (defined $entry) {
if (defined $value) {
if ($entry->{type} eq 'set') {
if (ref $value eq 'ARRAY') {
%{$entry->{value}} = map { $_ => undef; } @$value;
} elsif (ref $value eq 'HASH') {
%{$entry->{value}} = map { $_ => undef; } %$value;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
%{$entry->{value}} = %{$value->{value}};
} else {
$entry->{value} = { $value => undef, };
}
} elsif ($entry->{type} eq 'list') {
if (ref $value eq 'ARRAY') {
@{$entry->{value}} = @$value;
} elsif (ref $value eq 'HASH') {
@{$entry->{value}} = %$value;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
@{$entry->{value}} = @{$value->{value}};
} else {
$entry->{value} = [ $value, ];
}
} elsif ($entry->{type} eq 'zset') {
my %value = ();
tie(%value, 'Tie::IxHash');
$entry->{value} = \%value;
if (ref $value eq 'ARRAY') {
map { $entry->{value}->Push($_ => undef); } @$value;
} elsif (ref $value eq 'HASH') {
map { $entry->{value}->Push($_ => undef); } %$value;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
map { $entry->{value}->Push($_ => undef); } keys %{$value->{value}};
} else {
$entry->{value}->Push($value => undef);
}
} elsif ($entry->{type} eq 'hash') {
my $i;
if (ref $value eq 'ARRAY') {
$i = 0;
} elsif (ref $value eq 'HASH') {
$i = -1;
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
$i = -2;
} else {
$i = -3;
}
foreach my $fieldname (@$fieldnames) {
if ($i >= 0) {
$entry->{value}->{$fieldname} = $value->[$i];
$i++;
} elsif ($i == -1) {
if (exists $value->{$fieldname}) {
$entry->{value}->{$fieldname} = $value->{$fieldname};
} elsif (exists $value->{uc($fieldname)}) {
$entry->{value}->{$fieldname} = $value->{uc($fieldname)};
} else {
$entry->{value}->{$fieldname} = undef;
}
} elsif ($i == -2) {
if (exists $value->{value}->{$fieldname}) {
$entry->{value}->{$fieldname} = $value->{value}->{$fieldname};
} elsif (exists $entry->{value}->{uc($fieldname)}) {
$entry->{value}->{$fieldname} = $value->{value}->{uc($fieldname)};
} else {
$entry->{value}->{$fieldname} = undef;
}
} else {
$entry->{value}->{$fieldname} = $value; #scalar
last;
}
}
} else { #($type eq 'string') {
if (ref $value eq 'ARRAY') {
$entry->{value} = $value->[0];
} elsif (ref $value eq 'HASH') {
my @keys = keys %$value; #Experimental shift on scalar is now forbidden at..
$entry->{value} = $value->{shift @keys};
} elsif (ref $value eq ref $entry) {
die('redis type mismatch') if $entry->{type} ne $value->{type};
$entry->{value} = $value->{value};
} else {
$entry->{value} = $value;
}
}
}
}
return $entry;
}
sub process_entries {
my %params = @_;
@ -175,7 +62,7 @@ sub process_entries {
$blocksize,
$init_process_context_code,
$uninit_process_context_code,
$destroy_reader_dbs_code,
$destroy_reader_stores_code,
$multithreading,
$nosqlprocessing_threads) = @params{qw/
get_store
@ -186,16 +73,14 @@ sub process_entries {
blocksize
init_process_context_code
uninit_process_context_code
destroy_reader_dbs_code
destroy_reader_stores_code
multithreading
nosqlprocessing_threads
/};
if (ref $get_store eq 'CODE') {
my $store = &$get_store($reader_connection_name,1);
nosqlprocessingstarted(&$get_store(),$scan_pattern,getlogger(__PACKAGE__));
nosqlprocessingstarted(&$get_store(undef,0),$scan_pattern,getlogger(__PACKAGE__));
my $errorstate = $RUNNING;
my $tid = threadid();
@ -211,7 +96,7 @@ sub process_entries {
nosqlthreadingdebug('shutting down connections ...',getlogger(__PACKAGE__));
$store->disconnect();
#$store->disconnect();
my $default_connection = &$get_store(undef,0);
my $default_connection_reconnect = $default_connection->is_connected();
$default_connection->disconnect();
@ -226,7 +111,7 @@ sub process_entries {
scan_pattern => $scan_pattern,
type => $type,
blocksize => $blocksize,
destroy_stores_code => $destroy_reader_dbs_code,
destroy_stores_code => $destroy_reader_stores_code,
});
for (my $i = 0; $i < $nosqlprocessing_threads; $i++) {
@ -315,10 +200,10 @@ sub process_entries {
}
if ($errorstate == $COMPLETED) {
nosqlprocessingdone(&$get_store(),$scan_pattern,getlogger(__PACKAGE__));
nosqlprocessingdone(&$get_store(undef,0),$scan_pattern,getlogger(__PACKAGE__));
return 1;
} else {
nosqlprocessingfailed(&$get_store(),$scan_pattern,getlogger(__PACKAGE__));
nosqlprocessingfailed(&$get_store(undef,0),$scan_pattern,getlogger(__PACKAGE__));
}
}

@ -6,9 +6,16 @@ use NGCP::BulkProcessor::NoSqlConnectors::Redis qw();
use NGCP::BulkProcessor::Redis::Trunk::location::entry qw(
get_entry
get_entry_by_ruid
process_keys
);
use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw(
get_usrdom
get_usrdom_by_username_domain
);
$NGCP::BulkProcessor::Globals::location_host = '192.168.0.146';
goto SKIP;
{
my $host = '192.168.0.146';
@ -28,16 +35,16 @@ goto SKIP;
#print join("\n",@keys);
}
SKIP:
#SKIP:
{
$NGCP::BulkProcessor::Globals::location_host = '192.168.0.146';
my $static_context = {
};
my $result = process_keys(
my $result = NGCP::BulkProcessor::Redis::Trunk::location::entry::process_keys(
process_code => sub {
my ($context,$records,$row_offset) = @_;
return 1;
return 0;
},
static_context => $static_context,
blocksize => 10000,
@ -55,10 +62,43 @@ SKIP:
}
#SKIP:
{
my $location = get_entry_by_ruid("x uloc-1-6007e9b2-302b-ce673");
$location = get_entry("x location:entry::uloc-1-6007e9b2-302b-ce673");
$location = get_entry_by_ruid("uloc-1-6007e9b2-302b-ce673");
$location = get_entry("location:entry::uloc-1-6007e9b2-302b-ce673");
}
SKIP:
{
my $location = get_entry_by_ruid();
my $static_context = {
};
my $result = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::process_keys(
process_code => sub {
my ($context,$records,$row_offset) = @_;
#die();
print @$records . " done\n";
return 0;
},
static_context => $static_context,
blocksize => 10000,
init_process_context_code => sub {
my ($context)= @_;
},
uninit_process_context_code => sub {
my ($context)= @_;
destroy_stores();
},
multithreading => 1,
numofthreads => 4,
load_recursive => { _entries => 1, },
);
}
#destroy_stores();
exit;

@ -9,11 +9,12 @@ use NGCP::BulkProcessor::ConnectorPool qw(
);
use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw(
copy_value
process_entries
);
use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw();
use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw(
copy_value
);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry);
@ -27,7 +28,7 @@ my $get_store = \&get_location_store;
my $table = 'location:entry';
my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::HASH_TYPE;
my $get_key_from_ruid = sub {
my $get_key = sub {
my ($ruid) = @_;
return $table . '::' . $ruid;
};
@ -60,7 +61,7 @@ my $fieldnames = [
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,$fieldnames);
my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift,$fieldnames);
copy_value($self,shift,$fieldnames);
@ -73,7 +74,10 @@ sub get_entry {
my ($key,$load_recursive) = @_;
my $store = &$get_store();
return builditems_fromrows({ $store->hgetall($key) },$load_recursive);
if (length($key) and my %res = $store->hgetall($key)) {
return builditems_fromrows($key,\%res,$load_recursive);
}
return undef;
}
@ -82,28 +86,32 @@ sub get_entry_by_ruid {
my ($ruid,$load_recursive) = @_;
my $store = &$get_store();
return builditems_fromrows({ $store->hgetall(&$get_key_from_ruid($ruid)) },$load_recursive);
if ($ruid and my %res = $store->hgetall(my $key = &$get_key($ruid))) {
return builditems_fromrows($key,\%res,$load_recursive);
}
return undef;
}
sub builditems_fromrows {
my ($rows,$load_recursive) = @_;
my ($keys,$rows,$load_recursive) = @_;
my $item;
if (defined $rows and ref $rows eq 'ARRAY') {
my @items = ();
foreach my $row (@$rows) {
$item = __PACKAGE__->new($row);
$item = __PACKAGE__->new($keys->[scalar @items], $row);
# transformations go here ...
push @items,$item;
}
return \@items;
} elsif (defined $rows and ref $rows eq 'HASH') {
$item = __PACKAGE__->new($rows);
$item = __PACKAGE__->new($keys,$rows);
return $item;
}
return undef;
@ -133,11 +141,11 @@ sub process_keys {
return process_entries(
get_store => $get_store,
scan_pattern => &$get_key_from_ruid('*'),
scan_pattern => &$get_key('*'),
type => $type,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,builditems_fromrows([
return &$process_code($context,builditems_fromrows(\@$rowblock,[
map { { &$get_store()->hgetall($_) }; } @$rowblock
],$load_recursive),$row_offset);
},

@ -0,0 +1,160 @@
package NGCP::BulkProcessor::Redis::Trunk::location::usrdom;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_location_store
destroy_stores
);
use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw(
process_entries
);
use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw(
copy_value
);
use NGCP::BulkProcessor::Redis::Trunk::location::entry qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry);
our @EXPORT_OK = qw(
get_usrdom
get_usrdom_by_username_domain
process_keys
);
my $get_store = \&get_location_store;
my $table = 'location:usrdom';
my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::SET_TYPE;
my $get_key = sub {
my ($username,$domain) = @_;
my $result = $table . '::' . $username;
$result .= ':' . $domain if $domain;
return $result;
};
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift);
copy_value($self,shift);
return $self;
}
sub get_usrdom {
my ($key,$load_recursive) = @_;
my $store = &$get_store();
if (length($key) and my @res = $store->smembers($key)) {
return builditems_fromrows($key,\@res,$load_recursive);
}
return undef;
}
sub get_usrdom_by_username_domain {
my ($username,$domain,$load_recursive) = @_;
my $store = &$get_store();
if ($username and $domain and my @res = $store->smembers(my $key = &$get_key($username,$domain))) {
return builditems_fromrows($key,\@res,$load_recursive);
}
return undef;
}
sub builditems_fromrows {
my ($keys,$rows,$load_recursive) = @_;
my $item;
if (defined $keys and ref $keys eq 'ARRAY') {
my @items = ();
foreach my $key (@$keys) {
$item = __PACKAGE__->new($key, $rows->[scalar @items]);
transformitem($item,$load_recursive);
push @items,$item;
}
return \@items;
} else {
$item = __PACKAGE__->new($keys,$rows);
transformitem($item,$load_recursive);
return $item;
}
}
sub transformitem {
my ($item,$load_recursive) = @_;
# transformations go here ...
if ($load_recursive) {
$load_recursive = {} unless ref $load_recursive;
my $field = "_entries";
if ($load_recursive->{$field}) {
my @entries = ();
foreach my $element (keys %{$item->getvalue()}) {
push(@entries,NGCP::BulkProcessor::Redis::Trunk::location::entry::get_entry($element,$load_recursive));
}
$item->{$field} = \@entries;
}
}
}
sub process_keys {
my %params = @_;
my ($process_code,
$static_context,
$blocksize,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
blocksize
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
load_recursive
/};
return process_entries(
get_store => $get_store,
scan_pattern => &$get_key('*'),
type => $type,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,builditems_fromrows(\@$rowblock,[
map { [ &$get_store()->smembers($_) ]; } @$rowblock
],$load_recursive),$row_offset);
},
static_context => $static_context,
blocksize => $blocksize,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_stores_code => \&destroy_stores,
multithreading => $multithreading,
nosqlprocessing_threads => $numofthreads,
);
}
1;

@ -65,5 +65,5 @@ doneemailrecipient =
##logging:
fileloglevel = OFF
screenloglevel = INFO
screenloglevel = OFF
emailloglevel = OFF

Loading…
Cancel
Save