From 5866b1bc8fc3b51a5eb976320f47ea855ce44805 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Mon, 23 Sep 2024 15:46:45 +0200 Subject: [PATCH] MT#61121 support infinite processing of SQL tables Change-Id: I129d9126729503f7679077182dfa81f0f166f754 --- .../BulkProcessor/Dao/Trunk/accounting/cdr.pm | 3 + lib/NGCP/BulkProcessor/Logging.pm | 14 +-- lib/NGCP/BulkProcessor/SqlConnector.pm | 2 +- lib/NGCP/BulkProcessor/SqlProcessor.pm | 95 ++++++++++++++----- 4 files changed, 83 insertions(+), 31 deletions(-) diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index dd84f04..9fa6a48 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -357,6 +357,7 @@ sub process_unexported { $alias, $joins, $conditions, + $loop, #$sort, $limit) = @params{qw/ process_code @@ -370,6 +371,7 @@ sub process_unexported { alias joins conditions + loop limit /}; #sort @@ -436,6 +438,7 @@ sub process_unexported { blocksize => $blocksize, select => $select_stmt, selectcount => $count_stmt, + loop => $loop, ); } diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm index bc97c67..6a19851 100644 --- a/lib/NGCP/BulkProcessor/Logging.pm +++ b/lib/NGCP/BulkProcessor/Logging.pm @@ -437,7 +437,7 @@ sub tabletransferstarted { my ($db,$tablename,$target_db,$targettablename,$numofrows,$logger) = @_; if (defined $logger) { - $logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer started: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . ': ' . $numofrows . ' row(s)'); + $logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer started: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : '')); } } @@ -446,7 +446,7 @@ sub tableprocessingstarted { my ($db,$tablename,$numofrows,$logger) = @_; if (defined $logger) { - $logger->info('table processing started: [' . $db->connectidentifier() . '].' . $tablename . ': ' . $numofrows . ' row(s)'); + $logger->info('table processing started: [' . $db->connectidentifier() . '].' . $tablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : '')); } } @@ -645,7 +645,7 @@ sub tabletransferdone { my ($db,$tablename,$target_db,$targettablename,$numofrows,$logger) = @_; if (defined $logger) { - $logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer done: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . ': ' . $numofrows . ' row(s)'); + $logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer done: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : '')); } } @@ -663,7 +663,7 @@ sub tableprocessingdone { my ($db,$tablename,$numofrows,$logger) = @_; if (defined $logger) { - $logger->info('table processing done: [' . $db->connectidentifier() . '].' . $tablename . ': ' . $numofrows . ' row(s)'); + $logger->info('table processing done: [' . $db->connectidentifier() . '].' . $tablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : '')); } } @@ -681,7 +681,7 @@ sub fetching_rows { my ($db,$tablename,$start,$blocksize,$totalnumofrows,$logger) = @_; if (defined $logger) { - $logger->info(_getsqlconnectorinstanceprefix($db) . 'fetching rows from [' . $db->connectidentifier() . '].' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . ' of ' . $totalnumofrows); + $logger->info(_getsqlconnectorinstanceprefix($db) . 'fetching rows from [' . $db->connectidentifier() . '].' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . (defined $totalnumofrows ? ' of ' . $totalnumofrows : '')); } } @@ -690,7 +690,7 @@ sub writing_rows { my ($db,$tablename,$start,$blocksize,$totalnumofrows,$logger) = @_; if (defined $logger) { - $logger->info(_getsqlconnectorinstanceprefix($db) . 'writing rows to ' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . ' of ' . $totalnumofrows); + $logger->info(_getsqlconnectorinstanceprefix($db) . 'writing rows to ' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . (defined $totalnumofrows ? ' of ' . $totalnumofrows : '')); } } @@ -699,7 +699,7 @@ sub processing_rows { my ($context, $start,$blocksize,$totalnumofrows,$logger) = @_; if (defined $logger) { - $logger->info(_processing_prefix($context) . 'processing rows: ' . ($start + 1) . '-' . ($start + $blocksize) . ' of ' . $totalnumofrows); + $logger->info(_processing_prefix($context) . 'processing rows: ' . ($start + 1) . '-' . ($start + $blocksize) . (defined $totalnumofrows ? ' of ' . $totalnumofrows : '')); } } diff --git a/lib/NGCP/BulkProcessor/SqlConnector.pm b/lib/NGCP/BulkProcessor/SqlConnector.pm index 807d2bf..a024162 100644 --- a/lib/NGCP/BulkProcessor/SqlConnector.pm +++ b/lib/NGCP/BulkProcessor/SqlConnector.pm @@ -955,7 +955,7 @@ sub db_get_rowblock { dbdebug($self,'db_get_rowblock: ' . $self->{query} . "\nparameters:\n" . join(', ', @{$self->{params}}),getlogger(__PACKAGE__)) if $log_db_operations; - foreach (@{$self->{sth}->fetchall_arrayref(undef, $max_rows)}) { + foreach (@{$self->{sth}->fetchall_arrayref(undef, $max_rows) // []}) { my @row : shared = @{$_}; push @rows, \@row; } diff --git a/lib/NGCP/BulkProcessor/SqlProcessor.pm b/lib/NGCP/BulkProcessor/SqlProcessor.pm index 091e88e..1ac65e8 100644 --- a/lib/NGCP/BulkProcessor/SqlProcessor.pm +++ b/lib/NGCP/BulkProcessor/SqlProcessor.pm @@ -79,6 +79,7 @@ our @EXPORT_OK = qw( delete_records insert_stmt + is_shutdown ); #transfer_record #transfer_records @@ -103,6 +104,7 @@ my $reader_name = 'reader'; my $writer_name = 'writer'; my $thread_sleep_secs = 0.1; +my $loop_sleep_secs = 1; my $RUNNING = 1; my $COMPLETED = 2; @@ -753,6 +755,7 @@ sub transfer_table { $destroy_target_dbs_code, $selectcount, $select, + $loop, $values) = @params{qw/ get_db class @@ -769,6 +772,7 @@ sub transfer_table { destroy_target_dbs_code selectcount select + loop values /}; @@ -790,15 +794,20 @@ sub transfer_table { $countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename); } - my $rowcount = $db->db_get_value($countstatement,@$values); + my $rowcount; + unless ($loop) { + $rowcount = $db->db_get_value($countstatement,@$values); - #my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename)); + #my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename)); - if ($rowcount > 0) { - tabletransferstarted($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__)); + if ($rowcount > 0) { + tabletransferstarted($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__)); + } else { + transferzerorowcount($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__)); + return 1; + } } else { - transferzerorowcount($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__)); - return 1; + tabletransferstarted($db,$tablename,$target_db,$targettablename,undef,getlogger(__PACKAGE__)); } my $errorstate = $RUNNING; # 1; @@ -863,6 +872,7 @@ sub transfer_table { selectstatement => $selectstatement, blocksize => $blocksize, rowcount => $rowcount, + loop => $loop, #logger => $logger, values_ref => $values, destroy_dbs_code => $destroy_source_dbs_code, @@ -883,6 +893,7 @@ sub transfer_table { insertstatement => $insertstatement, blocksize => $blocksize, rowcount => $rowcount, + loop => $loop, #logger => $logger, destroy_dbs_code => $destroy_target_dbs_code, }); @@ -935,9 +946,10 @@ sub transfer_table { eval { $db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename + my $last_i; my $i = 0; while (1) { - fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)); + fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i; $db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional; my $rowblock = $db->db_get_rowblock($blocksize); $db->db_finish() unless $db->rowblock_transactional; @@ -955,10 +967,10 @@ sub transfer_table { #undef $rowblock; if ($realblocksize < $blocksize) { - last; + last unless $loop; } } else { - last; + last unless $loop; } } $db->db_finish() if $db->rowblock_transactional; @@ -1064,6 +1076,7 @@ sub process_table { $destroy_reader_dbs_code, $selectcount, $select, + $loop, $values) = @params{qw/ get_db name @@ -1079,6 +1092,7 @@ sub process_table { destroy_reader_dbs_code selectcount select + loop values /}; @@ -1100,13 +1114,20 @@ sub process_table { $countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename); } - my $rowcount = $db->db_get_value($countstatement,@$values); + my $rowcount; + unless ($loop) { + $rowcount = $db->db_get_value($countstatement,@$values); - if ($rowcount > 0) { - tableprocessingstarted($db,$tablename,$rowcount,getlogger(__PACKAGE__)); + #my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename)); + + if ($rowcount > 0) { + tableprocessingstarted($db,$tablename,$rowcount,getlogger(__PACKAGE__)); + } else { + processzerorowcount($db,$tablename,$rowcount,getlogger(__PACKAGE__)); + return 1; + } } else { - processzerorowcount($db,$tablename,$rowcount,getlogger(__PACKAGE__)); - return 1; + tableprocessingstarted($db,$tablename,undef,getlogger(__PACKAGE__)); } my @fieldnames = @$expected_fieldnames; @@ -1160,6 +1181,7 @@ sub process_table { selectstatement => $selectstatement, blocksize => $blocksize, rowcount => $rowcount, + loop => $loop, #logger => $logger, values_ref => $values, destroy_dbs_code => $destroy_reader_dbs_code, @@ -1180,6 +1202,7 @@ sub process_table { uninit_process_context_code => $uninit_process_context_code, blocksize => $blocksize, rowcount => $rowcount, + loop => $loop, #logger => $logger, })); if (!defined $processor) { @@ -1265,9 +1288,10 @@ sub process_table { $db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename + my $last_i; my $i = 0; while (1) { - fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)); + fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i; $db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional; my $rowblock = $db->db_get_rowblock($blocksize); if ('CODE' eq ref $read_code) { @@ -1285,18 +1309,20 @@ sub process_table { #$target_db->db_finish(); $i += $realblocksize; - if ($realblocksize < $blocksize || not $rowblock_result) { - last; + if (not $rowblock_result) { + last; + } elsif ($realblocksize < $blocksize) { + last unless $loop; } } else { - last; + last unless $loop; } } $db->db_finish() if $db->rowblock_transactional; }; -print $@; + #print $@; if ($@) { $errorstate = $ERROR; } else { @@ -1320,7 +1346,7 @@ print $@; #$db->db_disconnect(); return 1; } else { - print "errorstate: $errorstate \n"; + #print "errorstate: $errorstate \n"; tableprocessingfailed($db,$tablename,$rowcount,getlogger(__PACKAGE__)); #$db->db_disconnect(); } @@ -1335,7 +1361,7 @@ sub _calc_blocksize { my ($rowcount,$columncount,$multithreaded,$threadqueuelength) = @_; - if ($rowcount > $minblocksize) { + if (defined $rowcount and $rowcount > $minblocksize) { my $exp = int ( log ($rowcount) / log(10.0) ); my $blocksize = int ( 10 ** $exp ); @@ -1387,6 +1413,22 @@ sub _get_other_threads_state { return $result; } +sub is_shutdown { + my ($context) = @_; + my $errorstates; + my $state = 0; + $errorstates = $context->{errorstates} if $context; + if (defined $errorstates and ref $errorstates eq 'HASH') { + lock $errorstates; + foreach my $threadid (keys %$errorstates) { + $state |= $errorstates->{$threadid}; + } + } + return 1 if $state & $ERROR; + return 1 if $state & $STOP; + return 0; +} + sub _get_stop_consumer_thread { my ($context,$tid) = @_; my $result = 1; @@ -1444,10 +1486,11 @@ sub _reader { #yield(); sleep($thread_sleep_secs); } + my $last_i; my $i = 0; my $state = $RUNNING; #start at first while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0 and ($state & $STOP) == 0) { #as long there is one running consumer and no defunct consumer - fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__)); + fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i; $reader_db->db_get_begin($context->{selectstatement},$i,$context->{blocksize},@{$context->{values_ref}}) unless $reader_db->rowblock_transactional; my $rowblock = $reader_db->db_get_rowblock($context->{blocksize}); $reader_db->db_finish() unless $reader_db->rowblock_transactional; @@ -1472,10 +1515,12 @@ sub _reader { sleep($thread_sleep_secs); } $i += $realblocksize; - if ($realblocksize < $context->{blocksize}) { + if ($realblocksize < $context->{blocksize} and not $context->{loop}) { tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__)); last; } + } elsif ($context->{loop}) { + sleep($loop_sleep_secs); } else { $context->{queue}->enqueue(\%packet); #$packet); tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__)); @@ -1538,6 +1583,8 @@ sub _writer { $writer_db->db_finish(); $blockcount++; + } elsif ($context->{loop}) { + sleep($loop_sleep_secs); } else { #empty packet received tablethreadingdebug('[' . $tid . '] shutting down writer thread (end of data - empty block) ...',getlogger(__PACKAGE__)); last; @@ -1612,6 +1659,8 @@ sub _process { last; } + } elsif ($context->{loop}) { + sleep($loop_sleep_secs); } else { tablethreadingdebug('[' . $tid . '] shutting down processor thread (end of data - empty block) ...',getlogger(__PACKAGE__)); last;