MT#61121 support infinite processing of SQL tables

Change-Id: I129d9126729503f7679077182dfa81f0f166f754
(cherry picked from commit 5866b1bc8f)
mr13.0
Rene Krenn 7 months ago
parent bca5c8141d
commit 3fb6e95ed5

@ -357,6 +357,7 @@ sub process_unexported {
$alias,
$joins,
$conditions,
$loop,
#$sort,
$limit) = @params{qw/
process_code
@ -370,6 +371,7 @@ sub process_unexported {
alias
joins
conditions
loop
limit
/};
#sort
@ -436,6 +438,7 @@ sub process_unexported {
blocksize => $blocksize,
select => $select_stmt,
selectcount => $count_stmt,
loop => $loop,
);
}

@ -437,7 +437,7 @@ sub tabletransferstarted {
my ($db,$tablename,$target_db,$targettablename,$numofrows,$logger) = @_;
if (defined $logger) {
$logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer started: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . ': ' . $numofrows . ' row(s)');
$logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer started: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : ''));
}
}
@ -446,7 +446,7 @@ sub tableprocessingstarted {
my ($db,$tablename,$numofrows,$logger) = @_;
if (defined $logger) {
$logger->info('table processing started: [' . $db->connectidentifier() . '].' . $tablename . ': ' . $numofrows . ' row(s)');
$logger->info('table processing started: [' . $db->connectidentifier() . '].' . $tablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : ''));
}
}
@ -645,7 +645,7 @@ sub tabletransferdone {
my ($db,$tablename,$target_db,$targettablename,$numofrows,$logger) = @_;
if (defined $logger) {
$logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer done: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . ': ' . $numofrows . ' row(s)');
$logger->info(_getsqlconnectorinstanceprefix($db) . 'table transfer done: [' . $db->connectidentifier() . '].' . $tablename . ' > ' . $targettablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : ''));
}
}
@ -663,7 +663,7 @@ sub tableprocessingdone {
my ($db,$tablename,$numofrows,$logger) = @_;
if (defined $logger) {
$logger->info('table processing done: [' . $db->connectidentifier() . '].' . $tablename . ': ' . $numofrows . ' row(s)');
$logger->info('table processing done: [' . $db->connectidentifier() . '].' . $tablename . (defined $numofrows ? ': ' . $numofrows . ' row(s)' : ''));
}
}
@ -681,7 +681,7 @@ sub fetching_rows {
my ($db,$tablename,$start,$blocksize,$totalnumofrows,$logger) = @_;
if (defined $logger) {
$logger->info(_getsqlconnectorinstanceprefix($db) . 'fetching rows from [' . $db->connectidentifier() . '].' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . ' of ' . $totalnumofrows);
$logger->info(_getsqlconnectorinstanceprefix($db) . 'fetching rows from [' . $db->connectidentifier() . '].' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . (defined $totalnumofrows ? ' of ' . $totalnumofrows : ''));
}
}
@ -690,7 +690,7 @@ sub writing_rows {
my ($db,$tablename,$start,$blocksize,$totalnumofrows,$logger) = @_;
if (defined $logger) {
$logger->info(_getsqlconnectorinstanceprefix($db) . 'writing rows to ' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . ' of ' . $totalnumofrows);
$logger->info(_getsqlconnectorinstanceprefix($db) . 'writing rows to ' . $tablename . ': ' . ($start + 1) . '-' . ($start + $blocksize) . (defined $totalnumofrows ? ' of ' . $totalnumofrows : ''));
}
}
@ -699,7 +699,7 @@ sub processing_rows {
my ($context, $start,$blocksize,$totalnumofrows,$logger) = @_;
if (defined $logger) {
$logger->info(_processing_prefix($context) . 'processing rows: ' . ($start + 1) . '-' . ($start + $blocksize) . ' of ' . $totalnumofrows);
$logger->info(_processing_prefix($context) . 'processing rows: ' . ($start + 1) . '-' . ($start + $blocksize) . (defined $totalnumofrows ? ' of ' . $totalnumofrows : ''));
}
}

@ -955,7 +955,7 @@ sub db_get_rowblock {
dbdebug($self,'db_get_rowblock: ' . $self->{query} . "\nparameters:\n" . join(', ', @{$self->{params}}),getlogger(__PACKAGE__)) if $log_db_operations;
foreach (@{$self->{sth}->fetchall_arrayref(undef, $max_rows)}) {
foreach (@{$self->{sth}->fetchall_arrayref(undef, $max_rows) // []}) {
my @row : shared = @{$_};
push @rows, \@row;
}

@ -79,6 +79,7 @@ our @EXPORT_OK = qw(
delete_records
insert_stmt
is_shutdown
);
#transfer_record
#transfer_records
@ -103,6 +104,7 @@ my $reader_name = 'reader';
my $writer_name = 'writer';
my $thread_sleep_secs = 0.1;
my $loop_sleep_secs = 1;
my $RUNNING = 1;
my $COMPLETED = 2;
@ -753,6 +755,7 @@ sub transfer_table {
$destroy_target_dbs_code,
$selectcount,
$select,
$loop,
$values) = @params{qw/
get_db
class
@ -769,6 +772,7 @@ sub transfer_table {
destroy_target_dbs_code
selectcount
select
loop
values
/};
@ -790,15 +794,20 @@ sub transfer_table {
$countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
}
my $rowcount = $db->db_get_value($countstatement,@$values);
my $rowcount;
unless ($loop) {
$rowcount = $db->db_get_value($countstatement,@$values);
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
if ($rowcount > 0) {
tabletransferstarted($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
if ($rowcount > 0) {
tabletransferstarted($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
} else {
transferzerorowcount($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
return 1;
}
} else {
transferzerorowcount($db,$tablename,$target_db,$targettablename,$rowcount,getlogger(__PACKAGE__));
return 1;
tabletransferstarted($db,$tablename,$target_db,$targettablename,undef,getlogger(__PACKAGE__));
}
my $errorstate = $RUNNING; # 1;
@ -863,6 +872,7 @@ sub transfer_table {
selectstatement => $selectstatement,
blocksize => $blocksize,
rowcount => $rowcount,
loop => $loop,
#logger => $logger,
values_ref => $values,
destroy_dbs_code => $destroy_source_dbs_code,
@ -883,6 +893,7 @@ sub transfer_table {
insertstatement => $insertstatement,
blocksize => $blocksize,
rowcount => $rowcount,
loop => $loop,
#logger => $logger,
destroy_dbs_code => $destroy_target_dbs_code,
});
@ -935,9 +946,10 @@ sub transfer_table {
eval {
$db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename
my $last_i;
my $i = 0;
while (1) {
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__));
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
$db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
my $rowblock = $db->db_get_rowblock($blocksize);
$db->db_finish() unless $db->rowblock_transactional;
@ -955,10 +967,10 @@ sub transfer_table {
#undef $rowblock;
if ($realblocksize < $blocksize) {
last;
last unless $loop;
}
} else {
last;
last unless $loop;
}
}
$db->db_finish() if $db->rowblock_transactional;
@ -1064,6 +1076,7 @@ sub process_table {
$destroy_reader_dbs_code,
$selectcount,
$select,
$loop,
$values) = @params{qw/
get_db
name
@ -1079,6 +1092,7 @@ sub process_table {
destroy_reader_dbs_code
selectcount
select
loop
values
/};
@ -1100,13 +1114,20 @@ sub process_table {
$countstatement = 'SELECT COUNT(*) FROM ' . $db->tableidentifier($tablename);
}
my $rowcount = $db->db_get_value($countstatement,@$values);
my $rowcount;
unless ($loop) {
$rowcount = $db->db_get_value($countstatement,@$values);
if ($rowcount > 0) {
tableprocessingstarted($db,$tablename,$rowcount,getlogger(__PACKAGE__));
#my $targettablename = _gettargettablename($db,$tablename,$target_db); #$target_db->getsafetablename($db->tableidentifier($tablename));
if ($rowcount > 0) {
tableprocessingstarted($db,$tablename,$rowcount,getlogger(__PACKAGE__));
} else {
processzerorowcount($db,$tablename,$rowcount,getlogger(__PACKAGE__));
return 1;
}
} else {
processzerorowcount($db,$tablename,$rowcount,getlogger(__PACKAGE__));
return 1;
tableprocessingstarted($db,$tablename,undef,getlogger(__PACKAGE__));
}
my @fieldnames = @$expected_fieldnames;
@ -1160,6 +1181,7 @@ sub process_table {
selectstatement => $selectstatement,
blocksize => $blocksize,
rowcount => $rowcount,
loop => $loop,
#logger => $logger,
values_ref => $values,
destroy_dbs_code => $destroy_reader_dbs_code,
@ -1180,6 +1202,7 @@ sub process_table {
uninit_process_context_code => $uninit_process_context_code,
blocksize => $blocksize,
rowcount => $rowcount,
loop => $loop,
#logger => $logger,
}));
if (!defined $processor) {
@ -1265,9 +1288,10 @@ sub process_table {
$db->db_get_begin($selectstatement,@$values) if $db->rowblock_transactional; #$tablename
my $last_i;
my $i = 0;
while (1) {
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__));
fetching_rows($db,$tablename,$i,$blocksize,$rowcount,getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
$db->db_get_begin($selectstatement,$i,$blocksize,@$values) unless $db->rowblock_transactional;
my $rowblock = $db->db_get_rowblock($blocksize);
if ('CODE' eq ref $read_code) {
@ -1285,18 +1309,20 @@ sub process_table {
#$target_db->db_finish();
$i += $realblocksize;
if ($realblocksize < $blocksize || not $rowblock_result) {
last;
if (not $rowblock_result) {
last;
} elsif ($realblocksize < $blocksize) {
last unless $loop;
}
} else {
last;
last unless $loop;
}
}
$db->db_finish() if $db->rowblock_transactional;
};
print $@;
#print $@;
if ($@) {
$errorstate = $ERROR;
} else {
@ -1320,7 +1346,7 @@ print $@;
#$db->db_disconnect();
return 1;
} else {
print "errorstate: $errorstate \n";
#print "errorstate: $errorstate \n";
tableprocessingfailed($db,$tablename,$rowcount,getlogger(__PACKAGE__));
#$db->db_disconnect();
}
@ -1335,7 +1361,7 @@ sub _calc_blocksize {
my ($rowcount,$columncount,$multithreaded,$threadqueuelength) = @_;
if ($rowcount > $minblocksize) {
if (defined $rowcount and $rowcount > $minblocksize) {
my $exp = int ( log ($rowcount) / log(10.0) );
my $blocksize = int ( 10 ** $exp );
@ -1387,6 +1413,22 @@ sub _get_other_threads_state {
return $result;
}
sub is_shutdown {
my ($context) = @_;
my $errorstates;
my $state = 0;
$errorstates = $context->{errorstates} if $context;
if (defined $errorstates and ref $errorstates eq 'HASH') {
lock $errorstates;
foreach my $threadid (keys %$errorstates) {
$state |= $errorstates->{$threadid};
}
}
return 1 if $state & $ERROR;
return 1 if $state & $STOP;
return 0;
}
sub _get_stop_consumer_thread {
my ($context,$tid) = @_;
my $result = 1;
@ -1444,10 +1486,11 @@ sub _reader {
#yield();
sleep($thread_sleep_secs);
}
my $last_i;
my $i = 0;
my $state = $RUNNING; #start at first
while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0 and ($state & $STOP) == 0) { #as long there is one running consumer and no defunct consumer
fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__));
fetching_rows($reader_db,$context->{tablename},$i,$context->{blocksize},$context->{rowcount},getlogger(__PACKAGE__)) if (not defined $last_i or $i != $last_i); $last_i = $i;
$reader_db->db_get_begin($context->{selectstatement},$i,$context->{blocksize},@{$context->{values_ref}}) unless $reader_db->rowblock_transactional;
my $rowblock = $reader_db->db_get_rowblock($context->{blocksize});
$reader_db->db_finish() unless $reader_db->rowblock_transactional;
@ -1472,10 +1515,12 @@ sub _reader {
sleep($thread_sleep_secs);
}
$i += $realblocksize;
if ($realblocksize < $context->{blocksize}) {
if ($realblocksize < $context->{blocksize} and not $context->{loop}) {
tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__));
last;
}
} elsif ($context->{loop}) {
sleep($loop_sleep_secs);
} else {
$context->{queue}->enqueue(\%packet); #$packet);
tablethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__));
@ -1538,6 +1583,8 @@ sub _writer {
$writer_db->db_finish();
$blockcount++;
} elsif ($context->{loop}) {
sleep($loop_sleep_secs);
} else { #empty packet received
tablethreadingdebug('[' . $tid . '] shutting down writer thread (end of data - empty block) ...',getlogger(__PACKAGE__));
last;
@ -1612,6 +1659,8 @@ sub _process {
last;
}
} elsif ($context->{loop}) {
sleep($loop_sleep_secs);
} else {
tablethreadingdebug('[' . $tid . '] shutting down processor thread (end of data - empty block) ...',getlogger(__PACKAGE__));
last;

Loading…
Cancel
Save