TT#50609 ama: write to tmpfiles+rename

Change-Id: I4c55503670d1817d57ec6070c9c2b91e72b661b9
(cherry picked from commit c09f10d8a4)
changes/98/26798/1
Rene Krenn 8 years ago
parent 8c34cf9dbf
commit 87a586f148

@ -262,7 +262,7 @@ sub process {
if ($@) {
$errorstate = $ERROR;
} else {
$errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED;
$errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
}
eval {
@ -472,7 +472,7 @@ sub _process {
if ($err) {
$context->{errorstates}->{$tid} = $ERROR;
} else {
$context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED;
$context->{errorstates}->{$tid} = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
}
return $context->{errorstates}->{$tid};
}

@ -69,6 +69,7 @@ our @EXPORT_OK = qw(
);
my $file_sequence_number : shared = 0;
my $rowcount : shared = 0;
sub reset_export_status {
@ -152,8 +153,17 @@ sub export_cdrs {
}
}
foreach my $record (@$records) {
return 0 if (defined $export_cdr_limit and $context->{rownum} >= $export_cdr_limit);
return 0 if $context->{file_sequence_number} > $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn;
if (defined $export_cdr_limit) {
lock $rowcount;
if ($rowcount >= $export_cdr_limit) {
_info($context,"exceeding export limit $export_cdr_limit");
return 0;
}
}
if ($context->{file_sequence_number} > $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) {
_info($context,"exceeding file sequence number " . $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn);
return 0;
}
my ($id,$call_id) = @$record;
next unless _export_cdrs_init_context($context,$id,$call_id);
eval {
@ -184,7 +194,7 @@ sub export_cdrs {
$context->{ama_files} = [];
$context->{has_next} = 1;
$context->{rownum} = 0;
#$context->{rownum} = 0;
_increment_file_sequence_number($context);
},
@ -224,7 +234,7 @@ sub export_cdrs {
joins => $export_cdr_joins,
conditions => $export_cdr_conditions,
#sort => [{ column => 'id', numeric => 1, dir => 1 }],
limit => $export_cdr_limit,
#limit => $export_cdr_limit,
);
eval {
@ -263,7 +273,8 @@ sub _export_cdrs_init_context {
if ((scalar @{$context->{cdrs}}) == $cdrs_in_block) {
foreach my $cdr (@{$context->{cdrs}}) {
$context->{file_cdr_id_map}->{$cdr->{id}} = $cdr; #->{start_time};
$context->{rownum} += 1;
lock $rowcount;
$rowcount += 1;
}
$result = 1;
}
@ -289,7 +300,8 @@ sub _commit_export_status {
) = @params{qw/
context
/};
_info($context,"file " . $context->{file}->get_filename() . " (" . kbytes2gigs(int((-s $context->{file}->get_filename()) / 1024)) . ") written (" . $context->{file}->get_record_count() . " records in " . $context->{file}->get_block_count() . " blocks)");
my $result = 1;
_info($context,"file " . $context->{file}->get_filename(1) . " (" . kbytes2gigs(int($context->{file}->get_filesize() / 1024)) . ") - " . $context->{file}->get_record_count() . " records in " . $context->{file}->get_block_count() . " blocks");
eval {
ping_dbs();
$context->{db}->db_begin();
@ -309,7 +321,6 @@ sub _commit_export_status {
); #set mark...
_info($context,"file sequence number $context->{file_sequence_number} saved");
$context->{db}->db_commit();
};
$context->{file_cdr_id_map} = {};
my $err = $@;
@ -317,14 +328,16 @@ sub _commit_export_status {
eval {
$context->{db}->db_rollback(1);
};
eval {
unlink $context->{file}->get_filename();
};
#eval {
# unlink $context->{file}->get_filename();
#};
die($err);
$result = 0;
} else {
push(@{$context->{ama_files}},$context->{file}->get_filename());
_increment_file_sequence_number($context) if $context->{has_next};
}
return $result;
}
@ -382,6 +395,7 @@ sub _get_record {
padding => 0,
recording_office_id => '438716', #008708
call_type => '970',
#call code 970c
#timing ind 000
#seervice observed 0c

@ -6,6 +6,8 @@ use strict;
use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw(
$output_path
$ama_filename_format
$export_cdr_use_temp_files
$tempfile_path
);
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Block qw();
@ -19,6 +21,8 @@ use NGCP::BulkProcessor::LogError qw(
fileerror
);
use NGCP::BulkProcessor::Utils qw(tempfilename);
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
@ -26,6 +30,7 @@ our @EXPORT_OK = qw(
);
my $max_blocks = 99;
my $ama_file_extension = '.ama';
sub new {
@ -44,6 +49,7 @@ sub reset {
$self->{record_count} = 0;
$self->_save_transfer_in(undef);
$self->_save_transfer_out(undef);
$self->{tempfilename} = tempfilename('XXXX',$tempfile_path,$ama_file_extension) if $export_cdr_use_temp_files;
return;
}
@ -87,12 +93,26 @@ sub add_record {
sub get_filename {
my $self = shift;
my ($show_tempfilename) = @_;
return $self->{tempfilename} if ($export_cdr_use_temp_files and $show_tempfilename);
return sprintf($ama_filename_format,
$output_path,
$self->{transfer_in}->get_structure()->get_file_sequence_number_field()->{file_sequence_number},
$ama_file_extension,
);
}
sub get_filesize {
my $self = shift;
return -s ($export_cdr_use_temp_files ? $self->{tempfilename} : $self->get_filename());
}
sub _rename {
my $self = shift;
my ($filename) = @_;
return rename($self->{tempfilename},$filename);
}
sub flush {
my $self = shift;
my %params = @_;
@ -102,8 +122,8 @@ sub flush {
commit_cb
/};
#unlink 'test.ama';
if ((scalar @{$self->{blocks}}) > 0 and (my $filename = $self->get_filename())) {
if (-e $filename) {
if ((scalar @{$self->{blocks}}) > 0 and (my $filename = ($export_cdr_use_temp_files ? $self->{tempfilename} : $self->get_filename()))) {
if (not $export_cdr_use_temp_files and -e $filename) {
fileerror($filename . ' already exists',getlogger(__PACKAGE__));
return 0;
} else {
@ -112,9 +132,20 @@ sub flush {
print $fh pack('H*',$block->get_hex());
}
close $fh;
&$commit_cb(@_) if defined $commit_cb;
if (defined $commit_cb) {
if (&$commit_cb(@_) and (not $export_cdr_use_temp_files or $self->_rename($self->get_filename()))) {
return 1;
} else {
eval {
unlink $filename unless $export_cdr_use_temp_files;
unlink $self->{tempfilename} if $export_cdr_use_temp_files;
};
return 0;
}
} else {
return 1;
}
#restdebug($self,"$self->{crt_path} saved",getlogger(__PACKAGE__));
return 1;
} else {
fileerror('failed to open ' . $filename . ": $!",getlogger(__PACKAGE__));
return 0;

@ -37,6 +37,7 @@ our @EXPORT_OK = qw(
$input_path
$output_path
$tempfile_path
$defaultsettings
$defaultconfig
@ -53,6 +54,7 @@ our @EXPORT_OK = qw(
$export_cdr_limit
$export_cdr_stream
$export_cdr_rollover_fsn
$export_cdr_use_temp_files
$domestic_destination_pattern
$international_destination_pattern
@ -70,6 +72,7 @@ our $defaultsettings = 'settings.cfg';
our $input_path = $working_path . 'input/';
our $output_path = $working_path . 'output/';
our $tempfile_path = $working_path . 'temp/';
our $force = 0;
#our $dry = 0;
@ -83,11 +86,12 @@ our $export_cdr_conditions = [];
our $export_cdr_limit = undef;
our $export_cdr_stream = undef;
our $export_cdr_rollover_fsn = 0;
our $export_cdr_use_temp_files = 0;
our $domestic_destination_pattern = undef;
our $international_destination_pattern = undef;
our $ama_filename_format = '%1$s%2$s.ama';
our $ama_filename_format = '%1$s%2$s%3$s';
sub update_settings {
@ -126,6 +130,7 @@ sub update_settings {
$export_cdr_limit = $data->{export_cdr_limit} if exists $data->{export_cdr_limit};
$export_cdr_stream = $data->{export_cdr_stream} if exists $data->{export_cdr_stream};
$export_cdr_rollover_fsn = stringtobool($data->{export_cdr_rollover_fsn}) if exists $data->{export_cdr_rollover_fsn};
$export_cdr_use_temp_files = stringtobool($data->{export_cdr_use_temp_files}) if exists $data->{export_cdr_use_temp_files};
#if ((confval("MAINTENANCE") // 'no') eq 'yes') {
# exit(0);
@ -157,6 +162,8 @@ sub _prepare_working_paths {
$result &= $path_result;
($path_result,$output_path) = create_path($working_path . 'output',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
($path_result,$tempfile_path) = create_path($working_path . 'temp',$output_path,$create,\&fileerror,getlogger(__PACKAGE__));
$result &= $path_result;
return $result;

@ -1,7 +1,7 @@
##general settings:
working_path = /var/sipwise
#cpucount = 4
enablemultithreading = 1
enablemultithreading = 0
rowblock_transactional = 0

@ -1,7 +1,7 @@
##general settings:
working_path = /home/rkrenn/temp/export
#cpucount = 4
enablemultithreading = 1
enablemultithreading = 0
rowblock_transactional = 0

@ -13,6 +13,7 @@ use NGCP::BulkProcessor::Globals qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw(
update_settings
$output_path
$tempfile_path
$defaultsettings
$defaultconfig
$skip_errors
@ -200,6 +201,7 @@ sub cleanup_task {
cleanupmsgfiles(\&fileerror,\&filewarn);
cleanupcertfiles();
cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated;
cleanupdir($tempfile_path,1,\&filewarn,getlogger(getscriptpath()));
$result = 1;
};
}

@ -4,9 +4,11 @@ export_cdr_multithreading = 1
export_cdr_numofthreads = 2
export_cdr_blocksize = 1000
export_cdr_joins = { 'billing.voip_subscribers source_voip_subscribers' => { 'source_voip_subscribers.uuid' => 'accounting.cdr.source_user_id' } }, { 'billing.voip_subscribers destination_voip_subscribers' => { 'destination_voip_subscribers.uuid' => 'accounting.cdr.destination_user_id' } }, { 'billing.billing_zones_history source_carrier_bbz' => { 'source_carrier_bbz.id' => 'accounting.cdr.source_carrier_billing_zone_id' } }, { 'billing.billing_zones_history source_reseller_bbz' => { 'source_reseller_bbz.id' => 'accounting.cdr.source_reseller_billing_zone_id' } }, { 'billing.billing_zones_history source_customer_bbz' => { 'source_customer_bbz.id' => 'accounting.cdr.source_customer_billing_zone_id' } }, { 'billing.billing_zones_history destination_carrier_bbz' => { 'destination_carrier_bbz.id' => 'accounting.cdr.destination_carrier_billing_zone_id' } }, { 'billing.billing_zones_history destination_reseller_bbz' => { 'destination_reseller_bbz.id' => 'accounting.cdr.destination_reseller_billing_zone_id' } }, { 'billing.billing_zones_history destination_customer_bbz' => { 'destination_customer_bbz.id' => 'accounting.cdr.destination_customer_billing_zone_id' } }
export_cdr_conditions = { 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } }
export_cdr_limit = 300000
export_cdr_conditions =
#{ 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } }
export_cdr_limit = 10000
export_cdr_stream = ama_simple
export_cdr_rollover_fsn = 1
export_cdr_use_temp_files = 1
ama_filename_format = %1$s%2$03d.debug.ama
ama_filename_format = %1$s%2$03d.debug%3$s

@ -9,8 +9,8 @@ export_cdr_blocksize = 1000
export_cdr_conditions = { 'accounting.cdr.call_status' => { '=' => '"ok"' } }
export_cdr_stream = ama_simple
#default
#export_cdr_limit = 3500
#300000
export_cdr_limit = 10000
export_cdr_rollover_fsn = 1
export_cdr_use_temp_files = 1
ama_filename_format = %1$s%2$03d.debug.ama
ama_filename_format = %1$s%2$03d.debug%3$s

@ -14,4 +14,5 @@ providers_yml = providers.yml
generate_cdr_multithreading = 1
#generate_cdr_numofthreads = 2
generate_cdr_count = 2000000
generate_cdr_count = 200000
#2000000

@ -252,7 +252,7 @@ sub process_collection {
if ($@) {
$errorstate = $ERROR;
} else {
$errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED;
$errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
}
eval {
@ -406,7 +406,7 @@ sub _process {
if ($err) {
$context->{errorstates}->{$tid} = $ERROR;
} else {
$context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED;
$context->{errorstates}->{$tid} = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
}
return $context->{errorstates}->{$tid};
}

@ -1243,7 +1243,7 @@ sub process_table {
if ($@) {
$errorstate = $ERROR;
} else {
$errorstate = (not $rowblock_result) ? $ERROR : $COMPLETED;
$errorstate = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
}
eval {
@ -1569,7 +1569,7 @@ sub _process {
if ($err) {
$context->{errorstates}->{$tid} = $ERROR;
} else {
$context->{errorstates}->{$tid} = (not $rowblock_result) ? $ERROR : $COMPLETED;
$context->{errorstates}->{$tid} = $COMPLETED; #(not $rowblock_result) ? $ERROR : $COMPLETED;
}
return $context->{errorstates}->{$tid};
}

Loading…
Cancel
Save