diff --git a/lib/NGCP/BulkProcessor/FileProcessor.pm b/lib/NGCP/BulkProcessor/FileProcessor.pm index 36a79470..7fdac713 100755 --- a/lib/NGCP/BulkProcessor/FileProcessor.pm +++ b/lib/NGCP/BulkProcessor/FileProcessor.pm @@ -262,7 +262,7 @@ sub process { if ($@) { $errorstate = $ERROR; } else { - $errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED; + $errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED; } eval { @@ -472,7 +472,7 @@ sub _process { if ($err) { $context->{errorstates}->{$tid} = $ERROR; } else { - $context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED; + $context->{errorstates}->{$tid} = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED; } return $context->{errorstates}->{$tid}; } diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm index 987c0267..9e734e1d 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm @@ -69,6 +69,7 @@ our @EXPORT_OK = qw( ); my $file_sequence_number : shared = 0; +my $rowcount : shared = 0; sub reset_export_status { @@ -152,8 +153,17 @@ sub export_cdrs { } } foreach my $record (@$records) { - return 0 if (defined $export_cdr_limit and $context->{rownum} >= $export_cdr_limit); - return 0 if $context->{file_sequence_number} > $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn; + if (defined $export_cdr_limit) { + lock $rowcount; + if ($rowcount >= $export_cdr_limit) { + _info($context,"exceeding export limit $export_cdr_limit"); + return 0; + } + } + if ($context->{file_sequence_number} > $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) { + _info($context,"exceeding file sequence number " . $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn); + return 0; + } my ($id,$call_id) = @$record; next unless _export_cdrs_init_context($context,$id,$call_id); eval { @@ -184,7 +194,7 @@ sub export_cdrs { $context->{ama_files} = []; $context->{has_next} = 1; - $context->{rownum} = 0; + #$context->{rownum} = 0; _increment_file_sequence_number($context); }, @@ -224,7 +234,7 @@ sub export_cdrs { joins => $export_cdr_joins, conditions => $export_cdr_conditions, #sort => [{ column => 'id', numeric => 1, dir => 1 }], - limit => $export_cdr_limit, + #limit => $export_cdr_limit, ); eval { @@ -263,7 +273,8 @@ sub _export_cdrs_init_context { if ((scalar @{$context->{cdrs}}) == $cdrs_in_block) { foreach my $cdr (@{$context->{cdrs}}) { $context->{file_cdr_id_map}->{$cdr->{id}} = $cdr; #->{start_time}; - $context->{rownum} += 1; + lock $rowcount; + $rowcount += 1; } $result = 1; } @@ -289,7 +300,8 @@ sub _commit_export_status { ) = @params{qw/ context /}; - _info($context,"file " . $context->{file}->get_filename() . " (" . kbytes2gigs(int((-s $context->{file}->get_filename()) / 1024)) . ") written (" . $context->{file}->get_record_count() . " records in " . $context->{file}->get_block_count() . " blocks)"); + my $result = 1; + _info($context,"file " . $context->{file}->get_filename(1) . " (" . kbytes2gigs(int($context->{file}->get_filesize() / 1024)) . ") - " . $context->{file}->get_record_count() . " records in " . $context->{file}->get_block_count() . " blocks"); eval { ping_dbs(); $context->{db}->db_begin(); @@ -309,7 +321,6 @@ sub _commit_export_status { ); #set mark... _info($context,"file sequence number $context->{file_sequence_number} saved"); $context->{db}->db_commit(); - }; $context->{file_cdr_id_map} = {}; my $err = $@; @@ -317,14 +328,16 @@ sub _commit_export_status { eval { $context->{db}->db_rollback(1); }; - eval { - unlink $context->{file}->get_filename(); - }; + #eval { + # unlink $context->{file}->get_filename(); + #}; die($err); + $result = 0; } else { push(@{$context->{ama_files}},$context->{file}->get_filename()); _increment_file_sequence_number($context) if $context->{has_next}; } + return $result; } @@ -382,6 +395,7 @@ sub _get_record { padding => 0, recording_office_id => '438716', #008708 + call_type => '970', #call code 970c #timing ind 000 #seervice observed 0c diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm index 43d2ad85..3c733848 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm @@ -6,6 +6,8 @@ use strict; use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw( $output_path $ama_filename_format + $export_cdr_use_temp_files + $tempfile_path ); use NGCP::BulkProcessor::Projects::Export::Ama::Format::Block qw(); @@ -19,6 +21,8 @@ use NGCP::BulkProcessor::LogError qw( fileerror ); +use NGCP::BulkProcessor::Utils qw(tempfilename); + require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( @@ -26,6 +30,7 @@ our @EXPORT_OK = qw( ); my $max_blocks = 99; +my $ama_file_extension = '.ama'; sub new { @@ -44,6 +49,7 @@ sub reset { $self->{record_count} = 0; $self->_save_transfer_in(undef); $self->_save_transfer_out(undef); + $self->{tempfilename} = tempfilename('XXXX',$tempfile_path,$ama_file_extension) if $export_cdr_use_temp_files; return; } @@ -87,12 +93,26 @@ sub add_record { sub get_filename { my $self = shift; + my ($show_tempfilename) = @_; + return $self->{tempfilename} if ($export_cdr_use_temp_files and $show_tempfilename); return sprintf($ama_filename_format, $output_path, $self->{transfer_in}->get_structure()->get_file_sequence_number_field()->{file_sequence_number}, + $ama_file_extension, ); } +sub get_filesize { + my $self = shift; + return -s ($export_cdr_use_temp_files ? $self->{tempfilename} : $self->get_filename()); +} + +sub _rename { + my $self = shift; + my ($filename) = @_; + return rename($self->{tempfilename},$filename); +} + sub flush { my $self = shift; my %params = @_; @@ -102,8 +122,8 @@ sub flush { commit_cb /}; #unlink 'test.ama'; - if ((scalar @{$self->{blocks}}) > 0 and (my $filename = $self->get_filename())) { - if (-e $filename) { + if ((scalar @{$self->{blocks}}) > 0 and (my $filename = ($export_cdr_use_temp_files ? $self->{tempfilename} : $self->get_filename()))) { + if (not $export_cdr_use_temp_files and -e $filename) { fileerror($filename . ' already exists',getlogger(__PACKAGE__)); return 0; } else { @@ -112,9 +132,20 @@ sub flush { print $fh pack('H*',$block->get_hex()); } close $fh; - &$commit_cb(@_) if defined $commit_cb; + if (defined $commit_cb) { + if (&$commit_cb(@_) and (not $export_cdr_use_temp_files or $self->_rename($self->get_filename()))) { + return 1; + } else { + eval { + unlink $filename unless $export_cdr_use_temp_files; + unlink $self->{tempfilename} if $export_cdr_use_temp_files; + }; + return 0; + } + } else { + return 1; + } #restdebug($self,"$self->{crt_path} saved",getlogger(__PACKAGE__)); - return 1; } else { fileerror('failed to open ' . $filename . ": $!",getlogger(__PACKAGE__)); return 0; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm index a3424e84..e122ec77 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm @@ -37,6 +37,7 @@ our @EXPORT_OK = qw( $input_path $output_path + $tempfile_path $defaultsettings $defaultconfig @@ -53,6 +54,7 @@ our @EXPORT_OK = qw( $export_cdr_limit $export_cdr_stream $export_cdr_rollover_fsn + $export_cdr_use_temp_files $domestic_destination_pattern $international_destination_pattern @@ -70,6 +72,7 @@ our $defaultsettings = 'settings.cfg'; our $input_path = $working_path . 'input/'; our $output_path = $working_path . 'output/'; +our $tempfile_path = $working_path . 'temp/'; our $force = 0; #our $dry = 0; @@ -83,11 +86,12 @@ our $export_cdr_conditions = []; our $export_cdr_limit = undef; our $export_cdr_stream = undef; our $export_cdr_rollover_fsn = 0; +our $export_cdr_use_temp_files = 0; our $domestic_destination_pattern = undef; our $international_destination_pattern = undef; -our $ama_filename_format = '%1$s%2$s.ama'; +our $ama_filename_format = '%1$s%2$s%3$s'; sub update_settings { @@ -126,6 +130,7 @@ sub update_settings { $export_cdr_limit = $data->{export_cdr_limit} if exists $data->{export_cdr_limit}; $export_cdr_stream = $data->{export_cdr_stream} if exists $data->{export_cdr_stream}; $export_cdr_rollover_fsn = stringtobool($data->{export_cdr_rollover_fsn}) if exists $data->{export_cdr_rollover_fsn}; + $export_cdr_use_temp_files = stringtobool($data->{export_cdr_use_temp_files}) if exists $data->{export_cdr_use_temp_files}; #if ((confval("MAINTENANCE") // 'no') eq 'yes') { # exit(0); @@ -157,6 +162,8 @@ sub _prepare_working_paths { $result &= $path_result; ($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); $result &= $path_result; + ($path_result,$tempfile_path) = create_path($working_path . 'temp',$output_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; return $result; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.cfg index c255d385..dbf40bd7 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.cfg @@ -1,7 +1,7 @@ ##general settings: working_path = /var/sipwise #cpucount = 4 -enablemultithreading = 1 +enablemultithreading = 0 rowblock_transactional = 0 diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.debug.cfg index b245a72f..a69f988c 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.debug.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/config.debug.cfg @@ -1,7 +1,7 @@ ##general settings: working_path = /home/rkrenn/temp/export #cpucount = 4 -enablemultithreading = 1 +enablemultithreading = 0 rowblock_transactional = 0 diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl b/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl index 8dcce9e1..d5c8c611 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl @@ -13,6 +13,7 @@ use NGCP::BulkProcessor::Globals qw(); use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw( update_settings $output_path + $tempfile_path $defaultsettings $defaultconfig $skip_errors @@ -200,6 +201,7 @@ sub cleanup_task { cleanupmsgfiles(\&fileerror,\&filewarn); cleanupcertfiles(); cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + cleanupdir($tempfile_path,1,\&filewarn,getlogger(getscriptpath())); $result = 1; }; } diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg index 186a977f..acffb45b 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg @@ -4,9 +4,11 @@ export_cdr_multithreading = 1 export_cdr_numofthreads = 2 export_cdr_blocksize = 1000 export_cdr_joins = { 'billing.voip_subscribers source_voip_subscribers' => { 'source_voip_subscribers.uuid' => 'accounting.cdr.source_user_id' } }, { 'billing.voip_subscribers destination_voip_subscribers' => { 'destination_voip_subscribers.uuid' => 'accounting.cdr.destination_user_id' } }, { 'billing.billing_zones_history source_carrier_bbz' => { 'source_carrier_bbz.id' => 'accounting.cdr.source_carrier_billing_zone_id' } }, { 'billing.billing_zones_history source_reseller_bbz' => { 'source_reseller_bbz.id' => 'accounting.cdr.source_reseller_billing_zone_id' } }, { 'billing.billing_zones_history source_customer_bbz' => { 'source_customer_bbz.id' => 'accounting.cdr.source_customer_billing_zone_id' } }, { 'billing.billing_zones_history destination_carrier_bbz' => { 'destination_carrier_bbz.id' => 'accounting.cdr.destination_carrier_billing_zone_id' } }, { 'billing.billing_zones_history destination_reseller_bbz' => { 'destination_reseller_bbz.id' => 'accounting.cdr.destination_reseller_billing_zone_id' } }, { 'billing.billing_zones_history destination_customer_bbz' => { 'destination_customer_bbz.id' => 'accounting.cdr.destination_customer_billing_zone_id' } } -export_cdr_conditions = { 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } -export_cdr_limit = 300000 +export_cdr_conditions = +#{ 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } +export_cdr_limit = 10000 export_cdr_stream = ama_simple export_cdr_rollover_fsn = 1 +export_cdr_use_temp_files = 1 -ama_filename_format = %1$s%2$03d.debug.ama +ama_filename_format = %1$s%2$03d.debug%3$s diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg index e815f821..37473831 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg @@ -9,8 +9,8 @@ export_cdr_blocksize = 1000 export_cdr_conditions = { 'accounting.cdr.call_status' => { '=' => '"ok"' } } export_cdr_stream = ama_simple #default -#export_cdr_limit = 3500 -#300000 +export_cdr_limit = 10000 export_cdr_rollover_fsn = 1 +export_cdr_use_temp_files = 1 -ama_filename_format = %1$s%2$03d.debug.ama +ama_filename_format = %1$s%2$03d.debug%3$s diff --git a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg index 78ad7fae..7de00fd2 100755 --- a/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Massive/Generator/settings.cfg @@ -14,4 +14,5 @@ providers_yml = providers.yml generate_cdr_multithreading = 1 #generate_cdr_numofthreads = 2 -generate_cdr_count = 2000000 +generate_cdr_count = 200000 +#2000000 diff --git a/lib/NGCP/BulkProcessor/RestProcessor.pm b/lib/NGCP/BulkProcessor/RestProcessor.pm index 8bb60e97..08cbb4a5 100755 --- a/lib/NGCP/BulkProcessor/RestProcessor.pm +++ b/lib/NGCP/BulkProcessor/RestProcessor.pm @@ -252,7 +252,7 @@ sub process_collection { if ($@) { $errorstate = $ERROR; } else { - $errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED; + $errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED; } eval { @@ -406,7 +406,7 @@ sub _process { if ($err) { $context->{errorstates}->{$tid} = $ERROR; } else { - $context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED; + $context->{errorstates}->{$tid} = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED; } return $context->{errorstates}->{$tid}; } diff --git a/lib/NGCP/BulkProcessor/SqlProcessor.pm b/lib/NGCP/BulkProcessor/SqlProcessor.pm index 27e75ec4..aa263d3d 100755 --- a/lib/NGCP/BulkProcessor/SqlProcessor.pm +++ b/lib/NGCP/BulkProcessor/SqlProcessor.pm @@ -1243,7 +1243,7 @@ sub process_table { if ($@) { $errorstate = $ERROR; } else { - $errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED; + $errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED; } eval { @@ -1569,7 +1569,7 @@ sub _process { if ($err) { $context->{errorstates}->{$tid} = $ERROR; } else { - $context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED; + $context->{errorstates}->{$tid} = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED; } return $context->{errorstates}->{$tid}; }