From 0a285e166576b3f475ff9f345cca77bb0d96cecf Mon Sep 17 00:00:00 2001
From: Rene Krenn
Date: Wed, 3 Feb 2021 20:23:13 +0100
Subject: [PATCH] TT#108605 redis "location" store schema #2

Change-Id: Id1195d5c0aa57015a0680f849df5c92af40dbd85
(cherry picked from commit 20b5920a7ad9d55ef251b2157252329cc396291e)
---
 lib/NGCP/BulkProcessor/LogError.pm            |  11 +-
 lib/NGCP/BulkProcessor/Logging.pm             |  18 +-
 .../NoSqlConnectors/RedisEntry.pm             | 150 ++++++++++++++--
 .../NoSqlConnectors/RedisProcessor.pm         | 133 +--------------
 .../Massive/RegistrationMonitoring/Redis.t    |  52 +++++-
 .../Redis/Trunk/location/entry.pm             |  30 ++--
 .../Redis/Trunk/location/usrdom.pm            | 160 ++++++++++++++++++
 lib/NGCP/BulkProcessor/default.cfg            |   2 +-
 8 files changed, 395 insertions(+), 161 deletions(-)
 create mode 100644 lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm

diff --git a/lib/NGCP/BulkProcessor/LogError.pm b/lib/NGCP/BulkProcessor/LogError.pm
index b1fe24f..837f1c2 100644
--- a/lib/NGCP/BulkProcessor/LogError.pm
+++ b/lib/NGCP/BulkProcessor/LogError.pm
@@ -346,11 +346,16 @@ sub nosqlwarn {
 sub nosqlprocessingfailed {
 
     my ($store,$scan_pattern,$logger) = @_;
-    my $message = 'keystore processing failed: [' . $store->connectidentifier() . '] ' . $scan_pattern;
+    my $msg = 'keystore processing failed: ';
+    my $connectidentifier = $store->connectidentifier();
+    if ($connectidentifier) {
+        $msg .= '[' . $connectidentifier . '] ';
+    }
+    $msg .= $scan_pattern;
     if (defined $logger) {
-        $logger->error($message);
+        $logger->error($msg);
     }
-    terminate($message, $logger);
+    terminate($msg, $logger);
 
 }
 
diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm
index 950ed87..d0d2dce 100644
--- a/lib/NGCP/BulkProcessor/Logging.pm
+++ b/lib/NGCP/BulkProcessor/Logging.pm
@@ -823,7 +823,14 @@ sub nosqlprocessingstarted {
     my ($store,$scan_pattern,$logger) = @_;
 
     if (defined $logger) {
-        $logger->info('keystore processing started: [' . $store->connectidentifier() . '] ' . $scan_pattern);
+        my $msg = 'keystore processing started: ';
+        my $connectidentifier = $store->connectidentifier();
+        if ($connectidentifier) {
+            $msg .= '[' . $connectidentifier . '] ';
+        }
+        $msg .= $scan_pattern;
+
+        $logger->info($msg);
     }
 
 }
@@ -832,7 +839,14 @@ sub nosqlprocessingdone {
     my ($store,$scan_pattern,$logger) = @_;
 
     if (defined $logger) {
-        $logger->info('keystore processing done: [' . $store->connectidentifier() . '] ' . $scan_pattern);
+        my $msg = 'keystore processing done: ';
+        my $connectidentifier = $store->connectidentifier();
+        if ($connectidentifier) {
+            $msg .= '[' . $connectidentifier . '] ';
+        }
+        $msg .= $scan_pattern;
+
+        $logger->info($msg);
     }
 
 }
 
diff --git a/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisEntry.pm b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisEntry.pm
index 7ce58a0..d1fa77a 100644
--- a/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisEntry.pm
+++ b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisEntry.pm
@@ -5,11 +5,11 @@ use strict;
 
 use Tie::IxHash;
 
-use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw(init_entry);
-
 require Exporter;
 our @ISA = qw(Exporter);
 our @EXPORT_OK = qw(
+    copy_value
+
     $HASH_TYPE
     $SET_TYPE
     $LIST_TYPE
@@ -30,26 +30,32 @@ sub new {
     my $class = shift;
     my $type = shift;
     my $self = bless {}, $class;
-    $type = 'default' unless $type;
+    $self->{key} = shift;
+    $type = '' unless $type;
     $type = lc($type);
     my $value;
-    if ($type eq 'set') {
+    if ($type eq $SET_TYPE) {
+        # a redis "set" is a perl hash with undetermined values.
$value = {}; - } elsif ($type eq 'list') { + } elsif ($type eq $LIST_TYPE) { + # a redis "list" is an perl array. $value = []; - } elsif ($type eq 'zset') { + } elsif ($type eq $ZSET_TYPE) { + # a redis "zset" is a perl hash with ordered keys and undetermined values. my %value = (); tie(%value, 'Tie::IxHash'); $value = \%value; - } elsif ($type eq 'hash') { + } elsif ($type eq $HASH_TYPE) { + # a redis "hash" is a perl hash. $value = {}; - } else { #($type eq 'string') { - $type = 'string'; + } else { + # a redis "string" is a perl scalar. + $type = $STRING_TYPE; $value = undef; } $self->{type} = $type; $self->{value} = $value; - return init_entry($self,@_); + return _init_entry($self,@_); } @@ -64,16 +70,21 @@ sub gettype { return $self->{type}; } +sub getkey { + my $self = shift; + return $self->{key}; +} + sub gethash { my $self = shift; my $fieldvalues; - if ($self->{type} eq 'set') { + if ($self->{type} eq $SET_TYPE) { $fieldvalues = [ sort keys %{$self->{value}} ]; - } elsif ($self->{type} eq 'list') { + } elsif ($self->{type} eq $LIST_TYPE) { $fieldvalues = $self->{value}; - } elsif ($self->{type} eq 'zset') { + } elsif ($self->{type} eq $ZSET_TYPE) { $fieldvalues = [ keys %{$self->{value}} ]; - } elsif ($self->{type} eq 'hash') { + } elsif ($self->{type} eq $HASH_TYPE) { $fieldvalues = [ map { $self->{value}->{$_}; } sort keys %{$self->{value}} ]; } else { #($type eq 'string') { $fieldvalues = [ $self->{value} ]; @@ -81,4 +92,115 @@ sub gethash { return get_rowhash($fieldvalues); } +sub _init_entry { + + my ($entry,$fieldnames) = @_; + + if (defined $fieldnames) { + # if there are fieldnames defined, we make a member variable for each and set it to undef + foreach my $fieldname (@$fieldnames) { + $entry->{value}->{$fieldname} = undef; + } + } + + return $entry; + +} + +sub copy_value { + my ($entry,$value,$fieldnames) = @_; + if (defined $entry) { + if (defined $value) { + if ($entry->{type} eq $SET_TYPE) { + if (ref $value eq 'ARRAY') { + %{$entry->{value}} = map { $_ => undef; } @$value; + } elsif (ref $value eq 'HASH') { + %{$entry->{value}} = map { $_ => undef; } %$value; + } elsif (ref $value eq ref $entry) { + die('redis type mismatch') if $entry->{type} ne $value->{type}; + %{$entry->{value}} = %{$value->{value}}; + } else { + $entry->{value} = { $value => undef, }; + } + } elsif ($entry->{type} eq $LIST_TYPE) { + if (ref $value eq 'ARRAY') { + @{$entry->{value}} = @$value; + } elsif (ref $value eq 'HASH') { + @{$entry->{value}} = %$value; + } elsif (ref $value eq ref $entry) { + die('redis type mismatch') if $entry->{type} ne $value->{type}; + @{$entry->{value}} = @{$value->{value}}; + } else { + $entry->{value} = [ $value, ]; + } + } elsif ($entry->{type} eq $ZSET_TYPE) { + my %value = (); + tie(%value, 'Tie::IxHash'); + $entry->{value} = \%value; + if (ref $value eq 'ARRAY') { + map { $entry->{value}->Push($_ => undef); } @$value; + } elsif (ref $value eq 'HASH') { + map { $entry->{value}->Push($_ => undef); } %$value; + } elsif (ref $value eq ref $entry) { + die('redis type mismatch') if $entry->{type} ne $value->{type}; + map { $entry->{value}->Push($_ => undef); } keys %{$value->{value}}; + } else { + $entry->{value}->Push($value => undef); + } + } elsif ($entry->{type} eq $HASH_TYPE) { + my $i; + if (ref $value eq 'ARRAY') { + $i = 0; + } elsif (ref $value eq 'HASH') { + $i = -1; + } elsif (ref $value eq ref $entry) { + die('redis type mismatch') if $entry->{type} ne $value->{type}; + $i = -2; + } else { + $i = -3; + } + foreach my $fieldname (@$fieldnames) { + if ($i 
>= 0) { + $entry->{value}->{$fieldname} = $value->[$i]; + $i++; + } elsif ($i == -1) { + if (exists $value->{$fieldname}) { + $entry->{value}->{$fieldname} = $value->{$fieldname}; + } elsif (exists $value->{uc($fieldname)}) { + $entry->{value}->{$fieldname} = $value->{uc($fieldname)}; + } else { + $entry->{value}->{$fieldname} = undef; + } + } elsif ($i == -2) { + if (exists $value->{value}->{$fieldname}) { + $entry->{value}->{$fieldname} = $value->{value}->{$fieldname}; + } elsif (exists $entry->{value}->{uc($fieldname)}) { + $entry->{value}->{$fieldname} = $value->{value}->{uc($fieldname)}; + } else { + $entry->{value}->{$fieldname} = undef; + } + } else { + $entry->{value}->{$fieldname} = $value; #scalar + last; + } + } + } else { #($type eq 'string') { + if (ref $value eq 'ARRAY') { + $entry->{value} = $value->[0]; + } elsif (ref $value eq 'HASH') { + my @keys = keys %$value; #Experimental shift on scalar is now forbidden at.. + $entry->{value} = $value->{shift @keys}; + } elsif (ref $value eq ref $entry) { + die('redis type mismatch') if $entry->{type} ne $value->{type}; + $entry->{value} = $value->{value}; + } else { + $entry->{value} = $value; + } + } + } + + } + return $entry; +} + 1; diff --git a/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm index 8081276..b16867d 100644 --- a/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm +++ b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm @@ -36,15 +36,14 @@ use NGCP::BulkProcessor::NoSqlConnectors::Redis qw(get_scan_args); require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( - init_entry - copy_value + process_entries ); my $nosqlprocessing_threadqueuelength = 10; -my $reader_connection_name = 'reader'; +#my $reader_connection_name = 'reader'; my $thread_sleep_secs = 0.1; @@ -52,118 +51,6 @@ my $RUNNING = 1; my $COMPLETED = 2; my $ERROR = 4; - -sub init_entry { - - my ($entry,$fieldnames) = @_; - - if (defined $fieldnames) { - # if there are fieldnames defined, we make a member variable for each and set it to undef - foreach my $fieldname (@$fieldnames) { - $entry->{value}->{$fieldname} = undef; - } - } - - return $entry; - -} - -sub copy_value { - my ($entry,$value,$fieldnames) = @_; - if (defined $entry) { - if (defined $value) { - if ($entry->{type} eq 'set') { - if (ref $value eq 'ARRAY') { - %{$entry->{value}} = map { $_ => undef; } @$value; - } elsif (ref $value eq 'HASH') { - %{$entry->{value}} = map { $_ => undef; } %$value; - } elsif (ref $value eq ref $entry) { - die('redis type mismatch') if $entry->{type} ne $value->{type}; - %{$entry->{value}} = %{$value->{value}}; - } else { - $entry->{value} = { $value => undef, }; - } - } elsif ($entry->{type} eq 'list') { - if (ref $value eq 'ARRAY') { - @{$entry->{value}} = @$value; - } elsif (ref $value eq 'HASH') { - @{$entry->{value}} = %$value; - } elsif (ref $value eq ref $entry) { - die('redis type mismatch') if $entry->{type} ne $value->{type}; - @{$entry->{value}} = @{$value->{value}}; - } else { - $entry->{value} = [ $value, ]; - } - } elsif ($entry->{type} eq 'zset') { - my %value = (); - tie(%value, 'Tie::IxHash'); - $entry->{value} = \%value; - if (ref $value eq 'ARRAY') { - map { $entry->{value}->Push($_ => undef); } @$value; - } elsif (ref $value eq 'HASH') { - map { $entry->{value}->Push($_ => undef); } %$value; - } elsif (ref $value eq ref $entry) { - die('redis type mismatch') if $entry->{type} ne $value->{type}; - map { $entry->{value}->Push($_ => undef); } keys 
%{$value->{value}}; - } else { - $entry->{value}->Push($value => undef); - } - } elsif ($entry->{type} eq 'hash') { - my $i; - if (ref $value eq 'ARRAY') { - $i = 0; - } elsif (ref $value eq 'HASH') { - $i = -1; - } elsif (ref $value eq ref $entry) { - die('redis type mismatch') if $entry->{type} ne $value->{type}; - $i = -2; - } else { - $i = -3; - } - foreach my $fieldname (@$fieldnames) { - if ($i >= 0) { - $entry->{value}->{$fieldname} = $value->[$i]; - $i++; - } elsif ($i == -1) { - if (exists $value->{$fieldname}) { - $entry->{value}->{$fieldname} = $value->{$fieldname}; - } elsif (exists $value->{uc($fieldname)}) { - $entry->{value}->{$fieldname} = $value->{uc($fieldname)}; - } else { - $entry->{value}->{$fieldname} = undef; - } - } elsif ($i == -2) { - if (exists $value->{value}->{$fieldname}) { - $entry->{value}->{$fieldname} = $value->{value}->{$fieldname}; - } elsif (exists $entry->{value}->{uc($fieldname)}) { - $entry->{value}->{$fieldname} = $value->{value}->{uc($fieldname)}; - } else { - $entry->{value}->{$fieldname} = undef; - } - } else { - $entry->{value}->{$fieldname} = $value; #scalar - last; - } - } - } else { #($type eq 'string') { - if (ref $value eq 'ARRAY') { - $entry->{value} = $value->[0]; - } elsif (ref $value eq 'HASH') { - my @keys = keys %$value; #Experimental shift on scalar is now forbidden at.. - $entry->{value} = $value->{shift @keys}; - } elsif (ref $value eq ref $entry) { - die('redis type mismatch') if $entry->{type} ne $value->{type}; - $entry->{value} = $value->{value}; - } else { - $entry->{value} = $value; - } - } - } - - } - return $entry; -} - sub process_entries { my %params = @_; @@ -175,7 +62,7 @@ sub process_entries { $blocksize, $init_process_context_code, $uninit_process_context_code, - $destroy_reader_dbs_code, + $destroy_reader_stores_code, $multithreading, $nosqlprocessing_threads) = @params{qw/ get_store @@ -186,16 +73,14 @@ sub process_entries { blocksize init_process_context_code uninit_process_context_code - destroy_reader_dbs_code + destroy_reader_stores_code multithreading nosqlprocessing_threads /}; if (ref $get_store eq 'CODE') { - my $store = &$get_store($reader_connection_name,1); - - nosqlprocessingstarted(&$get_store(),$scan_pattern,getlogger(__PACKAGE__)); + nosqlprocessingstarted(&$get_store(undef,0),$scan_pattern,getlogger(__PACKAGE__)); my $errorstate = $RUNNING; my $tid = threadid(); @@ -211,7 +96,7 @@ sub process_entries { nosqlthreadingdebug('shutting down connections ...',getlogger(__PACKAGE__)); - $store->disconnect(); + #$store->disconnect(); my $default_connection = &$get_store(undef,0); my $default_connection_reconnect = $default_connection->is_connected(); $default_connection->disconnect(); @@ -226,7 +111,7 @@ sub process_entries { scan_pattern => $scan_pattern, type => $type, blocksize => $blocksize, - destroy_stores_code => $destroy_reader_dbs_code, + destroy_stores_code => $destroy_reader_stores_code, }); for (my $i = 0; $i < $nosqlprocessing_threads; $i++) { @@ -315,10 +200,10 @@ sub process_entries { } if ($errorstate == $COMPLETED) { - nosqlprocessingdone(&$get_store(),$scan_pattern,getlogger(__PACKAGE__)); + nosqlprocessingdone(&$get_store(undef,0),$scan_pattern,getlogger(__PACKAGE__)); return 1; } else { - nosqlprocessingfailed(&$get_store(),$scan_pattern,getlogger(__PACKAGE__)); + nosqlprocessingfailed(&$get_store(undef,0),$scan_pattern,getlogger(__PACKAGE__)); } } diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t 
b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t index e253115..dfb2064 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t @@ -6,9 +6,16 @@ use NGCP::BulkProcessor::NoSqlConnectors::Redis qw(); use NGCP::BulkProcessor::Redis::Trunk::location::entry qw( get_entry get_entry_by_ruid - process_keys + +); +use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw( + get_usrdom + get_usrdom_by_username_domain + ); +$NGCP::BulkProcessor::Globals::location_host = '192.168.0.146'; + goto SKIP; { my $host = '192.168.0.146'; @@ -28,16 +35,16 @@ goto SKIP; #print join("\n",@keys); } -SKIP: +#SKIP: { - $NGCP::BulkProcessor::Globals::location_host = '192.168.0.146'; + my $static_context = { }; - my $result = process_keys( + my $result = NGCP::BulkProcessor::Redis::Trunk::location::entry::process_keys( process_code => sub { my ($context,$records,$row_offset) = @_; - return 1; + return 0; }, static_context => $static_context, blocksize => 10000, @@ -55,10 +62,43 @@ SKIP: } +#SKIP: +{ + my $location = get_entry_by_ruid("x uloc-1-6007e9b2-302b-ce673"); + $location = get_entry("x location:entry::uloc-1-6007e9b2-302b-ce673"); + $location = get_entry_by_ruid("uloc-1-6007e9b2-302b-ce673"); + $location = get_entry("location:entry::uloc-1-6007e9b2-302b-ce673"); +} + SKIP: { - my $location = get_entry_by_ruid(); + + my $static_context = { + + }; + my $result = NGCP::BulkProcessor::Redis::Trunk::location::usrdom::process_keys( + process_code => sub { + my ($context,$records,$row_offset) = @_; + #die(); + print @$records . " done\n"; + return 0; + }, + static_context => $static_context, + blocksize => 10000, + init_process_context_code => sub { + my ($context)= @_; + }, + uninit_process_context_code => sub { + my ($context)= @_; + destroy_stores(); + }, + multithreading => 1, + numofthreads => 4, + load_recursive => { _entries => 1, }, + ); + } +#destroy_stores(); exit; diff --git a/lib/NGCP/BulkProcessor/Redis/Trunk/location/entry.pm b/lib/NGCP/BulkProcessor/Redis/Trunk/location/entry.pm index 9425548..542ed1f 100644 --- a/lib/NGCP/BulkProcessor/Redis/Trunk/location/entry.pm +++ b/lib/NGCP/BulkProcessor/Redis/Trunk/location/entry.pm @@ -9,11 +9,12 @@ use NGCP::BulkProcessor::ConnectorPool qw( ); use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw( - copy_value process_entries ); -use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw(); +use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw( + copy_value +); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry); @@ -27,7 +28,7 @@ my $get_store = \&get_location_store; my $table = 'location:entry'; my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::HASH_TYPE; -my $get_key_from_ruid = sub { +my $get_key = sub { my ($ruid) = @_; return $table . '::' . 
$ruid; }; @@ -60,7 +61,7 @@ my $fieldnames = [ sub new { my $class = shift; - my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,$fieldnames); + my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift,$fieldnames); copy_value($self,shift,$fieldnames); @@ -73,7 +74,10 @@ sub get_entry { my ($key,$load_recursive) = @_; my $store = &$get_store(); - return builditems_fromrows({ $store->hgetall($key) },$load_recursive); + if (length($key) and my %res = $store->hgetall($key)) { + return builditems_fromrows($key,\%res,$load_recursive); + } + return undef; } @@ -82,28 +86,32 @@ sub get_entry_by_ruid { my ($ruid,$load_recursive) = @_; my $store = &$get_store(); - return builditems_fromrows({ $store->hgetall(&$get_key_from_ruid($ruid)) },$load_recursive); + if ($ruid and my %res = $store->hgetall(my $key = &$get_key($ruid))) { + return builditems_fromrows($key,\%res,$load_recursive); + } + return undef; } sub builditems_fromrows { - my ($rows,$load_recursive) = @_; + my ($keys,$rows,$load_recursive) = @_; my $item; if (defined $rows and ref $rows eq 'ARRAY') { my @items = (); foreach my $row (@$rows) { - $item = __PACKAGE__->new($row); + $item = __PACKAGE__->new($keys->[scalar @items], $row); # transformations go here ... push @items,$item; + } return \@items; } elsif (defined $rows and ref $rows eq 'HASH') { - $item = __PACKAGE__->new($rows); + $item = __PACKAGE__->new($keys,$rows); return $item; } return undef; @@ -133,11 +141,11 @@ sub process_keys { return process_entries( get_store => $get_store, - scan_pattern => &$get_key_from_ruid('*'), + scan_pattern => &$get_key('*'), type => $type, process_code => sub { my ($context,$rowblock,$row_offset) = @_; - return &$process_code($context,builditems_fromrows([ + return &$process_code($context,builditems_fromrows(\@$rowblock,[ map { { &$get_store()->hgetall($_) }; } @$rowblock ],$load_recursive),$row_offset); }, diff --git a/lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm b/lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm new file mode 100644 index 0000000..bf911ee --- /dev/null +++ b/lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm @@ -0,0 +1,160 @@ +package NGCP::BulkProcessor::Redis::Trunk::location::usrdom; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_location_store + destroy_stores +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw( + process_entries +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw( + copy_value +); + +use NGCP::BulkProcessor::Redis::Trunk::location::entry qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry); +our @EXPORT_OK = qw( + get_usrdom + get_usrdom_by_username_domain + process_keys +); + +my $get_store = \&get_location_store; + +my $table = 'location:usrdom'; +my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::SET_TYPE; +my $get_key = sub { + my ($username,$domain) = @_; + my $result = $table . '::' . $username; + $result .= ':' . 
$domain if $domain; + return $result; +}; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift); + + copy_value($self,shift); + + return $self; + +} + +sub get_usrdom { + + my ($key,$load_recursive) = @_; + my $store = &$get_store(); + + if (length($key) and my @res = $store->smembers($key)) { + return builditems_fromrows($key,\@res,$load_recursive); + } + return undef; + +} + +sub get_usrdom_by_username_domain { + + my ($username,$domain,$load_recursive) = @_; + my $store = &$get_store(); + + if ($username and $domain and my @res = $store->smembers(my $key = &$get_key($username,$domain))) { + return builditems_fromrows($key,\@res,$load_recursive); + } + return undef; + +} + +sub builditems_fromrows { + + my ($keys,$rows,$load_recursive) = @_; + + my $item; + + if (defined $keys and ref $keys eq 'ARRAY') { + my @items = (); + foreach my $key (@$keys) { + $item = __PACKAGE__->new($key, $rows->[scalar @items]); + + transformitem($item,$load_recursive); + + push @items,$item; + + } + return \@items; + } else { + $item = __PACKAGE__->new($keys,$rows); + transformitem($item,$load_recursive); + return $item; + } + +} + +sub transformitem { + my ($item,$load_recursive) = @_; + + # transformations go here ... + if ($load_recursive) { + $load_recursive = {} unless ref $load_recursive; + my $field = "_entries"; + if ($load_recursive->{$field}) { + my @entries = (); + foreach my $element (keys %{$item->getvalue()}) { + push(@entries,NGCP::BulkProcessor::Redis::Trunk::location::entry::get_entry($element,$load_recursive)); + } + $item->{$field} = \@entries; + } + } + +} + +sub process_keys { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_entries( + get_store => $get_store, + scan_pattern => &$get_key('*'), + type => $type, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows(\@$rowblock,[ + map { [ &$get_store()->smembers($_) ]; } @$rowblock + ],$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_stores_code => \&destroy_stores, + multithreading => $multithreading, + nosqlprocessing_threads => $numofthreads, + ); +} + +1; + diff --git a/lib/NGCP/BulkProcessor/default.cfg b/lib/NGCP/BulkProcessor/default.cfg index 6222f5e..c7f1561 100644 --- a/lib/NGCP/BulkProcessor/default.cfg +++ b/lib/NGCP/BulkProcessor/default.cfg @@ -65,5 +65,5 @@ doneemailrecipient = ##logging: fileloglevel = OFF -screenloglevel = INFO +screenloglevel = OFF emailloglevel = OFF
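
Usage note (illustrative sketch only, not part of the diff above): the new location::usrdom set keyed as 'location:usrdom::<username>:<domain>' can be combined with the per-ruid location:entry hashes roughly as follows. 'alice' and 'example.org' are placeholder values, and the location redis connection is assumed to be configured already (e.g. via the loaded config or $NGCP::BulkProcessor::Globals::location_host, as in Redis.t above).

use NGCP::BulkProcessor::ConnectorPool qw(destroy_stores);
use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw(get_usrdom_by_username_domain);

# placeholder subscriber; with load_recursive => { _entries => 1, } the referenced
# location:entry records are fetched as well and attached as $usrdom->{_entries}.
my $usrdom = get_usrdom_by_username_domain('alice', 'example.org', { _entries => 1, });
print scalar @{$usrdom->{_entries}}, " registered contact(s)\n" if $usrdom;
destroy_stores();

The same lookup, scaled over all 'location:usrdom::*' keys, is what process_keys() drives through process_entries() in the Redis.t test above.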