TT#108605 redis "location" store schema #1

Change-Id: I96150ac365658e1cf0c156e725e88b75757977e9
(cherry picked from commit e9e315fd8b)
mr7.5.5
Rene Krenn 5 years ago
parent 41634ee566
commit 1673e11728

@ -41,6 +41,12 @@ use NGCP::BulkProcessor::Globals qw(
$ngcprestapi_username
$ngcprestapi_password
$ngcprestapi_realm
$location_databaseindex
$location_password
$location_host
$location_port
$location_sock
);
@ -56,6 +62,7 @@ use NGCP::BulkProcessor::SqlConnectors::MySQLDB;
#use NGCP::BulkProcessor::SqlConnectors::CSVDB;
#use NGCP::BulkProcessor::SqlConnectors::SQLServerDB;
use NGCP::BulkProcessor::RestConnectors::NGCPRestApi;
use NGCP::BulkProcessor::NoSqlConnectors::Redis;
use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo);
@ -87,12 +94,16 @@ our @EXPORT_OK = qw(
xa_db_tableidentifier
get_ngcp_restapi
get_location_store
destroy_dbs
destroy_stores
get_connectorinstancename
get_cluster_db
ping_dbs
ping_stores
ping
);
@ -109,6 +120,8 @@ my $xa_dbs = {};
my $ngcp_restapis = {};
my $location_stores = {};
sub get_accounting_db {
my ($instance_name,$reconnect) = @_;
@ -248,6 +261,23 @@ sub get_ngcp_restapi {
}
sub get_location_store {
my ($instance_name,$reconnect) = @_;
my $name = get_connectorinstancename($instance_name);
if (!defined $location_stores->{$name}) {
$location_stores->{$name} = NGCP::BulkProcessor::NoSqlConnectors::Redis->new($instance_name);
if (!defined $reconnect) {
$reconnect = 1;
}
}
if ($reconnect) {
$location_stores->{$name}->connect($location_databaseindex,$location_password,$location_host,$location_port,$location_sock);
}
return $location_stores->{$name};
}
sub get_connectorinstancename {
my ($name) = @_;
@ -266,6 +296,10 @@ sub ping_dbs {
ping($xa_dbs);
}
sub ping_stores {
ping($location_stores);
}
sub ping {
my $dbs = shift;
my $this_tid = threadid();
@ -315,6 +349,15 @@ sub destroy_dbs {
}
sub destroy_stores {
foreach my $name (keys %$location_stores) {
undef $location_stores->{$name};
delete $location_stores->{$name};
}
}
sub get_cluster_db { # oracle RAC and the like ...
my ($cluster,$instance_name,$reconnect) = @_;

@ -82,6 +82,12 @@ our @EXPORT_OK = qw(
$xa_password
$xa_host
$xa_port
$location_databaseindex
$location_password
$location_host
$location_port
$location_sock
$ngcprestapi_uri
$ngcprestapi_username
@ -198,7 +204,12 @@ our $xa_password = '';
our $xa_host = '127.0.0.1';
our $xa_port = '3306';
our $location_databaseindex = '20';
our $location_password = undef;
our $location_host = '127.0.0.1';
our $location_port = '6379';
our $location_sock = undef;
our $ngcprestapi_uri = 'https://127.0.0.1:443';
our $ngcprestapi_username = 'administrator';
our $ngcprestapi_password = 'administrator';
@ -314,10 +325,7 @@ sub update_masterconfig {
my $result = 1;
$ngcprestapi_uri = $data->{ngcprestapi_uri} if exists $data->{ngcprestapi_uri};
$ngcprestapi_username = $data->{ngcprestapi_username} if exists $data->{ngcprestapi_username};
$ngcprestapi_password = $data->{ngcprestapi_password} if exists $data->{ngcprestapi_password};
$ngcprestapi_realm = $data->{ngcprestapi_realm} if exists $data->{ngcprestapi_realm};
$cpucount = $data->{cpucount} if exists $data->{cpucount};
$enablemultithreading = $data->{enablemultithreading} if exists $data->{enablemultithreading};
@ -438,6 +446,17 @@ sub _postprocess_masterconfig {
$xa_username = $data->{xa_username} if exists $data->{xa_username};
$xa_password = $data->{xa_password} if exists $data->{xa_password};
$ngcprestapi_uri = $data->{ngcprestapi_uri} if exists $data->{ngcprestapi_uri};
$ngcprestapi_username = $data->{ngcprestapi_username} if exists $data->{ngcprestapi_username};
$ngcprestapi_password = $data->{ngcprestapi_password} if exists $data->{ngcprestapi_password};
$ngcprestapi_realm = $data->{ngcprestapi_realm} if exists $data->{ngcprestapi_realm};
$location_databaseindex = $data->{location_databaseindex} if exists $data->{location_databaseindex};
$location_password = $data->{location_password} if exists $data->{location_password};
$location_host = $data->{location_host} if exists $data->{location_host};
$location_port = $data->{location_port} if exists $data->{location_port};
$location_sock = $data->{location_sock} if exists $data->{location_sock};
return 1;
}
return 0;

@ -49,6 +49,7 @@ our @EXPORT_OK = qw(
dbwarn
nosqlerror
nosqlwarn
nosqlprocessingfailed
fieldnamesdiffer
transferzerorowcount
processzerorowcount
@ -73,7 +74,7 @@ our @EXPORT_OK = qw(
fileprocessingwarn
restprocessingfailed
emailwarn
configurationwarn
configurationerror
@ -342,6 +343,18 @@ sub nosqlwarn {
}
sub nosqlprocessingfailed {
my ($store,$scan_pattern,$logger) = @_;
my $message = 'keystore processing failed: [' . $store->connectidentifier() . '] ' . $scan_pattern;
if (defined $logger) {
$logger->error($message);
}
terminate($message, $logger);
}
sub resterror {
my ($restapi, $message, $logger) = @_;

@ -810,7 +810,11 @@ sub processing_entries {
my ($tid, $start, $blocksize, $logger) = @_;
if (defined $logger) {
$logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . 'processing entries: ' . ($start + 1) . '-' . ($start + $blocksize));
if ($blocksize) {
$logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . 'processing entries: ' . ($start + 1) . '-' . ($start + $blocksize));
} else {
$logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . 'processing entries: (none)');
}
}
}
@ -824,7 +828,7 @@ sub nosqlprocessingstarted {
}
sub restprocessingdone {
sub nosqlprocessingdone {
my ($store,$scan_pattern,$logger) = @_;
if (defined $logger) {

@ -24,7 +24,9 @@ use NGCP::BulkProcessor::NoSqlConnector qw(
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnector);
our @EXPORT_OK = qw();
our @EXPORT_OK = qw(
get_scan_args
);
our $AUTOLOAD;
@ -224,4 +226,25 @@ sub multithreading_supported {
}
sub get_scan_args {
my ($scan_pattern,$blocksize,$type) = @_;
my @result = ();
if ($scan_pattern) {
push(@result,'MATCH');
push(@result,$scan_pattern);
}
if ($blocksize) {
push(@result,'COUNT');
push(@result,$blocksize);
}
# As of version 6.0 you can use this option to ask SCAN to only return objects that match a given type:
#if ($type) {
# push(@result,'TYPE');
# push(@result,$type);
#}
return @result;
}
1;

@ -0,0 +1,84 @@
package NGCP::BulkProcessor::NoSqlConnectors::RedisEntry;
use strict;
## no critic
use Tie::IxHash;
use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw(init_entry);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
$HASH_TYPE
$SET_TYPE
$LIST_TYPE
$ZSET_TYPE
$STRING_TYPE
);
#should correspond to the type names for redis SCAN:
our $HASH_TYPE = 'hash';
our $SET_TYPE = 'set';
our $LIST_TYPE = 'list';
our $ZSET_TYPE = 'zset';
our $STRING_TYPE = 'string';
sub new {
my $base_class = shift;
my $class = shift;
my $type = shift;
my $self = bless {}, $class;
$type = 'default' unless $type;
$type = lc($type);
my $value;
if ($type eq 'set') {
$value = {};
} elsif ($type eq 'list') {
$value = [];
} elsif ($type eq 'zset') {
my %value = ();
tie(%value, 'Tie::IxHash');
$value = \%value;
} elsif ($type eq 'hash') {
$value = {};
} else { #($type eq 'string') {
$type = 'string';
$value = undef;
}
$self->{type} = $type;
$self->{value} = $value;
return init_entry($self,@_);
}
sub getvalue {
my $self = shift;
#$self->{value} = shift if scalar @_;
return $self->{value};
}
sub gettype {
my $self = shift;
return $self->{type};
}
sub gethash {
my $self = shift;
my $fieldvalues;
if ($self->{type} eq 'set') {
$fieldvalues = [ sort keys %{$self->{value}} ];
} elsif ($self->{type} eq 'list') {
$fieldvalues = $self->{value};
} elsif ($self->{type} eq 'zset') {
$fieldvalues = [ keys %{$self->{value}} ];
} elsif ($self->{type} eq 'hash') {
$fieldvalues = [ map { $self->{value}->{$_}; } sort keys %{$self->{value}} ];
} else { #($type eq 'string') {
$fieldvalues = [ $self->{value} ];
}
return get_rowhash($fieldvalues);
}
1;

@ -1,4 +1,4 @@
package NGCP::BulkProcessor::RedisProcessor;
package NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor;
use strict;
## no critic
@ -31,6 +31,8 @@ use NGCP::BulkProcessor::LogError qw(
use NGCP::BulkProcessor::Utils qw(threadid);
use NGCP::BulkProcessor::NoSqlConnectors::Redis qw(get_scan_args);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
@ -42,6 +44,8 @@ our @EXPORT_OK = qw(
my $nosqlprocessing_threadqueuelength = 10;
my $reader_connection_name = 'reader';
my $thread_sleep_secs = 0.1;
my $RUNNING = 1;
@ -165,25 +169,31 @@ sub process_entries {
my %params = @_;
my ($get_store,
$scan_pattern,
$type,
$process_code,
$static_context,
$blocksize,
$init_process_context_code,
$uninit_process_context_code,
$destroy_reader_dbs_code,
$multithreading,
$nosqlprocessing_threads) = @params{qw/
get_store
scan_pattern
type
process_code
static_context
blocksize
init_process_context_code
uninit_process_context_code
destroy_reader_dbs_code
multithreading
nosqlprocessing_threads
/};
if (ref $get_store eq 'CODE') {
my $store = &$get_store($reader_connection_name,1);
nosqlprocessingstarted(&$get_store(),$scan_pattern,getlogger(__PACKAGE__));
@ -199,6 +209,13 @@ sub process_entries {
my %errorstates :shared = ();
my $queue = Thread::Queue->new();
nosqlthreadingdebug('shutting down connections ...',getlogger(__PACKAGE__));
$store->disconnect();
my $default_connection = &$get_store(undef,0);
my $default_connection_reconnect = $default_connection->is_connected();
$default_connection->disconnect();
nosqlthreadingdebug('starting reader thread',getlogger(__PACKAGE__));
$reader = threads->create(\&_reader,
@ -207,7 +224,9 @@ sub process_entries {
threadqueuelength => $nosqlprocessing_threadqueuelength,
get_store => $get_store,
scan_pattern => $scan_pattern,
type => $type,
blocksize => $blocksize,
destroy_stores_code => $destroy_reader_dbs_code,
});
for (my $i = 0; $i < $nosqlprocessing_threads; $i++) {
@ -220,6 +239,7 @@ sub process_entries {
process_code => $process_code,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
}));
if (!defined $processor) {
nosqlthreadingdebug('processor thread ' . ($i + 1) . ' of ' . $nosqlprocessing_threads . ' NOT started',getlogger(__PACKAGE__));
@ -241,6 +261,12 @@ sub process_entries {
}
$errorstate = (_get_other_threads_state(\%errorstates,$tid) & ~$RUNNING);
nosqlthreadingdebug('restoring connections ...',getlogger(__PACKAGE__));
if ($default_connection_reconnect) {
$default_connection = &$get_store(undef,1);
}
} else {
@ -259,7 +285,7 @@ sub process_entries {
my $cursor = 0;
while (1) {
fetching_entries($store,$scan_pattern,$i,$blocksize,getlogger(__PACKAGE__));
($cursor, my $rowblock) = $store->scan($cursor,$scan_pattern,$blocksize);
($cursor, my $rowblock) = $store->scan($cursor,get_scan_args($scan_pattern,$blocksize,$type));
my $realblocksize = scalar @$rowblock;
processing_entries($tid,$i,$realblocksize,getlogger(__PACKAGE__));
$rowblock_result = &$process_code($context,$rowblock,$i);
@ -284,7 +310,8 @@ sub process_entries {
&$uninit_process_context_code($context);
}
};
$store->disconnect();
}
if ($errorstate == $COMPLETED) {
@ -300,7 +327,6 @@ sub process_entries {
}
sub _reader {
my $context = shift;
@ -329,7 +355,7 @@ sub _reader {
my $state = $RUNNING; #start at first
while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer
fetching_entries($store,$context->{scan_pattern},$i,$blocksize,getlogger(__PACKAGE__));
($cursor, my $rowblock) = $store->scan($cursor,$context->{scan_pattern},$blocksize);
($cursor, my $rowblock) = $store->scan_shared($cursor,get_scan_args($context->{scan_pattern},$blocksize,$context->{type}));
my $realblocksize = scalar @$rowblock;
my %packet :shared = ();
$packet{rows} = $rowblock;
@ -355,14 +381,31 @@ sub _reader {
,getlogger(__PACKAGE__));
}
};
#nosqlthreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
#lock $context->{errorstates};
#if ($@) {
# $context->{errorstates}->{$tid} = $ERROR;
#} else {
# $context->{errorstates}->{$tid} = $COMPLETED;
#}
#return $context->{errorstates}->{$tid};
nosqlthreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
# stop the consumer:
# $context->{queue}->enqueue(undef);
if (defined $store) {
# if thread cleanup has a problem...
$store->disconnect();
}
if (defined $context->{destroy_stores_code} and 'CODE' eq ref $context->{destroy_stores_code}) {
&{$context->{destroy_stores_code}}();
}
lock $context->{errorstates};
if ($@) {
$context->{errorstates}->{$tid} = $ERROR;
} else {
$context->{errorstates}->{$tid} = $COMPLETED;
}
return $context->{errorstates}->{$tid};
return $context->{errorstates}->{$tid};
}
sub _process {

@ -0,0 +1,64 @@
use strict;
use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::ConnectorPool qw(destroy_stores);
use NGCP::BulkProcessor::NoSqlConnectors::Redis qw();
use NGCP::BulkProcessor::Redis::Trunk::location::entry qw(
get_entry
get_entry_by_ruid
process_keys
);
goto SKIP;
{
my $host = '192.168.0.146';
my $port = '6379';
my $sock = undef;
my $password = undef;
my $databaseindex = '0';
my $store = NGCP::BulkProcessor::NoSqlConnectors::Redis->new(undef);
$store->connect(20,undef,$host);
my @result = $store->keys_shared('*',sub {
my ($reply, $error) = @_;
die "Oops, got an error: $error\n" if defined $error;
print "$_\n" for @$reply;
});
#print join("\n",@keys);
}
SKIP:
{
$NGCP::BulkProcessor::Globals::location_host = '192.168.0.146';
my $static_context = {
};
my $result = process_keys(
process_code => sub {
my ($context,$records,$row_offset) = @_;
return 1;
},
static_context => $static_context,
blocksize => 10000,
init_process_context_code => sub {
my ($context)= @_;
},
uninit_process_context_code => sub {
my ($context)= @_;
destroy_stores();
},
multithreading => 1,
numofthreads => 4,
#load_recursive => ,
);
}
SKIP:
{
my $location = get_entry_by_ruid();
}
exit;

@ -0,0 +1,155 @@
package NGCP::BulkProcessor::Redis::Trunk::location::entry;
use strict;
## no critic
use NGCP::BulkProcessor::ConnectorPool qw(
get_location_store
destroy_stores
);
use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw(
copy_value
process_entries
);
use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry);
our @EXPORT_OK = qw(
get_entry
get_entry_by_ruid
process_keys
);
my $get_store = \&get_location_store;
my $table = 'location:entry';
my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::HASH_TYPE;
my $get_key_from_ruid = sub {
my ($ruid) = @_;
return $table . '::' . $ruid;
};
my $fieldnames = [
'instance',
'domain',
'cseq',
'partition',
'ruid',
'connection_id',
'username',
'keepalive',
'path',
'reg_id',
'contact',
'flags',
'received',
'callid',
'socket',
'cflags',
'expires',
'methods',
'user_agent',
'q',
'last_modified',
'server_id',
];
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,$fieldnames);
copy_value($self,shift,$fieldnames);
return $self;
}
sub get_entry {
my ($key,$load_recursive) = @_;
my $store = &$get_store();
return builditems_fromrows({ $store->hgetall($key) },$load_recursive);
}
sub get_entry_by_ruid {
my ($ruid,$load_recursive) = @_;
my $store = &$get_store();
return builditems_fromrows({ $store->hgetall(&$get_key_from_ruid($ruid)) },$load_recursive);
}
sub builditems_fromrows {
my ($rows,$load_recursive) = @_;
my $item;
if (defined $rows and ref $rows eq 'ARRAY') {
my @items = ();
foreach my $row (@$rows) {
$item = __PACKAGE__->new($row);
# transformations go here ...
push @items,$item;
}
return \@items;
} elsif (defined $rows and ref $rows eq 'HASH') {
$item = __PACKAGE__->new($rows);
return $item;
}
return undef;
}
sub process_keys {
my %params = @_;
my ($process_code,
$static_context,
$blocksize,
$init_process_context_code,
$uninit_process_context_code,
$multithreading,
$numofthreads,
$load_recursive) = @params{qw/
process_code
static_context
blocksize
init_process_context_code
uninit_process_context_code
multithreading
numofthreads
load_recursive
/};
return process_entries(
get_store => $get_store,
scan_pattern => &$get_key_from_ruid('*'),
type => $type,
process_code => sub {
my ($context,$rowblock,$row_offset) = @_;
return &$process_code($context,builditems_fromrows([
map { { &$get_store()->hgetall($_) }; } @$rowblock
],$load_recursive),$row_offset);
},
static_context => $static_context,
blocksize => $blocksize,
init_process_context_code => $init_process_context_code,
uninit_process_context_code => $uninit_process_context_code,
destroy_reader_stores_code => \&destroy_stores,
multithreading => $multithreading,
nosqlprocessing_threads => $numofthreads,
);
}
1;

@ -49,6 +49,13 @@ ngcprestapi_username = administrator
ngcprestapi_password = administrator
ngcprestapi_realm = api_admin_http
##NGCP Redis connectivity - "location" store:
location_databaseindex = 20
#location_password =
location_host = 127.0.0.1
location_port = 6379
#location_sock =
##sending email:
emailenable = 0
erroremailrecipient =

Loading…
Cancel
Save