From 3b40eb00ee3b66ffbe50d7cee7dd458520fbe1d3 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Thu, 3 Feb 2022 13:50:24 +0100 Subject: [PATCH] TT#161550 file processors: support processing directory structures Change-Id: Ica509e95596d3b5f13486087ba47c190bab2e4db (cherry picked from commit 721013510951e3c0898fd4aac4083b2a807a49a9) --- .../BulkProcessor/Dao/Trunk/accounting/cdr.pm | 41 +++ lib/NGCP/BulkProcessor/FileProcessor.pm | 339 ++++++++++-------- 2 files changed, 230 insertions(+), 150 deletions(-) diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index e9ee43a..5eb86e1 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -50,6 +50,8 @@ our @EXPORT_OK = qw( $CFU_CALL_TYPE $CFB_CALL_TYPE + findby_id + get_cdrid_range ); #process_records #delete_ids @@ -177,6 +179,39 @@ sub new { } +sub findby_id { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive)->[0]; + +} + +sub get_cdrid_range { + + my ($id,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT min(id),max(id) FROM ' . $table . ' WHERE ' . + $db->columnidentifier('id') . ' = ?'; + my @params = ($id); + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return $rows; + +} sub delete_callids { @@ -544,6 +579,12 @@ sub gettablename { } +sub settablename { + + $tablename = shift; + +} + sub check_table { return checktableinfo($get_db, diff --git a/lib/NGCP/BulkProcessor/FileProcessor.pm b/lib/NGCP/BulkProcessor/FileProcessor.pm index ee98abf..9bdb4d1 100644 --- a/lib/NGCP/BulkProcessor/FileProcessor.pm +++ b/lib/NGCP/BulkProcessor/FileProcessor.pm @@ -8,6 +8,7 @@ use threads::shared qw(shared_clone); use Thread::Queue; use Time::HiRes qw(sleep); +use File::Basename qw(basename); use NGCP::BulkProcessor::Globals qw( $enablemultithreading @@ -27,6 +28,7 @@ use NGCP::BulkProcessor::LogError qw( fileprocessingfailed fileerror notimplementederror + fileprocessingwarn ); use NGCP::BulkProcessor::Utils qw(threadid); @@ -104,16 +106,16 @@ sub process { multithreading /}; #my ($file,$process_code,$init_process_context_code,$uninit_process_context_code,$multithreading) = @_; + my $files_code = $file; + my $single_file = 1; + $single_file = 0 if ('CODE' eq ref $file); + $files_code = sub { + my $cb = shift; + $cb->($file); + } if $single_file; if (ref $process_code eq 'CODE') { - if (-s $file > 0) { - fileprocessingstarted($file,getlogger(__PACKAGE__)); - } else { - processzerofilesize($file,getlogger(__PACKAGE__)); - return; - } - my $errorstate = $RUNNING; my $tid = threadid(); @@ -130,21 +132,21 @@ sub process { { queue => $queue, errorstates => \%errorstates, instance => $self, - filename => $file, + files_code => $files_code, + single_file => $single_file, }); for (my $i = 0; $i < $self->{numofthreads}; $i++) { filethreadingdebug('starting processor thread ' . ($i + 1) . ' of ' . $self->{numofthreads},getlogger(__PACKAGE__)); my $processor = threads->create(\&_process, create_process_context($static_context, - { queue => $queue, - errorstates => \%errorstates, - readertid => $reader->tid(), - filename => $file, - process_code => $process_code, - init_process_context_code => $init_process_context_code, + { queue => $queue, + errorstates => \%errorstates, + readertid => $reader->tid(), + process_code => $process_code, + init_process_context_code => $init_process_context_code, uninit_process_context_code => $uninit_process_context_code, - instance => $self, + instance => $self, })); if (!defined $processor) { filethreadingdebug('processor thread ' . ($i + 1) . ' of ' . $self->{numofthreads} . ' NOT started',getlogger(__PACKAGE__)); @@ -170,10 +172,10 @@ sub process { } else { my $context = create_process_context($static_context,{ instance => $self, - filename => $file, tid => $tid, }); my $rowblock_result = 1; + my $filename; eval { my $init_reader_context_code = $self->can('init_reader_context'); @@ -195,89 +197,107 @@ sub process { if (!defined $extractfields_code) { notimplementederror((ref $self) . ': ' . 'extractfields class method not implemented',getlogger(__PACKAGE__)); } - - local *INPUTFILE; - if (not open (INPUTFILE, '<:encoding(' . $self->{encoding} . ')', $file)) { - fileerror('processing file - cannot open file ' . $file . ': ' . $!,getlogger(__PACKAGE__)); - return; - } - binmode INPUTFILE; - - my $buffer = undef; - my $chunk = undef; - my $n = 0; - $context->{charsread} = 0; - $context->{linesread} = 0; - - my $i = 0; - while (1) { - #fetching_lines($file,$i,$self->{blocksize},undef,getlogger(__PACKAGE__)); - my $block_n = 0; - my @lines = (); - while ((scalar @lines) < $self->{blocksize} and defined ($n = read(INPUTFILE,$chunk,$self->{buffersize})) and $n != 0) { - if (defined $buffer) { - $buffer .= $chunk; + + $files_code->(sub { + my $filename = shift; + if (-s $filename > 0) { + fileprocessingstarted($filename,getlogger(__PACKAGE__)); + } else { + if ($single_file) { + processzerofilesize($filename,getlogger(__PACKAGE__)); } else { - $buffer = $chunk; + fileprocessingwarn($filename,basename($filename) . ' ' . (-e $filename ? 'has 0 bytes' : 'not found'),getlogger(__PACKAGE__)); } - $context->{charsread} += $n; - $block_n += $n; - last unless &$extractlines_code($context,\$buffer,\@lines); + return; } - lines_read($file,$i,$self->{blocksize},$block_n,getlogger(__PACKAGE__)); - if (not defined $n) { - fileerror('processing file - error reading file ' . $file . ': ' . $!,getlogger(__PACKAGE__)); - close(INPUTFILE); - last; - } else { - if ($n == 0 && defined $buffer) { - push(@lines,$buffer); - } - my @rowblock = (); - foreach my $line (@lines) { - $context->{linesread} += 1; - my $row = &$extractfields_code($context,(ref $line ? $line : \$line)); - push(@rowblock,$row) if defined $row; + local *INPUTFILE; + if (not open (INPUTFILE, '<:encoding(' . $self->{encoding} . ')', $filename)) { + fileerror('processing file - cannot open file ' . $filename . ': ' . $!,getlogger(__PACKAGE__)); + return; + } + binmode INPUTFILE; + + my $buffer = undef; + my $chunk = undef; + my $n = 0; + $context->{charsread} = 0; + $context->{linesread} = 0; + + my $i = 0; + while (1) { + #fetching_lines($file,$i,$self->{blocksize},undef,getlogger(__PACKAGE__)); + my $block_n = 0; + my @lines = (); + while ((scalar @lines) < $self->{blocksize} and defined ($n = read(INPUTFILE,$chunk,$self->{buffersize})) and $n != 0) { + if (defined $buffer) { + $buffer .= $chunk; + } else { + $buffer = $chunk; + } + $context->{charsread} += $n; + $block_n += $n; + last unless &$extractlines_code($context,\$buffer,\@lines); } - my $realblocksize = scalar @rowblock; - if ($realblocksize > 0) { - processing_lines($tid,$i,$realblocksize,undef,getlogger(__PACKAGE__)); - #processing_rows($tid,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__)); - - $rowblock_result = &$process_code($context,\@rowblock,$i); - - $i += $realblocksize; - if ($n == 0 || not $rowblock_result) { + lines_read($filename,$i,$self->{blocksize},$block_n,getlogger(__PACKAGE__)); + + if (not defined $n) { + fileerror('processing file - error reading file ' . $filename . ': ' . $!,getlogger(__PACKAGE__)); + close(INPUTFILE); + last; + } else { + if ($n == 0 && defined $buffer) { + push(@lines,$buffer); + } + my @rowblock = (); + foreach my $line (@lines) { + $context->{linesread} += 1; + my $row = &$extractfields_code($context,(ref $line ? $line : \$line)); + push(@rowblock,$row) if defined $row; + } + my $realblocksize = scalar @rowblock; + if ($realblocksize > 0) { + processing_lines($tid,$i,$realblocksize,undef,getlogger(__PACKAGE__)); + #processing_rows($tid,$i,$realblocksize,$rowcount,getlogger(__PACKAGE__)); + + $rowblock_result = &$process_code($context,\@rowblock,$i); + + $i += $realblocksize; + if ($n == 0 || not $rowblock_result) { + last; + } + } else { last; } - } else { - last; } } - } - close(INPUTFILE); + close(INPUTFILE); + + fileprocessingdone($filename,getlogger(__PACKAGE__)); + + }); }; - + if ($@) { $errorstate = $ERROR; } else { $errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED; } - + eval { if (defined $uninit_process_context_code and 'CODE' eq ref $uninit_process_context_code) { &$uninit_process_context_code($context); } }; + + unless ($errorstate == $COMPLETED) { + fileprocessingfailed($filename,getlogger(__PACKAGE__)); + } } - + if ($errorstate == $COMPLETED) { - fileprocessingdone($file,getlogger(__PACKAGE__)); return 1; - } else { - fileprocessingfailed($file,getlogger(__PACKAGE__)); } } @@ -300,6 +320,7 @@ sub _reader { filethreadingdebug('[' . $tid . '] reader thread tid ' . $tid . ' started',getlogger(__PACKAGE__)); my $blockcount = 0; + my $filename; eval { my $init_reader_context_code = $context->{instance}->can('init_reader_context'); @@ -320,97 +341,115 @@ sub _reader { notimplementederror((ref $context->{instance}) . ': ' . 'extractfields class method not implemented',getlogger(__PACKAGE__)); } - local *INPUTFILE_READER; - if (not open (INPUTFILE_READER, '<:encoding(' . $context->{instance}->{encoding} . ')', $context->{filename})) { - fileerror('processing file - cannot open file ' . $context->{filename} . ': ' . $!,getlogger(__PACKAGE__)); - return; - } - binmode INPUTFILE_READER; - - filethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__)); - while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up - #yield(); - sleep($thread_sleep_secs); - } - - my $buffer = undef; - my $chunk = undef; - my $n = 0; - $context->{charsread} = 0; - $context->{linesread} = 0; - - my $i = 0; - my $state = $RUNNING; #start at first - while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer - #fetching_lines($context->{filename},$i,$context->{instance}->{blocksize},undef,getlogger(__PACKAGE__)); - my $block_n = 0; - my @lines = (); - while ((scalar @lines) < $context->{instance}->{blocksize} and defined ($n = read(INPUTFILE_READER,$chunk,$context->{instance}->{buffersize})) and $n != 0) { - if (defined $buffer) { - $buffer .= $chunk; + $context->{files_code}->(sub { + my $filename = shift; + if (-s $filename > 0) { + fileprocessingstarted($filename,getlogger(__PACKAGE__)); + } else { + if ($context->{single_file}) { + processzerofilesize($filename,getlogger(__PACKAGE__)); } else { - $buffer = $chunk; + fileprocessingwarn($filename,basename($filename) . ' ' . (-e $filename ? 'has 0 bytes' : 'not found'),getlogger(__PACKAGE__)); } - $context->{charsread} += 1; - $block_n += $n; - last unless &$extractlines_code($context,\$buffer,\@lines); - yield(); + return; } - lines_read($context->{filename},$i,$context->{instance}->{blocksize},$block_n,getlogger(__PACKAGE__)); - if (not defined $n) { - fileerror('processing file - error reading file ' . $context->{filename} . ': ' . $!,getlogger(__PACKAGE__)); - close(INPUTFILE_READER); - last; - } else { - if ($n == 0 && defined $buffer) { - push(@lines,$buffer); - } - my @rowblock :shared = (); - foreach my $line (@lines) { - $context->{linesread} += 1; - my $row = &$extractfields_code($context,(ref $line ? $line : \$line)); - push(@rowblock,shared_clone($row)) if defined $row; + + local *INPUTFILE_READER; + if (not open (INPUTFILE_READER, '<:encoding(' . $context->{instance}->{encoding} . ')', $filename)) { + fileerror('processing file - cannot open file ' . $filename . ': ' . $!,getlogger(__PACKAGE__)); + return; + } + binmode INPUTFILE_READER; + + filethreadingdebug('[' . $tid . '] reader thread waiting for consumer threads',getlogger(__PACKAGE__)); + while ((_get_other_threads_state($context->{errorstates},$tid) & $RUNNING) == 0) { #wait on cosumers to come up + #yield(); + sleep($thread_sleep_secs); + } + + my $buffer = undef; + my $chunk = undef; + my $n = 0; + $context->{charsread} = 0; + $context->{linesread} = 0; + + my $i = 0; + my $state = $RUNNING; #start at first + while (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0) { #as long there is one running consumer and no defunct consumer + #fetching_lines($context->{filename},$i,$context->{instance}->{blocksize},undef,getlogger(__PACKAGE__)); + my $block_n = 0; + my @lines = (); + while ((scalar @lines) < $context->{instance}->{blocksize} and defined ($n = read(INPUTFILE_READER,$chunk,$context->{instance}->{buffersize})) and $n != 0) { + if (defined $buffer) { + $buffer .= $chunk; + } else { + $buffer = $chunk; + } + $context->{charsread} += 1; + $block_n += $n; + last unless &$extractlines_code($context,\$buffer,\@lines); yield(); } - my $realblocksize = scalar @rowblock; - my %packet :shared = (); - $packet{rows} = \@rowblock; - $packet{size} = $realblocksize; - $packet{row_offset} = $i; - $packet{block_n} = $block_n; - if ($realblocksize > 0) { - $context->{queue}->enqueue(\%packet); #$packet); - $blockcount++; - #wait if thequeue is full and there there is one running consumer - while (((($state = _get_other_threads_state($context->{errorstates},$tid)) & $RUNNING) == $RUNNING) and $context->{queue}->pending() >= $context->{instance}->{threadqueuelength}) { - #yield(); - sleep($thread_sleep_secs); + lines_read($filename,$i,$context->{instance}->{blocksize},$block_n,getlogger(__PACKAGE__)); + if (not defined $n) { + fileerror('processing file - error reading file ' . $filename . ': ' . $!,getlogger(__PACKAGE__)); + close(INPUTFILE_READER); + last; + } else { + if ($n == 0 && defined $buffer) { + push(@lines,$buffer); + } + my @rowblock :shared = (); + foreach my $line (@lines) { + $context->{linesread} += 1; + my $row = &$extractfields_code($context,(ref $line ? $line : \$line)); + push(@rowblock,shared_clone($row)) if defined $row; + yield(); } - $i += $realblocksize; - if ($n == 0) { - filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__)); + my $realblocksize = scalar @rowblock; + my %packet :shared = (); + $packet{rows} = \@rowblock; + $packet{size} = $realblocksize; + $packet{row_offset} = $i; + $packet{block_n} = $block_n; + if ($realblocksize > 0) { + $context->{queue}->enqueue(\%packet); #$packet); + $blockcount++; + #wait if thequeue is full and there there is one running consumer + while (((($state = _get_other_threads_state($context->{errorstates},$tid)) & $RUNNING) == $RUNNING) and $context->{queue}->pending() >= $context->{instance}->{threadqueuelength}) { + #yield(); + sleep($thread_sleep_secs); + } + $i += $realblocksize; + if ($n == 0) { + filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data) ...',getlogger(__PACKAGE__)); + last; + } + } else { + $context->{queue}->enqueue(\%packet); #$packet); + filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__)); last; } - } else { - $context->{queue}->enqueue(\%packet); #$packet); - filethreadingdebug('[' . $tid . '] reader thread is shutting down (end of data - empty block) ...',getlogger(__PACKAGE__)); - last; } } - } - if (not (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0)) { - filethreadingdebug('[' . $tid . '] reader thread is shutting down (' . - (($state & $RUNNING) == $RUNNING ? 'still running consumer threads' : 'no running consumer threads') . ', ' . - (($state & $ERROR) == 0 ? 'no defunct thread(s)' : 'defunct thread(s)') . ') ...' - ,getlogger(__PACKAGE__)); - } - close(INPUTFILE_READER); + if (not (($state & $RUNNING) == $RUNNING and ($state & $ERROR) == 0)) { + filethreadingdebug('[' . $tid . '] reader thread is shutting down (' . + (($state & $RUNNING) == $RUNNING ? 'still running consumer threads' : 'no running consumer threads') . ', ' . + (($state & $ERROR) == 0 ? 'no defunct thread(s)' : 'defunct thread(s)') . ') ...' + ,getlogger(__PACKAGE__)); + } + close(INPUTFILE_READER); + + fileprocessingdone($filename,getlogger(__PACKAGE__)); + + }); }; filethreadingdebug($@ ? '[' . $tid . '] reader thread error: ' . $@ : '[' . $tid . '] reader thread finished (' . $blockcount . ' blocks)',getlogger(__PACKAGE__)); lock $context->{errorstates}; if ($@) { $context->{errorstates}->{$tid} = $ERROR; + fileprocessingfailed($filename,getlogger(__PACKAGE__)); } else { $context->{errorstates}->{$tid} = $COMPLETED; }