diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index 1e8a667..4fd9673 100755 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -293,6 +293,7 @@ sub process_unexported { #sort check_table(); + NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::check_table(); my $db = &$get_db(); my $table = $db->tableidentifier($tablename); diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/mark.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/mark.pm index 906e6e8..c7d9deb 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/mark.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/mark.pm @@ -39,6 +39,9 @@ our @EXPORT_OK = qw( cleanup_system_marks cleanup_reseller_marks + + delete_system_marks + delete_reseller_marks ); my $tablename = 'mark'; @@ -99,8 +102,10 @@ sub _get_mark { my @params = ($collector); my $mark = $xa_db->db_get_value($stmt,@params); + + return $mark; - return (defined $mark ? $mark : '0'); + #return (defined $mark ? $mark : '0'); } @@ -217,6 +222,45 @@ sub _cleanup_marks { } +sub delete_system_marks { + + my ($xa_db,$stream) = @_; + + return _delete_marks($xa_db,sprintf($system_collector_format,$stream)); + +} + +sub delete_reseller_marks { + + my ($xa_db,$stream,$reseller_id) = @_; + + return _delete_marks($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // '')); + +} + +sub _delete_marks { + + my ($xa_db,$collector) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'DELETE FROM ' . $table . ' WHERE ' . + $db->columnidentifier('collector') . ' = ?'; + my @params = ($collector); + + if ($xa_db->db_do($stmt,@params)) { + rowsdeleted($db,$tablename,1,1,getlogger(__PACKAGE__)); + return 1; + } else { + rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__)); + return 0; + } + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm index afb8969..41a1f00 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm @@ -111,18 +111,21 @@ sub reset_fsn { my $result = 1; my $context = { tid => threadid(), warning_count => 0, error_count => 0, }; $result &= _check_export_status_stream($context); - my $fsn; + #my $fsn; eval { - NGCP::BulkProcessor::Dao::Trunk::accounting::mark::cleanup_system_marks(undef, - $export_cdr_stream, - ); - NGCP::BulkProcessor::Dao::Trunk::accounting::mark::set_system_mark(undef, + NGCP::BulkProcessor::Dao::Trunk::accounting::mark::delete_system_marks(undef, $export_cdr_stream, - '0' #$NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::min_fsn ); - $fsn = NGCP::BulkProcessor::Dao::Trunk::accounting::mark::get_system_mark(undef, - $export_cdr_stream - ); #load mark... + #NGCP::BulkProcessor::Dao::Trunk::accounting::mark::cleanup_system_marks(undef, + # $export_cdr_stream, + #); + #NGCP::BulkProcessor::Dao::Trunk::accounting::mark::set_system_mark(undef, + # $export_cdr_stream, + # '0' #$NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::min_fsn + #); + #$fsn = NGCP::BulkProcessor::Dao::Trunk::accounting::mark::get_system_mark(undef, + # $export_cdr_stream + #); #load mark... }; if ($@) { if ($skip_errors) { @@ -132,7 +135,7 @@ sub reset_fsn { } $result = 0; } else { - _info($context,"file sequence number reset to $fsn") + _info($context,"file sequence number deleted"); #reset to $fsn") } return $result; @@ -379,6 +382,8 @@ sub _get_transfer_in { connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($context->{dt}), # adjacent + dt => $context->{dt}, + file_sequence_number => $context->{file_sequence_number}, ) ); @@ -478,6 +483,8 @@ sub _get_transfer_out { file_sequence_number => $context->{file_sequence_number}, + dt => $context->{dt}, + #=> (scalar @records), @@ -535,12 +542,12 @@ sub _export_cdrs_create_context { $result = 0; } else { my $reset = 0; - if ($fsn < 0) { - $fsn = 0; + if (not defined $fsn) { + $fsn = $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::min_fsn - 1; + } elsif ($fsn < 0) { $reset = 1; } elsif ($fsn >= $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) { if ($export_cdr_rollover_fsn) { - $fsn = 0; $reset = 1; } else { _warn($context,"file sequence number $fsn exceeding limit (" . $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn . ")"); @@ -550,17 +557,11 @@ sub _export_cdrs_create_context { _info($context,"last file sequence number is $fsn"); } if ($reset) { + $fsn = $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::min_fsn - 1; eval { - NGCP::BulkProcessor::Dao::Trunk::accounting::mark::cleanup_system_marks(undef, - $export_cdr_stream, - ); - NGCP::BulkProcessor::Dao::Trunk::accounting::mark::set_system_mark(undef, + NGCP::BulkProcessor::Dao::Trunk::accounting::mark::delete_system_marks(undef, $export_cdr_stream, - '0' #$NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::min_fsn ); - $fsn = NGCP::BulkProcessor::Dao::Trunk::accounting::mark::get_system_mark(undef, - $export_cdr_stream - ); #load mark... }; if ($@) { if ($skip_errors) { @@ -570,7 +571,7 @@ sub _export_cdrs_create_context { } $result = 0; } else { - _info($context,"file sequence number reset to $fsn") + _info($context,"file sequence number deleted"); #reset to $fsn") } } lock $file_sequence_number; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ConnectTime.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ConnectTime.pm index 6b1af82..9dda3ab 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ConnectTime.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ConnectTime.pm @@ -16,7 +16,7 @@ our @EXPORT_OK = qw( my $field_name = "connect time"; my $length = 8; -my @param_names = qw/connect_time/; +my @param_names = qw/connect_time dt/; sub new { @@ -43,7 +43,7 @@ sub _get_param_names { sub get_hex { my $self = shift; - my ($connect_time) = $self->_get_params(@_); + my ($connect_time,$dt) = $self->_get_params(@_); die("invalid connect time '$connect_time'") unless length($connect_time) == 7; return $connect_time . $TERMINATOR; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/Date.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/Date.pm index e294784..102a7d2 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/Date.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/Date.pm @@ -16,7 +16,7 @@ our @EXPORT_OK = qw( my $field_name = "date"; my $length = 6; -my @param_names = qw/date/; +my @param_names = qw/date dt/; sub new { @@ -43,7 +43,7 @@ sub _get_param_names { sub get_hex { my $self = shift; - my ($date) = $self->_get_params(@_); + my ($date,$dt) = $self->_get_params(@_); die("invalid date '$date'") unless length($date) == 5; return $date . $TERMINATOR; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm index 1c6a879..d55423c 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm @@ -18,8 +18,8 @@ my $field_name = "file sequence number"; my $length = 4; my @param_names = qw/file_sequence_number/; -our $min_fsn = 1; -our $max_fsn = 999; +our $min_fsn = 0; #1; +our $max_fsn = 99; #999; sub new { diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm index 4786b35..52f961e 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm @@ -97,6 +97,13 @@ sub get_filename { return $self->{tempfilename} if ($export_cdr_use_temp_files and $show_tempfilename); return sprintf($ama_filename_format, $output_path, + $self->{transfer_in}->get_structure()->get_date_field()->{dt}->year, + substr($self->{transfer_in}->get_structure()->get_date_field()->{dt}->year,-2), + $self->{transfer_in}->get_structure()->get_date_field()->{dt}->month, + $self->{transfer_in}->get_structure()->get_date_field()->{dt}->day, + $self->{transfer_in}->get_structure()->get_connect_time_field()->{dt}->hour, + $self->{transfer_in}->get_structure()->get_connect_time_field()->{dt}->minute, + $self->{transfer_in}->get_structure()->get_connect_time_field()->{dt}->second, $self->{transfer_in}->get_structure()->get_file_sequence_number_field()->{file_sequence_number}, $ama_file_extension, ); diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm index acb8280..2c17e40 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm @@ -58,12 +58,14 @@ sub new { $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::RecordingOfficeId->new( @_, )); - $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date->new( + $self->{date} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date->new( @_, - )); - $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime->new( + ); + $self->_add_field($self->{date}); + $self->{connect_time} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime->new( @_, - )); + ); + $self->_add_field($self->{connect_time}); $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::GenericIssue->new( @_, )); @@ -90,6 +92,16 @@ sub get_file_sequence_number_field { return $self->{file_sequence_number}; } +sub get_date_field { + my $self = shift; + return $self->{date}; +} + +sub get_connect_time_field { + my $self = shift; + return $self->{connect_time}; +} + #sub get_instance { # return #} diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg index acffb45..85cc395 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg @@ -11,4 +11,4 @@ export_cdr_stream = ama_simple export_cdr_rollover_fsn = 1 export_cdr_use_temp_files = 1 -ama_filename_format = %1$s%2$03d.debug%3$s +ama_filename_format = %1$sP%3$02d%4$02d%5$02d%6$02d%7$02d%9$02dAMA%10$s diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg index 3747383..36bd7da 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg @@ -6,11 +6,12 @@ export_cdr_blocksize = 1000 #export_cdr_joins = { 'accounting.cdr_export_status_data esd' => { 'esd.cdr_id' => 'accounting.cdr.id' } }, { 'accounting.cdr_export_status es' => { 'es.id' => 'esd.status_id' } } #export_cdr_conditions = { 'es.type' => { '=' => '"default"' } }, { 'esd.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } } #, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } -export_cdr_conditions = { 'accounting.cdr.call_status' => { '=' => '"ok"' } } +export_cdr_conditions = +#{ 'accounting.cdr.call_status' => { '=' => '"ok"' } } export_cdr_stream = ama_simple #default export_cdr_limit = 10000 export_cdr_rollover_fsn = 1 export_cdr_use_temp_files = 1 -ama_filename_format = %1$s%2$03d.debug%3$s +ama_filename_format = %1$sP%3$02d%4$02d%5$02d%6$02d%7$02d%9$02dAMA.debug%10$s