|
|
|
|
@ -8,6 +8,7 @@ use threads::shared qw(shared_clone);
|
|
|
|
|
use Thread::Queue;
|
|
|
|
|
|
|
|
|
|
use Time::HiRes qw(sleep);
|
|
|
|
|
use File::Basename qw(basename);
|
|
|
|
|
|
|
|
|
|
use NGCP::BulkProcessor::Globals qw(
|
|
|
|
|
$enablemultithreading
|
|
|
|
|
@ -27,6 +28,7 @@ use NGCP::BulkProcessor::LogError qw(
|
|
|
|
|
fileprocessingfailed
|
|
|
|
|
fileerror
|
|
|
|
|
notimplementederror
|
|
|
|
|
fileprocessingwarn
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
use NGCP::BulkProcessor::Utils qw(threadid);
|
|
|
|
|
@ -104,16 +106,16 @@ sub process {
|
|
|
|
|
multithreading
|
|
|
|
|
/};
|
|
|
|
|
#my ($file,$process_code,$init_process_context_code,$uninit_process_context_code,$multithreading) = @_;
|
|
|
|
|
my $files_code = $file;
|
|
|
|
|
my $single_file = 1;
|
|
|
|
|
$single_file = 0 if ('CODE' eq ref $file);
|
|
|
|
|
$files_code = sub {
|
|
|
|
|
my $cb = shift;
|
|
|
|
|
$cb->($file);
|
|
|
|
|
} if $single_file;
|
|
|
|
|
|
|
|
|
|
if (ref $process_code eq 'CODE') {
|
|
|
|
|
|
|
|
|
|
if (-s $file > 0) {
|
|
|
|
|
fileprocessingstarted($file,getlogger(__PACKAGE__));
|
|
|
|
|
} else {
|
|
|
|
|
processzerofilesize($file,getlogger(__PACKAGE__));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
my $errorstate = $RUNNING;
|
|
|
|
|
my $tid = threadid();
|
|
|
|
|
|
|
|
|
|
@ -130,21 +132,21 @@ sub process {
|
|
|
|
|
{ queue => $queue,
|
|
|
|
|
errorstates => \%errorstates,
|
|
|
|
|
instance => $self,
|
|
|
|
|
filename => $file,
|
|
|
|
|
files_code => $files_code,
|
|
|
|
|
single_file => $single_file,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
for (my $i = 0; $i < $self->{numofthreads}; $i++) {
|
|
|
|
|
filethreadingdebug('starting processor thread ' . ($i + 1) . ' of ' . $self->{numofthreads},getlogger(__PACKAGE__));
|
|
|
|
|
my $processor = threads->create(\&_process,
|
|
|
|
|
create_process_context($static_context,
|
|
|
|
|
{ queue => $queue,
|
|
|
|
|
errorstates => \%errorstates,
|
|
|
|
|
readertid => $reader->tid(),
|
|
|
|
|
filename => $file,
|
|
|
|
|
process_code => $process_code,
|
|
|
|
|
init_process_context_code => $init_process_context_code,
|
|
|
|
|
{ queue => $queue,
|
|
|
|
|
errorstates => \%errorstates,
|
|
|
|
|
readertid => $reader->tid(),
|
|
|
|
|
process_code => $process_code,
|
|
|
|
|
init_process_context_code => $init_process_context_code,
|
|
|
|
|
uninit_process_context_code => $uninit_process_context_code,
|
|
|
|
|
instance => $self,
|
|
|
|
|
instance => $self,
|
|
|
|
|
}));
|
|
|
|
|
if (!defined $processor) {
|
|
|
|
|
filethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $self->{numofthreads} . ' NOT started',getlogger(__PACKAGE__));
|
|
|
|
|
@ -170,10 +172,10 @@ sub process {
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
|
|
my $context = create_process_context($static_context,{ instance => $self,
|
|
|
|
|
filename => $file,
|
|
|
|
|
tid => $tid,
|
|
|
|
|
});
|
|
|
|
|
my $rowblock_result = 1;
|
|
|
|
|
my $filename;
|
|
|
|
|
eval {
|
|
|
|
|
|
|
|
|
|
my $init_reader_context_code = $self->can('init_reader_context');
|
|
|
|
|
@ -195,89 +197,107 @@ sub process {
|
|
|
|
|
if (!defined $extractfields_code) {
|
|
|
|
|
notimplementederror((ref $self) . ': ' . 'extractfields class method not implemented',getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
local *INPUTFILE;
|
|
|
|
|
if (not open (INPUTFILE, '<:encoding(' . $self->{encoding} . ')', $file)) {
|
|
|
|
|
fileerror('processing file - cannot open file ' . $file . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
binmode INPUTFILE;
|
|
|
|
|
|
|
|
|
|
my $buffer = undef;
|
|
|
|
|
my $chunk = undef;
|
|
|
|
|
my $n = 0;
|
|
|
|
|
$context->{charsread} = 0;
|
|
|
|
|
$context->{linesread} = 0;
|
|
|
|
|
|
|
|
|
|
my $i = 0;
|
|
|
|
|
while (1) {
|
|
|
|
|
#fetching_lines($file,$i,$self->{blocksize},undef,getlogger(__PACKAGE__));
|
|
|
|
|
my $block_n = 0;
|
|
|
|
|
my @lines = ();
|
|
|
|
|
while ((scalar @lines) < $self->{blocksize} and defined ($n = read(INPUTFILE,$chunk,$self->{buffersize})) and $n != 0) {
|
|
|
|
|
if (defined $buffer) {
|
|
|
|
|
$buffer .= $chunk;
|
|
|
|
|
|
|
|
|
|
$files_code->(sub {
|
|
|
|
|
my $filename = shift;
|
|
|
|
|
if (-s $filename > 0) {
|
|
|
|
|
fileprocessingstarted($filename,getlogger(__PACKAGE__));
|
|
|
|
|
} else {
|
|
|
|
|
if ($single_file) {
|
|
|
|
|
processzerofilesize($filename,getlogger(__PACKAGE__));
|
|
|
|
|
} else {
|
|
|
|
|
$buffer = $chunk;
|
|
|
|
|
fileprocessingwarn($filename,basename($filename) . ' ' . (-e $filename ? 'has 0 bytes' : 'not found'),getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
$context->{charsread} += $n;
|
|
|
|
|
$block_n += $n;
|
|
|
|
|
last unless &$extractlines_code($context,\$buffer,\@lines);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
lines_read($file,$i,$self->{blocksize},$block_n,getlogger(__PACKAGE__));
|
|
|
|
|
|
|
|
|
|
if (not defined $n) {
|
|
|
|
|
fileerror('processing file - error reading file ' . $file . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
close(INPUTFILE);
|
|
|
|
|
last;
|
|
|
|
|
} else {
|
|
|
|
|
if ($n == 0 && defined $buffer) {
|
|
|
|
|
push(@lines,$buffer);
|
|
|
|
|
}
|
|
|
|
|
my @rowblock = ();
|
|
|
|
|
foreach my $line (@lines) {
|
|
|
|
|
$context->{linesread} += 1;
|
|
|
|
|
my $row = &$extractfields_code($context,(ref $line ? $line : \$line));
|
|
|
|
|
push(@rowblock,$row) if defined $row;
|
|
|
|
|
local *INPUTFILE;
|
|
|
|
|
if (not open (INPUTFILE, '<:encoding(' . $self->{encoding} . ')', $filename)) {
|
|
|
|
|
fileerror('processing file - cannot open file ' . $filename . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
binmode INPUTFILE;
|
|
|
|
|
|
|
|
|
|
my $buffer = undef;
|
|
|
|
|
my $chunk = undef;
|
|
|
|
|
my $n = 0;
|
|
|
|
|
$context->{charsread} = 0;
|
|
|
|
|
$context->{linesread} = 0;
|
|
|
|
|
|
|
|
|
|
my $i = 0;
|
|
|
|
|
while (1) {
|
|
|
|
|
#fetching_lines($file,$i,$self->{blocksize},undef,getlogger(__PACKAGE__));
|
|
|
|
|
my $block_n = 0;
|
|
|
|
|
my @lines = ();
|
|
|
|
|
while ((scalar @lines) < $self->{blocksize} and defined ($n = read(INPUTFILE,$chunk,$self->{buffersize})) and $n != 0) {
|
|
|
|
|
if (defined $buffer) {
|
|
|
|
|
$buffer .= $chunk;
|
|
|
|
|
} else {
|
|
|
|
|
$buffer = $chunk;
|
|
|
|
|
}
|
|
|
|
|
$context->{charsread} += $n;
|
|
|
|
|
$block_n += $n;
|
|
|
|
|
last unless &$extractlines_code($context,\$buffer,\@lines);
|
|
|
|
|
}
|
|
|
|
|
my $realblocksize = scalar @rowblock;
|
|
|
|
|
if ($realblocksize > 0) {
|
|
|
|
|
processing_lines($tid,$i,$realblocksize,undef,getlogger(__PACKAGE__));
|
|
|
|
|
#processing_rows($tid,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
|
|
|
|
|
|
|
|
|
|
$rowblock_result = &$process_code($context,\@rowblock,$i);
|
|
|
|
|
|
|
|
|
|
$i += $realblocksize;
|
|
|
|
|
if ($n == 0 || not $rowblock_result) {
|
|
|
|
|
lines_read($filename,$i,$self->{blocksize},$block_n,getlogger(__PACKAGE__));
|
|
|
|
|
|
|
|
|
|
if (not defined $n) {
|
|
|
|
|
fileerror('processing file - error reading file ' . $filename . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
close(INPUTFILE);
|
|
|
|
|
last;
|
|
|
|
|
} else {
|
|
|
|
|
if ($n == 0 && defined $buffer) {
|
|
|
|
|
push(@lines,$buffer);
|
|
|
|
|
}
|
|
|
|
|
my @rowblock = ();
|
|
|
|
|
foreach my $line (@lines) {
|
|
|
|
|
$context->{linesread} += 1;
|
|
|
|
|
my $row = &$extractfields_code($context,(ref $line ? $line : \$line));
|
|
|
|
|
push(@rowblock,$row) if defined $row;
|
|
|
|
|
}
|
|
|
|
|
my $realblocksize = scalar @rowblock;
|
|
|
|
|
if ($realblocksize > 0) {
|
|
|
|
|
processing_lines($tid,$i,$realblocksize,undef,getlogger(__PACKAGE__));
|
|
|
|
|
#processing_rows($tid,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__));
|
|
|
|
|
|
|
|
|
|
$rowblock_result = &$process_code($context,\@rowblock,$i);
|
|
|
|
|
|
|
|
|
|
$i += $realblocksize;
|
|
|
|
|
if ($n == 0 || not $rowblock_result) {
|
|
|
|
|
last;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
last;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
last;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
close(INPUTFILE);
|
|
|
|
|
close(INPUTFILE);
|
|
|
|
|
|
|
|
|
|
fileprocessingdone($filename,getlogger(__PACKAGE__));
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if ($@) {
|
|
|
|
|
$errorstate = $ERROR;
|
|
|
|
|
} else {
|
|
|
|
|
$errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
eval {
|
|
|
|
|
if (defined $uninit_process_context_code and 'CODE' eq ref $uninit_process_context_code) {
|
|
|
|
|
&$uninit_process_context_code($context);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
unless ($errorstate == $COMPLETED) {
|
|
|
|
|
fileprocessingfailed($filename,getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if ($errorstate == $COMPLETED) {
|
|
|
|
|
fileprocessingdone($file,getlogger(__PACKAGE__));
|
|
|
|
|
return 1;
|
|
|
|
|
} else {
|
|
|
|
|
fileprocessingfailed($file,getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
@ -300,6 +320,7 @@ sub _reader {
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread tid ' . $tid . ' started',getlogger(__PACKAGE__));
|
|
|
|
|
|
|
|
|
|
my $blockcount = 0;
|
|
|
|
|
my $filename;
|
|
|
|
|
eval {
|
|
|
|
|
|
|
|
|
|
my $init_reader_context_code = $context->{instance}->can('init_reader_context');
|
|
|
|
|
@ -320,97 +341,115 @@ sub _reader {
|
|
|
|
|
notimplementederror((ref $context->{instance}) . ': ' . 'extractfields class method not implemented',getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
local *INPUTFILE_READER;
|
|
|
|
|
if (not open (INPUTFILE_READER, '<:encoding(' . $context->{instance}->{encoding} . ')', $context->{filename})) {
|
|
|
|
|
fileerror('processing file - cannot open file ' . $context->{filename} . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
binmode INPUTFILE_READER;
|
|
|
|
|
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__));
|
|
|
|
|
while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up
|
|
|
|
|
#yield();
|
|
|
|
|
sleep($thread_sleep_secs);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
my $buffer = undef;
|
|
|
|
|
my $chunk = undef;
|
|
|
|
|
my $n = 0;
|
|
|
|
|
$context->{charsread} = 0;
|
|
|
|
|
$context->{linesread} = 0;
|
|
|
|
|
|
|
|
|
|
my $i = 0;
|
|
|
|
|
my $state = $RUNNING; #start at first
|
|
|
|
|
while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer
|
|
|
|
|
#fetching_lines($context->{filename},$i,$context->{instance}->{blocksize},undef,getlogger(__PACKAGE__));
|
|
|
|
|
my $block_n = 0;
|
|
|
|
|
my @lines = ();
|
|
|
|
|
while ((scalar @lines) < $context->{instance}->{blocksize} and defined ($n = read(INPUTFILE_READER,$chunk,$context->{instance}->{buffersize})) and $n != 0) {
|
|
|
|
|
if (defined $buffer) {
|
|
|
|
|
$buffer .= $chunk;
|
|
|
|
|
$context->{files_code}->(sub {
|
|
|
|
|
my $filename = shift;
|
|
|
|
|
if (-s $filename > 0) {
|
|
|
|
|
fileprocessingstarted($filename,getlogger(__PACKAGE__));
|
|
|
|
|
} else {
|
|
|
|
|
if ($context->{single_file}) {
|
|
|
|
|
processzerofilesize($filename,getlogger(__PACKAGE__));
|
|
|
|
|
} else {
|
|
|
|
|
$buffer = $chunk;
|
|
|
|
|
fileprocessingwarn($filename,basename($filename) . ' ' . (-e $filename ? 'has 0 bytes' : 'not found'),getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
$context->{charsread} += 1;
|
|
|
|
|
$block_n += $n;
|
|
|
|
|
last unless &$extractlines_code($context,\$buffer,\@lines);
|
|
|
|
|
yield();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
lines_read($context->{filename},$i,$context->{instance}->{blocksize},$block_n,getlogger(__PACKAGE__));
|
|
|
|
|
if (not defined $n) {
|
|
|
|
|
fileerror('processing file - error reading file ' . $context->{filename} . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
close(INPUTFILE_READER);
|
|
|
|
|
last;
|
|
|
|
|
} else {
|
|
|
|
|
if ($n == 0 && defined $buffer) {
|
|
|
|
|
push(@lines,$buffer);
|
|
|
|
|
}
|
|
|
|
|
my @rowblock :shared = ();
|
|
|
|
|
foreach my $line (@lines) {
|
|
|
|
|
$context->{linesread} += 1;
|
|
|
|
|
my $row = &$extractfields_code($context,(ref $line ? $line : \$line));
|
|
|
|
|
push(@rowblock,shared_clone($row)) if defined $row;
|
|
|
|
|
|
|
|
|
|
local *INPUTFILE_READER;
|
|
|
|
|
if (not open (INPUTFILE_READER, '<:encoding(' . $context->{instance}->{encoding} . ')', $filename)) {
|
|
|
|
|
fileerror('processing file - cannot open file ' . $filename . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
binmode INPUTFILE_READER;
|
|
|
|
|
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__));
|
|
|
|
|
while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up
|
|
|
|
|
#yield();
|
|
|
|
|
sleep($thread_sleep_secs);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
my $buffer = undef;
|
|
|
|
|
my $chunk = undef;
|
|
|
|
|
my $n = 0;
|
|
|
|
|
$context->{charsread} = 0;
|
|
|
|
|
$context->{linesread} = 0;
|
|
|
|
|
|
|
|
|
|
my $i = 0;
|
|
|
|
|
my $state = $RUNNING; #start at first
|
|
|
|
|
while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer
|
|
|
|
|
#fetching_lines($context->{filename},$i,$context->{instance}->{blocksize},undef,getlogger(__PACKAGE__));
|
|
|
|
|
my $block_n = 0;
|
|
|
|
|
my @lines = ();
|
|
|
|
|
while ((scalar @lines) < $context->{instance}->{blocksize} and defined ($n = read(INPUTFILE_READER,$chunk,$context->{instance}->{buffersize})) and $n != 0) {
|
|
|
|
|
if (defined $buffer) {
|
|
|
|
|
$buffer .= $chunk;
|
|
|
|
|
} else {
|
|
|
|
|
$buffer = $chunk;
|
|
|
|
|
}
|
|
|
|
|
$context->{charsread} += 1;
|
|
|
|
|
$block_n += $n;
|
|
|
|
|
last unless &$extractlines_code($context,\$buffer,\@lines);
|
|
|
|
|
yield();
|
|
|
|
|
}
|
|
|
|
|
my $realblocksize = scalar @rowblock;
|
|
|
|
|
my %packet :shared = ();
|
|
|
|
|
$packet{rows} = \@rowblock;
|
|
|
|
|
$packet{size} = $realblocksize;
|
|
|
|
|
$packet{row_offset} = $i;
|
|
|
|
|
$packet{block_n} = $block_n;
|
|
|
|
|
if ($realblocksize > 0) {
|
|
|
|
|
$context->{queue}->enqueue(\%packet); #$packet);
|
|
|
|
|
$blockcount++;
|
|
|
|
|
#wait if thequeue is full and there there is one running consumer
|
|
|
|
|
while (((($state = _get_other_threads_state($context->{errorstates},$tid)) & $RUNNING) == $RUNNING) and $context->{queue}->pending() >= $context->{instance}->{threadqueuelength}) {
|
|
|
|
|
#yield();
|
|
|
|
|
sleep($thread_sleep_secs);
|
|
|
|
|
lines_read($filename,$i,$context->{instance}->{blocksize},$block_n,getlogger(__PACKAGE__));
|
|
|
|
|
if (not defined $n) {
|
|
|
|
|
fileerror('processing file - error reading file ' . $filename . ': ' . $!,getlogger(__PACKAGE__));
|
|
|
|
|
close(INPUTFILE_READER);
|
|
|
|
|
last;
|
|
|
|
|
} else {
|
|
|
|
|
if ($n == 0 && defined $buffer) {
|
|
|
|
|
push(@lines,$buffer);
|
|
|
|
|
}
|
|
|
|
|
my @rowblock :shared = ();
|
|
|
|
|
foreach my $line (@lines) {
|
|
|
|
|
$context->{linesread} += 1;
|
|
|
|
|
my $row = &$extractfields_code($context,(ref $line ? $line : \$line));
|
|
|
|
|
push(@rowblock,shared_clone($row)) if defined $row;
|
|
|
|
|
yield();
|
|
|
|
|
}
|
|
|
|
|
$i += $realblocksize;
|
|
|
|
|
if ($n == 0) {
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__));
|
|
|
|
|
my $realblocksize = scalar @rowblock;
|
|
|
|
|
my %packet :shared = ();
|
|
|
|
|
$packet{rows} = \@rowblock;
|
|
|
|
|
$packet{size} = $realblocksize;
|
|
|
|
|
$packet{row_offset} = $i;
|
|
|
|
|
$packet{block_n} = $block_n;
|
|
|
|
|
if ($realblocksize > 0) {
|
|
|
|
|
$context->{queue}->enqueue(\%packet); #$packet);
|
|
|
|
|
$blockcount++;
|
|
|
|
|
#wait if thequeue is full and there there is one running consumer
|
|
|
|
|
while (((($state = _get_other_threads_state($context->{errorstates},$tid)) & $RUNNING) == $RUNNING) and $context->{queue}->pending() >= $context->{instance}->{threadqueuelength}) {
|
|
|
|
|
#yield();
|
|
|
|
|
sleep($thread_sleep_secs);
|
|
|
|
|
}
|
|
|
|
|
$i += $realblocksize;
|
|
|
|
|
if ($n == 0) {
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__));
|
|
|
|
|
last;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
$context->{queue}->enqueue(\%packet); #$packet);
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__));
|
|
|
|
|
last;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
$context->{queue}->enqueue(\%packet); #$packet);
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__));
|
|
|
|
|
last;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (not (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0)) {
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread is shutting down (' .
|
|
|
|
|
(($state & $RUNNING) == $RUNNING ? 'still running consumer threads' : 'no running consumer threads') . ', ' .
|
|
|
|
|
(($state & $ERROR) == 0 ? 'no defunct thread(s)' : 'defunct thread(s)') . ') ...'
|
|
|
|
|
,getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
close(INPUTFILE_READER);
|
|
|
|
|
if (not (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0)) {
|
|
|
|
|
filethreadingdebug('[' . $tid . '] reader thread is shutting down (' .
|
|
|
|
|
(($state & $RUNNING) == $RUNNING ? 'still running consumer threads' : 'no running consumer threads') . ', ' .
|
|
|
|
|
(($state & $ERROR) == 0 ? 'no defunct thread(s)' : 'defunct thread(s)') . ') ...'
|
|
|
|
|
,getlogger(__PACKAGE__));
|
|
|
|
|
}
|
|
|
|
|
close(INPUTFILE_READER);
|
|
|
|
|
|
|
|
|
|
fileprocessingdone($filename,getlogger(__PACKAGE__));
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
filethreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__));
|
|
|
|
|
lock $context->{errorstates};
|
|
|
|
|
if ($@) {
|
|
|
|
|
$context->{errorstates}->{$tid} = $ERROR;
|
|
|
|
|
fileprocessingfailed($filename,getlogger(__PACKAGE__));
|
|
|
|
|
} else {
|
|
|
|
|
$context->{errorstates}->{$tid} = $COMPLETED;
|
|
|
|
|
}
|
|
|
|
|
|