From 02cfe823514b82515dfec9a88c3d2df388b004cf Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Fri, 2 Apr 2021 16:05:18 +0200 Subject: [PATCH] TT#118151 mr7.5.5 redis "location" db DAO Change-Id: I5b7e919cb686031da7ba08bd7a0f4237fcd309a7 --- .../Massive/RegistrationMonitoring/Redis.t | 4 +- .../Redis/Trunk/location/usrdom.pm | 2 +- .../Redis/mr65/location/entry.pm | 163 ++++++++++++++++++ .../Redis/mr65/location/usrdom.pm | 160 +++++++++++++++++ .../Redis/mr755/location/entry.pm | 163 ++++++++++++++++++ .../Redis/mr755/location/usrdom.pm | 160 +++++++++++++++++ 6 files changed, 649 insertions(+), 3 deletions(-) create mode 100644 lib/NGCP/BulkProcessor/Redis/mr65/location/entry.pm create mode 100644 lib/NGCP/BulkProcessor/Redis/mr65/location/usrdom.pm create mode 100644 lib/NGCP/BulkProcessor/Redis/mr755/location/entry.pm create mode 100644 lib/NGCP/BulkProcessor/Redis/mr755/location/usrdom.pm diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t index 995d2a5..178ce7b 100644 --- a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t @@ -14,7 +14,7 @@ use NGCP::BulkProcessor::Redis::Trunk::location::usrdom qw( ); -$NGCP::BulkProcessor::Globals::location_host = '192.168.0.146'; +$NGCP::BulkProcessor::Globals::location_host = '192.168.0.180'; goto SKIP; { @@ -82,7 +82,7 @@ SKIP: #die(); print join("\n", map { my $key = $_->getkey(); - $key =~ s/location\:usrdom\:\://; + $key =~ s/1:location\:usrdom\:\://; $key =~ s/\:/;/; $key; } @$records); diff --git a/lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm b/lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm index bf911ee..3d22684 100644 --- a/lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm +++ b/lib/NGCP/BulkProcessor/Redis/Trunk/location/usrdom.pm @@ -28,7 +28,7 @@ our @EXPORT_OK = qw( my $get_store = \&get_location_store; -my $table = 'location:usrdom'; +my $table = '1:location:usrdom'; my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::SET_TYPE; my $get_key = sub { my ($username,$domain) = @_; diff --git a/lib/NGCP/BulkProcessor/Redis/mr65/location/entry.pm b/lib/NGCP/BulkProcessor/Redis/mr65/location/entry.pm new file mode 100644 index 0000000..5e9510a --- /dev/null +++ b/lib/NGCP/BulkProcessor/Redis/mr65/location/entry.pm @@ -0,0 +1,163 @@ +package NGCP::BulkProcessor::Redis::mr65::location::entry; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_location_store + destroy_stores +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw( + process_entries +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw( + copy_value +); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry); +our @EXPORT_OK = qw( + get_entry + get_entry_by_ruid + process_keys +); + +my $get_store = \&get_location_store; + +my $table = 'location:entry'; +my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::HASH_TYPE; +my $get_key = sub { + my ($ruid) = @_; + return $table . '::' . $ruid; +}; + +my $fieldnames = [ + 'instance', + 'domain', + 'cseq', + 'partition', + 'ruid', + 'connection_id', + 'username', + 'keepalive', + 'path', + 'reg_id', + 'contact', + 'flags', + 'received', + 'callid', + 'socket', + 'cflags', + 'expires', + 'methods', + 'user_agent', + 'q', + 'last_modified', + 'server_id', +]; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift,$fieldnames); + + copy_value($self,shift,$fieldnames); + + return $self; + +} + +sub get_entry { + + my ($key,$load_recursive) = @_; + my $store = &$get_store(); + + if (length($key) and my %res = $store->hgetall($key)) { + return builditems_fromrows($key,\%res,$load_recursive); + } + return undef; + +} + +sub get_entry_by_ruid { + + my ($ruid,$load_recursive) = @_; + my $store = &$get_store(); + + if ($ruid and my %res = $store->hgetall(my $key = &$get_key($ruid))) { + return builditems_fromrows($key,\%res,$load_recursive); + } + return undef; + +} + +sub builditems_fromrows { + + my ($keys,$rows,$load_recursive) = @_; + + my $item; + + if (defined $rows and ref $rows eq 'ARRAY') { + my @items = (); + foreach my $row (@$rows) { + $item = __PACKAGE__->new($keys->[scalar @items], $row); + + # transformations go here ... + + push @items,$item; + + } + return \@items; + } elsif (defined $rows and ref $rows eq 'HASH') { + $item = __PACKAGE__->new($keys,$rows); + return $item; + } + return undef; + +} + +sub process_keys { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_entries( + get_store => $get_store, + scan_pattern => &$get_key('*'), + type => $type, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows(\@$rowblock,[ + map { { &$get_store()->hgetall($_) }; } @$rowblock + ],$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_stores_code => \&destroy_stores, + multithreading => $multithreading, + nosqlprocessing_threads => $numofthreads, + ); +} + +1; + diff --git a/lib/NGCP/BulkProcessor/Redis/mr65/location/usrdom.pm b/lib/NGCP/BulkProcessor/Redis/mr65/location/usrdom.pm new file mode 100644 index 0000000..8789eae --- /dev/null +++ b/lib/NGCP/BulkProcessor/Redis/mr65/location/usrdom.pm @@ -0,0 +1,160 @@ +package NGCP::BulkProcessor::Redis::mr65::location::usrdom; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_location_store + destroy_stores +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw( + process_entries +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw( + copy_value +); + +use NGCP::BulkProcessor::Redis::mr65::location::entry qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry); +our @EXPORT_OK = qw( + get_usrdom + get_usrdom_by_username_domain + process_keys +); + +my $get_store = \&get_location_store; + +my $table = 'location:usrdom'; +my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::SET_TYPE; +my $get_key = sub { + my ($username,$domain) = @_; + my $result = $table . '::' . $username; + $result .= ':' . $domain if $domain; + return $result; +}; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift); + + copy_value($self,shift); + + return $self; + +} + +sub get_usrdom { + + my ($key,$load_recursive) = @_; + my $store = &$get_store(); + + if (length($key) and my @res = $store->smembers($key)) { + return builditems_fromrows($key,\@res,$load_recursive); + } + return undef; + +} + +sub get_usrdom_by_username_domain { + + my ($username,$domain,$load_recursive) = @_; + my $store = &$get_store(); + + if ($username and $domain and my @res = $store->smembers(my $key = &$get_key($username,$domain))) { + return builditems_fromrows($key,\@res,$load_recursive); + } + return undef; + +} + +sub builditems_fromrows { + + my ($keys,$rows,$load_recursive) = @_; + + my $item; + + if (defined $keys and ref $keys eq 'ARRAY') { + my @items = (); + foreach my $key (@$keys) { + $item = __PACKAGE__->new($key, $rows->[scalar @items]); + + transformitem($item,$load_recursive); + + push @items,$item; + + } + return \@items; + } else { + $item = __PACKAGE__->new($keys,$rows); + transformitem($item,$load_recursive); + return $item; + } + +} + +sub transformitem { + my ($item,$load_recursive) = @_; + + # transformations go here ... + if ($load_recursive) { + $load_recursive = {} unless ref $load_recursive; + my $field = "_entries"; + if ($load_recursive->{$field}) { + my @entries = (); + foreach my $element (keys %{$item->getvalue()}) { + push(@entries,NGCP::BulkProcessor::Redis::mr65::location::entry::get_entry($element,$load_recursive)); + } + $item->{$field} = \@entries; + } + } + +} + +sub process_keys { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_entries( + get_store => $get_store, + scan_pattern => &$get_key('*'), + type => $type, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows(\@$rowblock,[ + map { [ &$get_store()->smembers($_) ]; } @$rowblock + ],$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_stores_code => \&destroy_stores, + multithreading => $multithreading, + nosqlprocessing_threads => $numofthreads, + ); +} + +1; + diff --git a/lib/NGCP/BulkProcessor/Redis/mr755/location/entry.pm b/lib/NGCP/BulkProcessor/Redis/mr755/location/entry.pm new file mode 100644 index 0000000..9ea3331 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Redis/mr755/location/entry.pm @@ -0,0 +1,163 @@ +package NGCP::BulkProcessor::Redis::mr755::location::entry; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_location_store + destroy_stores +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw( + process_entries +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw( + copy_value +); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry); +our @EXPORT_OK = qw( + get_entry + get_entry_by_ruid + process_keys +); + +my $get_store = \&get_location_store; + +my $table = 'location:entry'; +my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::HASH_TYPE; +my $get_key = sub { + my ($ruid) = @_; + return $table . '::' . $ruid; +}; + +my $fieldnames = [ + 'instance', + 'domain', + 'cseq', + 'partition', + 'ruid', + 'connection_id', + 'username', + 'keepalive', + 'path', + 'reg_id', + 'contact', + 'flags', + 'received', + 'callid', + 'socket', + 'cflags', + 'expires', + 'methods', + 'user_agent', + 'q', + 'last_modified', + 'server_id', +]; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift,$fieldnames); + + copy_value($self,shift,$fieldnames); + + return $self; + +} + +sub get_entry { + + my ($key,$load_recursive) = @_; + my $store = &$get_store(); + + if (length($key) and my %res = $store->hgetall($key)) { + return builditems_fromrows($key,\%res,$load_recursive); + } + return undef; + +} + +sub get_entry_by_ruid { + + my ($ruid,$load_recursive) = @_; + my $store = &$get_store(); + + if ($ruid and my %res = $store->hgetall(my $key = &$get_key($ruid))) { + return builditems_fromrows($key,\%res,$load_recursive); + } + return undef; + +} + +sub builditems_fromrows { + + my ($keys,$rows,$load_recursive) = @_; + + my $item; + + if (defined $rows and ref $rows eq 'ARRAY') { + my @items = (); + foreach my $row (@$rows) { + $item = __PACKAGE__->new($keys->[scalar @items], $row); + + # transformations go here ... + + push @items,$item; + + } + return \@items; + } elsif (defined $rows and ref $rows eq 'HASH') { + $item = __PACKAGE__->new($keys,$rows); + return $item; + } + return undef; + +} + +sub process_keys { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_entries( + get_store => $get_store, + scan_pattern => &$get_key('*'), + type => $type, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows(\@$rowblock,[ + map { { &$get_store()->hgetall($_) }; } @$rowblock + ],$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_stores_code => \&destroy_stores, + multithreading => $multithreading, + nosqlprocessing_threads => $numofthreads, + ); +} + +1; + diff --git a/lib/NGCP/BulkProcessor/Redis/mr755/location/usrdom.pm b/lib/NGCP/BulkProcessor/Redis/mr755/location/usrdom.pm new file mode 100644 index 0000000..f5de7d6 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Redis/mr755/location/usrdom.pm @@ -0,0 +1,160 @@ +package NGCP::BulkProcessor::Redis::mr755::location::usrdom; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_location_store + destroy_stores +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw( + process_entries +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw( + copy_value +); + +use NGCP::BulkProcessor::Redis::mr755::location::entry qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry); +our @EXPORT_OK = qw( + get_usrdom + get_usrdom_by_username_domain + process_keys +); + +my $get_store = \&get_location_store; + +my $table = '1:location:usrdom'; +my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::SET_TYPE; +my $get_key = sub { + my ($username,$domain) = @_; + my $result = $table . '::' . $username; + $result .= ':' . $domain if $domain; + return $result; +}; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,shift); + + copy_value($self,shift); + + return $self; + +} + +sub get_usrdom { + + my ($key,$load_recursive) = @_; + my $store = &$get_store(); + + if (length($key) and my @res = $store->smembers($key)) { + return builditems_fromrows($key,\@res,$load_recursive); + } + return undef; + +} + +sub get_usrdom_by_username_domain { + + my ($username,$domain,$load_recursive) = @_; + my $store = &$get_store(); + + if ($username and $domain and my @res = $store->smembers(my $key = &$get_key($username,$domain))) { + return builditems_fromrows($key,\@res,$load_recursive); + } + return undef; + +} + +sub builditems_fromrows { + + my ($keys,$rows,$load_recursive) = @_; + + my $item; + + if (defined $keys and ref $keys eq 'ARRAY') { + my @items = (); + foreach my $key (@$keys) { + $item = __PACKAGE__->new($key, $rows->[scalar @items]); + + transformitem($item,$load_recursive); + + push @items,$item; + + } + return \@items; + } else { + $item = __PACKAGE__->new($keys,$rows); + transformitem($item,$load_recursive); + return $item; + } + +} + +sub transformitem { + my ($item,$load_recursive) = @_; + + # transformations go here ... + if ($load_recursive) { + $load_recursive = {} unless ref $load_recursive; + my $field = "_entries"; + if ($load_recursive->{$field}) { + my @entries = (); + foreach my $element (keys %{$item->getvalue()}) { + push(@entries,NGCP::BulkProcessor::Redis::mr755::location::entry::get_entry($element,$load_recursive)); + } + $item->{$field} = \@entries; + } + } + +} + +sub process_keys { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_entries( + get_store => $get_store, + scan_pattern => &$get_key('*'), + type => $type, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows(\@$rowblock,[ + map { [ &$get_store()->smembers($_) ]; } @$rowblock + ],$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_stores_code => \&destroy_stores, + multithreading => $multithreading, + nosqlprocessing_threads => $numofthreads, + ); +} + +1; +