diff --git a/lib/NGCP/BulkProcessor/ConnectorPool.pm b/lib/NGCP/BulkProcessor/ConnectorPool.pm index c708bfa..d0938b6 100644 --- a/lib/NGCP/BulkProcessor/ConnectorPool.pm +++ b/lib/NGCP/BulkProcessor/ConnectorPool.pm @@ -41,6 +41,12 @@ use NGCP::BulkProcessor::Globals qw( $ngcprestapi_username $ngcprestapi_password $ngcprestapi_realm + + $location_databaseindex + $location_password + $location_host + $location_port + $location_sock ); @@ -56,6 +62,7 @@ use NGCP::BulkProcessor::SqlConnectors::MySQLDB; #use NGCP::BulkProcessor::SqlConnectors::CSVDB; #use NGCP::BulkProcessor::SqlConnectors::SQLServerDB; use NGCP::BulkProcessor::RestConnectors::NGCPRestApi; +use NGCP::BulkProcessor::NoSqlConnectors::Redis; use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); @@ -87,12 +94,16 @@ our @EXPORT_OK = qw( xa_db_tableidentifier get_ngcp_restapi + + get_location_store destroy_dbs + destroy_stores get_connectorinstancename get_cluster_db ping_dbs + ping_stores ping ); @@ -109,6 +120,8 @@ my $xa_dbs = {}; my $ngcp_restapis = {}; +my $location_stores = {}; + sub get_accounting_db { my ($instance_name,$reconnect) = @_; @@ -248,6 +261,23 @@ sub get_ngcp_restapi { } +sub get_location_store { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); + if (!defined $location_stores->{$name}) { + $location_stores->{$name} = NGCP::BulkProcessor::NoSqlConnectors::Redis->new($instance_name); + if (!defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $location_stores->{$name}->connect($location_databaseindex,$location_password,$location_host,$location_port,$location_sock); + } + return $location_stores->{$name}; + +} + sub get_connectorinstancename { my ($name) = @_; @@ -266,6 +296,10 @@ sub ping_dbs { ping($xa_dbs); } +sub ping_stores { + ping($location_stores); +} + sub ping { my $dbs = shift; my $this_tid = threadid(); @@ -315,6 +349,15 @@ sub destroy_dbs { } +sub destroy_stores { + + foreach my $name (keys %$location_stores) { + undef $location_stores->{$name}; + delete $location_stores->{$name}; + } + +} + sub get_cluster_db { # oracle RAC and the like ... my ($cluster,$instance_name,$reconnect) = @_; diff --git a/lib/NGCP/BulkProcessor/Globals.pm b/lib/NGCP/BulkProcessor/Globals.pm index 57d48bc..650414d 100644 --- a/lib/NGCP/BulkProcessor/Globals.pm +++ b/lib/NGCP/BulkProcessor/Globals.pm @@ -82,6 +82,12 @@ our @EXPORT_OK = qw( $xa_password $xa_host $xa_port + + $location_databaseindex + $location_password + $location_host + $location_port + $location_sock $ngcprestapi_uri $ngcprestapi_username @@ -198,7 +204,12 @@ our $xa_password = ''; our $xa_host = '127.0.0.1'; our $xa_port = '3306'; - +our $location_databaseindex = '20'; +our $location_password = undef; +our $location_host = '127.0.0.1'; +our $location_port = '6379'; +our $location_sock = undef; + our $ngcprestapi_uri = 'https://127.0.0.1:443'; our $ngcprestapi_username = 'administrator'; our $ngcprestapi_password = 'administrator'; @@ -314,10 +325,7 @@ sub update_masterconfig { my $result = 1; - $ngcprestapi_uri = $data->{ngcprestapi_uri} if exists $data->{ngcprestapi_uri}; - $ngcprestapi_username = $data->{ngcprestapi_username} if exists $data->{ngcprestapi_username}; - $ngcprestapi_password = $data->{ngcprestapi_password} if exists $data->{ngcprestapi_password}; - $ngcprestapi_realm = $data->{ngcprestapi_realm} if exists $data->{ngcprestapi_realm}; + $cpucount = $data->{cpucount} if exists $data->{cpucount}; $enablemultithreading = $data->{enablemultithreading} if exists $data->{enablemultithreading}; @@ -438,6 +446,17 @@ sub _postprocess_masterconfig { $xa_username = $data->{xa_username} if exists $data->{xa_username}; $xa_password = $data->{xa_password} if exists $data->{xa_password}; + $ngcprestapi_uri = $data->{ngcprestapi_uri} if exists $data->{ngcprestapi_uri}; + $ngcprestapi_username = $data->{ngcprestapi_username} if exists $data->{ngcprestapi_username}; + $ngcprestapi_password = $data->{ngcprestapi_password} if exists $data->{ngcprestapi_password}; + $ngcprestapi_realm = $data->{ngcprestapi_realm} if exists $data->{ngcprestapi_realm}; + + $location_databaseindex = $data->{location_databaseindex} if exists $data->{location_databaseindex}; + $location_password = $data->{location_password} if exists $data->{location_password}; + $location_host = $data->{location_host} if exists $data->{location_host}; + $location_port = $data->{location_port} if exists $data->{location_port}; + $location_sock = $data->{location_sock} if exists $data->{location_sock}; + return 1; } return 0; diff --git a/lib/NGCP/BulkProcessor/LogError.pm b/lib/NGCP/BulkProcessor/LogError.pm index c01c3b2..b1fe24f 100644 --- a/lib/NGCP/BulkProcessor/LogError.pm +++ b/lib/NGCP/BulkProcessor/LogError.pm @@ -49,6 +49,7 @@ our @EXPORT_OK = qw( dbwarn nosqlerror nosqlwarn + nosqlprocessingfailed fieldnamesdiffer transferzerorowcount processzerorowcount @@ -73,7 +74,7 @@ our @EXPORT_OK = qw( fileprocessingwarn restprocessingfailed - + emailwarn configurationwarn configurationerror @@ -342,6 +343,18 @@ sub nosqlwarn { } +sub nosqlprocessingfailed { + + my ($store,$scan_pattern,$logger) = @_; + my $message = 'keystore processing failed: [' . $store->connectidentifier() . '] ' . $scan_pattern; + if (defined $logger) { + $logger->error($message); + } + terminate($message, $logger); + +} + + sub resterror { my ($restapi, $message, $logger) = @_; diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm index 43e665e..950ed87 100644 --- a/lib/NGCP/BulkProcessor/Logging.pm +++ b/lib/NGCP/BulkProcessor/Logging.pm @@ -810,7 +810,11 @@ sub processing_entries { my ($tid, $start, $blocksize, $logger) = @_; if (defined $logger) { - $logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . 'processing entries: ' . ($start + 1) . '-' . ($start + $blocksize)); + if ($blocksize) { + $logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . 'processing entries: ' . ($start + 1) . '-' . ($start + $blocksize)); + } else { + $logger->info(($enablemultithreading ? '[' . $tid . '] ' : '') . 'processing entries: (none)'); + } } } @@ -824,7 +828,7 @@ sub nosqlprocessingstarted { } -sub restprocessingdone { +sub nosqlprocessingdone { my ($store,$scan_pattern,$logger) = @_; if (defined $logger) { diff --git a/lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm b/lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm index 1ee74f3..8afd9ea 100644 --- a/lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm +++ b/lib/NGCP/BulkProcessor/NoSqlConnectors/Redis.pm @@ -24,7 +24,9 @@ use NGCP::BulkProcessor::NoSqlConnector qw( require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnector); -our @EXPORT_OK = qw(); +our @EXPORT_OK = qw( + get_scan_args +); our $AUTOLOAD; @@ -224,4 +226,25 @@ sub multithreading_supported { } +sub get_scan_args { + + my ($scan_pattern,$blocksize,$type) = @_; + my @result = (); + if ($scan_pattern) { + push(@result,'MATCH'); + push(@result,$scan_pattern); + } + if ($blocksize) { + push(@result,'COUNT'); + push(@result,$blocksize); + } + # As of version 6.0 you can use this option to ask SCAN to only return objects that match a given type: + #if ($type) { + # push(@result,'TYPE'); + # push(@result,$type); + #} + return @result; + +} + 1; diff --git a/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisEntry.pm b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisEntry.pm new file mode 100644 index 0000000..7ce58a0 --- /dev/null +++ b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisEntry.pm @@ -0,0 +1,84 @@ +package NGCP::BulkProcessor::NoSqlConnectors::RedisEntry; +use strict; + +## no critic + +use Tie::IxHash; + +use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw(init_entry); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + $HASH_TYPE + $SET_TYPE + $LIST_TYPE + $ZSET_TYPE + $STRING_TYPE +); + +#should correspond to the type names for redis SCAN: +our $HASH_TYPE = 'hash'; +our $SET_TYPE = 'set'; +our $LIST_TYPE = 'list'; +our $ZSET_TYPE = 'zset'; +our $STRING_TYPE = 'string'; + +sub new { + + my $base_class = shift; + my $class = shift; + my $type = shift; + my $self = bless {}, $class; + $type = 'default' unless $type; + $type = lc($type); + my $value; + if ($type eq 'set') { + $value = {}; + } elsif ($type eq 'list') { + $value = []; + } elsif ($type eq 'zset') { + my %value = (); + tie(%value, 'Tie::IxHash'); + $value = \%value; + } elsif ($type eq 'hash') { + $value = {}; + } else { #($type eq 'string') { + $type = 'string'; + $value = undef; + } + $self->{type} = $type; + $self->{value} = $value; + return init_entry($self,@_); + +} + +sub getvalue { + my $self = shift; + #$self->{value} = shift if scalar @_; + return $self->{value}; +} + +sub gettype { + my $self = shift; + return $self->{type}; +} + +sub gethash { + my $self = shift; + my $fieldvalues; + if ($self->{type} eq 'set') { + $fieldvalues = [ sort keys %{$self->{value}} ]; + } elsif ($self->{type} eq 'list') { + $fieldvalues = $self->{value}; + } elsif ($self->{type} eq 'zset') { + $fieldvalues = [ keys %{$self->{value}} ]; + } elsif ($self->{type} eq 'hash') { + $fieldvalues = [ map { $self->{value}->{$_}; } sort keys %{$self->{value}} ]; + } else { #($type eq 'string') { + $fieldvalues = [ $self->{value} ]; + } + return get_rowhash($fieldvalues); +} + +1; diff --git a/lib/NGCP/BulkProcessor/RedisProcessor.pm b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm similarity index 89% rename from lib/NGCP/BulkProcessor/RedisProcessor.pm rename to lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm index 4b3fbe2..8081276 100644 --- a/lib/NGCP/BulkProcessor/RedisProcessor.pm +++ b/lib/NGCP/BulkProcessor/NoSqlConnectors/RedisProcessor.pm @@ -1,4 +1,4 @@ -package NGCP::BulkProcessor::RedisProcessor; +package NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor; use strict; ## no critic @@ -31,6 +31,8 @@ use NGCP::BulkProcessor::LogError qw( use NGCP::BulkProcessor::Utils qw(threadid); +use NGCP::BulkProcessor::NoSqlConnectors::Redis qw(get_scan_args); + require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( @@ -42,6 +44,8 @@ our @EXPORT_OK = qw( my $nosqlprocessing_threadqueuelength = 10; +my $reader_connection_name = 'reader'; + my $thread_sleep_secs = 0.1; my $RUNNING = 1; @@ -165,25 +169,31 @@ sub process_entries { my %params = @_; my ($get_store, $scan_pattern, + $type, $process_code, $static_context, $blocksize, $init_process_context_code, $uninit_process_context_code, + $destroy_reader_dbs_code, $multithreading, $nosqlprocessing_threads) = @params{qw/ get_store scan_pattern + type process_code static_context blocksize init_process_context_code uninit_process_context_code + destroy_reader_dbs_code multithreading nosqlprocessing_threads /}; if (ref $get_store eq 'CODE') { + + my $store = &$get_store($reader_connection_name,1); nosqlprocessingstarted(&$get_store(),$scan_pattern,getlogger(__PACKAGE__)); @@ -199,6 +209,13 @@ sub process_entries { my %errorstates :shared = (); my $queue = Thread::Queue->new(); + nosqlthreadingdebug('shutting down connections ...',getlogger(__PACKAGE__)); + + $store->disconnect(); + my $default_connection = &$get_store(undef,0); + my $default_connection_reconnect = $default_connection->is_connected(); + $default_connection->disconnect(); + nosqlthreadingdebug('starting reader thread',getlogger(__PACKAGE__)); $reader = threads->create(\&_reader, @@ -207,7 +224,9 @@ sub process_entries { threadqueuelength => $nosqlprocessing_threadqueuelength, get_store => $get_store, scan_pattern => $scan_pattern, + type => $type, blocksize => $blocksize, + destroy_stores_code => $destroy_reader_dbs_code, }); for (my $i = 0; $i < $nosqlprocessing_threads; $i++) { @@ -220,6 +239,7 @@ sub process_entries { process_code => $process_code, init_process_context_code => $init_process_context_code, uninit_process_context_code => $uninit_process_context_code, + })); if (!defined $processor) { nosqlthreadingdebug('processor thread ' . ($i + 1) . ' of ' . $nosqlprocessing_threads . ' NOT started',getlogger(__PACKAGE__)); @@ -241,6 +261,12 @@ sub process_entries { } $errorstate = (_get_other_threads_state(\%errorstates,$tid) & ~$RUNNING); + + nosqlthreadingdebug('restoring connections ...',getlogger(__PACKAGE__)); + + if ($default_connection_reconnect) { + $default_connection = &$get_store(undef,1); + } } else { @@ -259,7 +285,7 @@ sub process_entries { my $cursor = 0; while (1) { fetching_entries($store,$scan_pattern,$i,$blocksize,getlogger(__PACKAGE__)); - ($cursor, my $rowblock) = $store->scan($cursor,$scan_pattern,$blocksize); + ($cursor, my $rowblock) = $store->scan($cursor,get_scan_args($scan_pattern,$blocksize,$type)); my $realblocksize = scalar @$rowblock; processing_entries($tid,$i,$realblocksize,getlogger(__PACKAGE__)); $rowblock_result = &$process_code($context,$rowblock,$i); @@ -284,7 +310,8 @@ sub process_entries { &$uninit_process_context_code($context); } }; - + $store->disconnect(); + } if ($errorstate == $COMPLETED) { @@ -300,7 +327,6 @@ sub process_entries { } - sub _reader { my $context = shift; @@ -329,7 +355,7 @@ sub _reader { my $state = $RUNNING; #start at first while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer fetching_entries($store,$context->{scan_pattern},$i,$blocksize,getlogger(__PACKAGE__)); - ($cursor, my $rowblock) = $store->scan($cursor,$context->{scan_pattern},$blocksize); + ($cursor, my $rowblock) = $store->scan_shared($cursor,get_scan_args($context->{scan_pattern},$blocksize,$context->{type})); my $realblocksize = scalar @$rowblock; my %packet :shared = (); $packet{rows} = $rowblock; @@ -355,14 +381,31 @@ sub _reader { ,getlogger(__PACKAGE__)); } }; + #nosqlthreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); + #lock $context->{errorstates}; + #if ($@) { + # $context->{errorstates}->{$tid} = $ERROR; + #} else { + # $context->{errorstates}->{$tid} = $COMPLETED; + #} + #return $context->{errorstates}->{$tid}; nosqlthreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); + # stop the consumer: + # $context->{queue}->enqueue(undef); + if (defined $store) { + # if thread cleanup has a problem... + $store->disconnect(); + } + if (defined $context->{destroy_stores_code} and 'CODE' eq ref $context->{destroy_stores_code}) { + &{$context->{destroy_stores_code}}(); + } lock $context->{errorstates}; if ($@) { $context->{errorstates}->{$tid} = $ERROR; } else { $context->{errorstates}->{$tid} = $COMPLETED; } - return $context->{errorstates}->{$tid}; + return $context->{errorstates}->{$tid}; } sub _process { diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t new file mode 100644 index 0000000..e253115 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/Redis.t @@ -0,0 +1,64 @@ +use strict; + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::ConnectorPool qw(destroy_stores); +use NGCP::BulkProcessor::NoSqlConnectors::Redis qw(); +use NGCP::BulkProcessor::Redis::Trunk::location::entry qw( + get_entry + get_entry_by_ruid + process_keys +); + +goto SKIP; +{ + my $host = '192.168.0.146'; + my $port = '6379'; + my $sock = undef; + my $password = undef; + my $databaseindex = '0'; + + my $store = NGCP::BulkProcessor::NoSqlConnectors::Redis->new(undef); + $store->connect(20,undef,$host); + + my @result = $store->keys_shared('*',sub { + my ($reply, $error) = @_; + die "Oops, got an error: $error\n" if defined $error; + print "$_\n" for @$reply; + }); + #print join("\n",@keys); +} + +SKIP: +{ + $NGCP::BulkProcessor::Globals::location_host = '192.168.0.146'; + my $static_context = { + + }; + my $result = process_keys( + process_code => sub { + my ($context,$records,$row_offset) = @_; + return 1; + }, + static_context => $static_context, + blocksize => 10000, + init_process_context_code => sub { + my ($context)= @_; + }, + uninit_process_context_code => sub { + my ($context)= @_; + destroy_stores(); + }, + multithreading => 1, + numofthreads => 4, + #load_recursive => , + ); + +} + +SKIP: +{ + my $location = get_entry_by_ruid(); +} + +exit; + diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl new file mode 100644 index 0000000..684f6d8 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Massive/RegistrationMonitoring/process.pl @@ -0,0 +1,3 @@ + + +get_location_store \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Redis/Trunk/location/entry.pm b/lib/NGCP/BulkProcessor/Redis/Trunk/location/entry.pm new file mode 100644 index 0000000..9425548 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Redis/Trunk/location/entry.pm @@ -0,0 +1,155 @@ +package NGCP::BulkProcessor::Redis::Trunk::location::entry; +use strict; + +## no critic + +use NGCP::BulkProcessor::ConnectorPool qw( + get_location_store + destroy_stores +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisProcessor qw( + copy_value + process_entries +); + +use NGCP::BulkProcessor::NoSqlConnectors::RedisEntry qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::NoSqlConnectors::RedisEntry); +our @EXPORT_OK = qw( + get_entry + get_entry_by_ruid + process_keys +); + +my $get_store = \&get_location_store; + +my $table = 'location:entry'; +my $type = $NGCP::BulkProcessor::NoSqlConnectors::RedisEntry::HASH_TYPE; +my $get_key_from_ruid = sub { + my ($ruid) = @_; + return $table . '::' . $ruid; +}; + +my $fieldnames = [ + 'instance', + 'domain', + 'cseq', + 'partition', + 'ruid', + 'connection_id', + 'username', + 'keepalive', + 'path', + 'reg_id', + 'contact', + 'flags', + 'received', + 'callid', + 'socket', + 'cflags', + 'expires', + 'methods', + 'user_agent', + 'q', + 'last_modified', + 'server_id', +]; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::NoSqlConnectors::RedisEntry->new($class,$type,$fieldnames); + + copy_value($self,shift,$fieldnames); + + return $self; + +} + +sub get_entry { + + my ($key,$load_recursive) = @_; + my $store = &$get_store(); + + return builditems_fromrows({ $store->hgetall($key) },$load_recursive); + +} + +sub get_entry_by_ruid { + + my ($ruid,$load_recursive) = @_; + my $store = &$get_store(); + + return builditems_fromrows({ $store->hgetall(&$get_key_from_ruid($ruid)) },$load_recursive); + +} + +sub builditems_fromrows { + + my ($rows,$load_recursive) = @_; + + my $item; + + if (defined $rows and ref $rows eq 'ARRAY') { + my @items = (); + foreach my $row (@$rows) { + $item = __PACKAGE__->new($row); + + # transformations go here ... + + push @items,$item; + } + return \@items; + } elsif (defined $rows and ref $rows eq 'HASH') { + $item = __PACKAGE__->new($rows); + return $item; + } + return undef; + +} + +sub process_keys { + + my %params = @_; + my ($process_code, + $static_context, + $blocksize, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $load_recursive) = @params{qw/ + process_code + static_context + blocksize + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + load_recursive + /}; + + return process_entries( + get_store => $get_store, + scan_pattern => &$get_key_from_ruid('*'), + type => $type, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,builditems_fromrows([ + map { { &$get_store()->hgetall($_) }; } @$rowblock + ],$load_recursive),$row_offset); + }, + static_context => $static_context, + blocksize => $blocksize, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_stores_code => \&destroy_stores, + multithreading => $multithreading, + nosqlprocessing_threads => $numofthreads, + ); +} + +1; + diff --git a/lib/NGCP/BulkProcessor/default.cfg b/lib/NGCP/BulkProcessor/default.cfg index 4770ff5..6222f5e 100644 --- a/lib/NGCP/BulkProcessor/default.cfg +++ b/lib/NGCP/BulkProcessor/default.cfg @@ -49,6 +49,13 @@ ngcprestapi_username = administrator ngcprestapi_password = administrator ngcprestapi_realm = api_admin_http +##NGCP Redis connectivity - "location" store: +location_databaseindex = 20 +#location_password = +location_host = 127.0.0.1 +location_port = 6379 +#location_sock = + ##sending email: emailenable = 0 erroremailrecipient =