diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index 0c82438..1e8a667 100755 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -41,6 +41,8 @@ our @EXPORT_OK = qw( findby_callidprefix process_unexported + get_callidprefix + ); #process_records #delete_ids @@ -207,6 +209,15 @@ sub countby_ratingstatus { } +sub get_callidprefix { + + my ($call_id) = @_; + my $suffixre = '(' . join('|', map { quotemeta($_); } @callid_suffixes) . ')+$'; + $call_id =~ s/$suffixre//g; + return $call_id + +} + sub findby_callidprefix { my ($xa_db,$call_id,$joins,$conditions,$load_recursive) = @_; @@ -216,8 +227,7 @@ sub findby_callidprefix { $xa_db //= $db; my $table = $db->tableidentifier($tablename); - my $suffixre = '(' . join('|', map { quotemeta($_); } @callid_suffixes) . ')+$'; - $call_id =~ s/$suffixre//g; + $call_id = get_callidprefix($call_id); $call_id =~ s/%/\\%/g; my @conditions = @{$conditions // []}; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm index 517433a..14ad7e1 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm @@ -5,7 +5,7 @@ use strict; use NGCP::BulkProcessor::Logging qw( getlogger - rowsdeleted + rowsupdated rowinserted rowupserted rowupdated @@ -35,6 +35,8 @@ our @EXPORT_OK = qw( insert_row upsert_row + update_export_status + $UNEXPORTED $EXPORTED ); @@ -165,6 +167,37 @@ sub upsert_row { } +sub update_export_status { + + my ($status_id,$export_status,$start_time_from,$start_time_to) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET ' . $db->columnidentifier('export_status') . ' = ?' . + ' WHERE ' . $db->columnidentifier('status_id') . ' = ? AND ' . $db->columnidentifier('export_status') . ' != ?'; + my @params = ($export_status,$status_id,$export_status); + if (defined $start_time_from) { + $stmt .= ' AND ' . $db->columnidentifier('start_time') . ' >= UNIX_TIMESTAMP(?)'; + push(@params,$start_time_from); + } + if (defined $start_time_to) { + $stmt .= ' AND ' . $db->columnidentifier('start_time') . ' < UNIX_TIMESTAMP(?)'; + push(@params,$start_time_to); + } + + my $count; + if ($count = $db->db_do($stmt,@params)) { + rowsupdated($db,$tablename,$count,getlogger(__PACKAGE__)); + return $count; + } else { + rowsupdated($db,$tablename,0,getlogger(__PACKAGE__)); + return 0; + } + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/mark.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/mark.pm new file mode 100644 index 0000000..906e6e8 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/mark.pm @@ -0,0 +1,256 @@ +package NGCP::BulkProcessor::Dao::Trunk::accounting::mark; +use strict; + +## no critic + +use NGCP::BulkProcessor::Logging qw( + getlogger + rowsdeleted +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_accounting_db + destroy_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + checktableinfo + copy_row + + insert_record + update_record +); +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + gettablename + check_table + + get_system_mark + get_reseller_mark + + set_system_mark + set_reseller_mark + + insert_system_mark + insert_reseller_mark + + cleanup_system_marks + cleanup_reseller_marks +); + +my $tablename = 'mark'; +my $get_db = \&get_accounting_db; + +my $expected_fieldnames = [ +"id", +"collector", +"acc_id", +]; + +my $indexes = {}; + +my $insert_unique_fields = []; + +my $system_collector_format = '%s-lastseq'; +my $reseller_collector_format = '%s-lastseq-%d'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub get_system_mark { + + my ($xa_db,$stream) = @_; + + return _get_mark($xa_db,sprintf($system_collector_format,$stream)); + +} + +sub get_reseller_mark { + + my ($xa_db,$stream,$reseller_id) = @_; + + return _get_mark($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // '')); + +} + +sub _get_mark { + + my ($xa_db,$collector) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT MAX(' . $db->columnidentifier('acc_id') . ') FROM ' . $table . ' WHERE ' . + $db->columnidentifier('collector') . ' = ?'; + my @params = ($collector); + + my $mark = $xa_db->db_get_value($stmt,@params); + + return (defined $mark ? $mark : '0'); + +} + +sub set_system_mark { + + my ($xa_db,$stream,$mark) = @_; + + return _set_mark($xa_db,sprintf($system_collector_format,$stream),$mark,0); + +} + +sub set_reseller_mark { + + my ($xa_db,$stream,$reseller_id,$mark) = @_; + + return _set_mark($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // ''),$mark,0); + +} + +sub insert_system_mark { + + my ($xa_db,$stream,$mark) = @_; + + return _set_mark($xa_db,sprintf($system_collector_format,$stream),$mark,1); + +} + +sub insert_reseller_mark { + + my ($xa_db,$stream,$reseller_id,$mark) = @_; + + return _set_mark($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // ''),$mark,1); + +} + +sub _set_mark { + + my ($xa_db,$collector,$mark,$force_insert) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $id; + unless ($force_insert) { + my $stmt = 'SELECT MAX(t1.id) FROM ' . $table . ' t1 LEFT JOIN ' . $table . ' t2' . + ' ON t1.collector = t2.collector and t2.acc_id > t1.acc_id'. + ' WHERE t2.collector IS NULL AND t1.collector = ?'; + my @params = ($collector); + $id = $xa_db->db_get_value($stmt,@params); + } + + if (defined $id) { + return update_record($get_db,$xa_db,__PACKAGE__,{ + id => $id, + acc_id => $mark, + }); + } else { + if (insert_record($db,$xa_db,__PACKAGE__,{ + collector => $collector, + acc_id => $mark + },0,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + return undef; + } + +} + +sub cleanup_system_marks { + + my ($xa_db,$stream) = @_; + + return _cleanup_marks($xa_db,sprintf($system_collector_format,$stream)); + +} + +sub cleanup_reseller_marks { + + my ($xa_db,$stream,$reseller_id) = @_; + + return _cleanup_marks($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // '')); + +} + +sub _cleanup_marks { + + my ($xa_db,$collector) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT MAX(t1.id) FROM ' . $table . ' t1 LEFT JOIN ' . $table . ' t2' . + ' ON t1.collector = t2.collector and t2.acc_id > t1.acc_id'. + ' WHERE t2.collector IS NULL AND t1.collector = ?'; + my @params = ($collector); + my $id = $xa_db->db_get_value($stmt,@params); + + if (defined $id) { + $stmt = 'DELETE FROM ' . $table . ' WHERE collector = ? AND id != ?'; + push(@params,$id); + if ($xa_db->db_do($stmt,@params)) { + rowsdeleted($db,$tablename,1,1,getlogger(__PACKAGE__)); + return 1; + } else { + rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__)); + return 0; + } + } + return 0; + +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Logging.pm b/lib/NGCP/BulkProcessor/Logging.pm index 6f05a3f..ec27d65 100755 --- a/lib/NGCP/BulkProcessor/Logging.pm +++ b/lib/NGCP/BulkProcessor/Logging.pm @@ -55,6 +55,7 @@ our @EXPORT_OK = qw( rowupserted rowupdated rowsdeleted + rowsupdated totalrowsdeleted rowinsertskipped rowupdateskipped @@ -498,6 +499,17 @@ sub rowupdated { } +sub rowsupdated { + + my ($db,$tablename,$rowcount,$logger) = @_; + if (defined $logger) { + + $logger->debug(_getsqlconnectorinstanceprefix($db) . $rowcount . ' row(s) updated'); + + } + +} + sub rowsdeleted { my ($db,$tablename,$rowcount,$initial_rowcount,$logger) = @_; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm index 5fa1896..9f7fe7a 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/CDR.pm @@ -4,33 +4,19 @@ use strict; ## no critic use threads::shared qw(); -#use Time::HiRes qw(sleep); -#use String::MkPasswd qw(); -#use List::Util qw(); -#use Data::Rmap qw(); - -#use Tie::IxHash; - -#use NGCP::BulkProcessor::Globals qw( -# $enablemultithreading -#); use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw( $skip_errors $export_cdr_multithreading + $export_cdr_numofthreads $export_cdr_blocksize $export_cdr_joins $export_cdr_conditions $export_cdr_limit $export_cdr_stream ); -#$dry -#$deadlock_retries -#@providers -#$generate_cdr_numofthreads -#$generate_cdr_count use NGCP::BulkProcessor::Logging qw ( getlogger @@ -46,14 +32,7 @@ use NGCP::BulkProcessor::LogError qw( use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw(); use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status qw(); use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data qw(); - -#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw(); -#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw(); - -#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw(); +use NGCP::BulkProcessor::Dao::Trunk::accounting::mark qw(); use NGCP::BulkProcessor::Projects::Export::Ama::Format::File qw(); use NGCP::BulkProcessor::Projects::Export::Ama::Format::Record qw(); @@ -68,17 +47,15 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigit use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational qw(); use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime qw(); use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime qw(); +use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber qw(); use NGCP::BulkProcessor::ConnectorPool qw( get_xa_db destroy_dbs + ping_dbs ); -#ping_dbs -#use NGCP::BulkProcessor::Utils qw(threadid timestamp); # stringtobool check_ipnet trim); -##use NGCP::BulkProcessor::DSSorter qw(sort_by_configs); -##use NGCP::BulkProcessor::RandomString qw(createtmpstring); -#use NGCP::BulkProcessor::Array qw(array_to_map); +use NGCP::BulkProcessor::Utils qw(threadid kbytes2gigs); # stringtobool check_ipnet trim); use NGCP::BulkProcessor::Calendar qw(current_local); @@ -86,9 +63,71 @@ require Exporter; our @ISA = qw(Exporter); our @EXPORT_OK = qw( export_cdrs - + reset_fsn + reset_export_status ); +my $file_sequence_number : shared = 0; + +sub reset_export_status { + + my ($from,$to) = @_; + + my $result = 1; + my $context = { tid => threadid(), warning_count => 0, error_count => 0, }; + $result &= _check_export_status_stream($context); + my $updated; + eval { + $updated = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::update_export_status($context->{export_status_id}, + $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED,$from,$to); + }; + if ($@) { + if ($skip_errors) { + _warn($context,"problem with export status reset: " . $@); + } else { + _error($context,"problem with export status reset: " . $@); + } + $result = 0; + } else { + _info($context,"$updated export states reset"); + } + + return $result; + +} + +sub reset_fsn { + + my $result = 1; + my $context = { tid => threadid(), warning_count => 0, error_count => 0, }; + $result &= _check_export_status_stream($context); + my $fsn; + eval { + NGCP::BulkProcessor::Dao::Trunk::accounting::mark::cleanup_system_marks(undef, + $export_cdr_stream, + ); + NGCP::BulkProcessor::Dao::Trunk::accounting::mark::set_system_mark(undef, + $export_cdr_stream, + '0' #$NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::min_fsn + ); + $fsn = NGCP::BulkProcessor::Dao::Trunk::accounting::mark::get_system_mark(undef, + $export_cdr_stream + ); #load mark... + }; + if ($@) { + if ($skip_errors) { + _warn($context,"problem with file sequence number reset: " . $@); + } else { + _error($context,"problem with file sequence number reset: " . $@); + } + $result = 0; + } else { + _info($context,"file sequence number reset to $fsn") + } + return $result; + +} + sub export_cdrs { my $static_context = {}; @@ -96,27 +135,26 @@ sub export_cdrs { destroy_dbs(); my $warning_count :shared = 0; - return ($result && NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::process_unexported( + my @ama_files : shared = (); + $result &= NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::process_unexported( static_context => $static_context, process_code => sub { my ($context,$records,$row_offset) = @_; - $context->{rownum} = $row_offset; - $context->{block_cdr_id_map} = { map { $_->[0] => $_->[1]; } @$records }; + $context->{block_call_id_map} = {}; foreach my $record (@$records) { - return 0 if (defined $export_cdr_limit and $context->{rownum} >= $export_cdr_limit); my ($id,$call_id) = @$record; - # skip if the cdr belongs to a call already done in this block: - next unless exists $context->{block_cdr_id_map}->{$id}; - # skip if the cdr is pending for flushing to file: - next if exists $context->{file_cdr_id_map}->{$id}; - # skip if call legs/data is incomplete: - next unless _export_cdrs_init_context($context,$call_id); - # go ahead: - foreach my $cdr (@{$context->{cdrs}}) { - $context->{file_cdr_id_map}->{$cdr->{id}} = $cdr->{start_time}; - delete $context->{cdr_id_map}->{$cdr->{id}}; - $context->{rownum} += 1; + my $call_id_prefix = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::get_callidprefix($call_id); + if (exists $context->{block_call_id_map}->{$call_id_prefix}) { + $context->{block_call_id_map}->{$call_id_prefix} += 1; + } else { + $context->{block_call_id_map}->{$call_id_prefix} = 1; } + } + foreach my $record (@$records) { + return 0 if (defined $export_cdr_limit and $context->{rownum} >= $export_cdr_limit); + return 0 if $context->{file_sequence_number} > $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn; + my ($id,$call_id) = @$record; + next unless _export_cdrs_init_context($context,$id,$call_id); eval { $context->{file}->write_record( get_transfer_in => \&_get_transfer_in, @@ -134,7 +172,7 @@ sub export_cdrs { } } } - + ping_dbs(); return 1; }, init_process_context_code => sub { @@ -142,58 +180,96 @@ sub export_cdrs { $context->{db} = &get_xa_db(); $context->{error_count} = 0; $context->{warning_count} = 0; - # below is not mandatory.. - #_check_insert_tables(); + + $context->{ama_files} = []; + $context->{has_next} = 1; + $context->{rownum} = 0; + + _increment_file_sequence_number($context); }, uninit_process_context_code => sub { my ($context)= @_; - - eval { - $context->{file}->close( - get_transfer_out => \&_get_transfer_out, - commit_cb => \&_commit_export_status, - context => $context, - ); - }; - if ($@) { - if ($skip_errors) { - _warn($context,"problem while closing " . $context->{file}->get_file_name() . ": " . $@); - } else { - _error($context,"problem while exporting " . $context->{file}->get_file_name() . ": " . $@); + $context->{has_next} = 0; #do not reserve another file sequence number + if ($context->{file_sequence_number} <= $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) { + eval { + $context->{file}->close( + get_transfer_out => \&_get_transfer_out, + commit_cb => \&_commit_export_status, + context => $context, + ); + }; + if ($@) { + if ($skip_errors) { + _warn($context,"problem while closing " . $context->{file}->get_filename() . ": " . $@); + } else { + _error($context,"problem while closing " . $context->{file}->get_filename() . ": " . $@); + } } } undef $context->{db}; - destroy_all_dbs(); + destroy_dbs(); + { lock $warning_count; $warning_count += $context->{warning_count}; + push(@ama_files,@{$context->{ama_files}}); } }, load_recursive => 0, blocksize => $export_cdr_blocksize, multithreading => $export_cdr_multithreading, - numofthreads => 1, + numofthreads => $export_cdr_numofthreads, joins => $export_cdr_joins, conditions => $export_cdr_conditions, #sort => [{ column => 'id', numeric => 1, dir => 1 }], limit => $export_cdr_limit, - ),$warning_count); + ); + + eval { + NGCP::BulkProcessor::Dao::Trunk::accounting::mark::cleanup_system_marks(undef, + $export_cdr_stream); + }; + if ($@) { + if ($skip_errors) { + _warn($static_context,"problem with file sequence number cleanup: " . $@); + } else { + _error($static_context,"problem with file sequence number cleanup: " . $@); + } + $result = 0; + } else { + _info($static_context,"file sequence numbers cleaned up"); + } + + return ($result,$warning_count,\@ama_files); } sub _export_cdrs_init_context { - my ($context,$call_id) = @_; - - my $result = 1; + my ($context,$cdr_id,$call_id) = @_; + my $result = 0; + $context->{cdrs} = undef; $context->{call_id} = $call_id; - $context->{cdrs} = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::findby_callidprefix($context->{db}, - $call_id,$export_cdr_joins,$export_cdr_conditions); - $result &= ((scalar @{$context->{cdrs}}) > 0 ? 1 : 0); - #$result &= ((scalar @{$context->{cdrs}}) == 4 ? 1 : 0); + if (not exists $context->{file_cdr_id_map}->{$cdr_id}) { + my $call_id_prefix = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::get_callidprefix($call_id); + if (exists $context->{block_call_id_map}->{$call_id_prefix}) { + $context->{cdrs} = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::findby_callidprefix($context->{db}, + $call_id,$export_cdr_joins,$export_cdr_conditions); + my $cdrs_in_block = delete $context->{block_call_id_map}->{$call_id_prefix}; + if ((scalar @{$context->{cdrs}}) == $cdrs_in_block) { + foreach my $cdr (@{$context->{cdrs}}) { + $context->{file_cdr_id_map}->{$cdr->{id}} = $cdr; #->{start_time}; + $context->{rownum} += 1; + } + $result = 1; + } + } + } + + # todo: prepare the fields from the call's CDRs: $context->{dt} = current_local(); $context->{source} = "43011001"; @@ -212,41 +288,53 @@ sub _commit_export_status { ) = @params{qw/ context /}; - #my %dropped = (); + _info($context,"file " . $context->{file}->get_filename() . " (" . kbytes2gigs(int((-s $context->{file}->get_filename()) / 1024)) . ") written (" . $context->{file}->get_record_count() . " records in " . $context->{file}->get_block_count() . " blocks)"); eval { + ping_dbs(); $context->{db}->db_begin(); - foreach my $id (keys %{$context->{block_cdr_id_map}}) { + foreach my $id (keys %{$context->{file_cdr_id_map}}) { #mark exported NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::upsert_row($context->{db}, cdr_id => $id, status_id => $context->{export_status_id}, export_status => $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::EXPORTED, - cdr_start_time => $context->{block_cdr_id_map}->{$id}->{start_time}, + cdr_start_time => $context->{file_cdr_id_map}->{$id}->{start_time}, ); _info($context,"export_status set for cdr id $id",1); - #$dropped{$cdr_id} = delete $context->{file_cdrs}->{$cdr_id}; } - + NGCP::BulkProcessor::Dao::Trunk::accounting::mark::insert_system_mark($context->{db}, + $export_cdr_stream, + $context->{file_sequence_number}, + ); #set mark... + _info($context,"file sequence number $context->{file_sequence_number} saved"); $context->{db}->db_commit(); }; - $context->{block_cdr_id_map} = {}; + $context->{file_cdr_id_map} = {}; my $err = $@; if ($err) { eval { $context->{db}->db_rollback(1); - #foreach (keys %dropped) { - # $cdr_id_map{$_} = $dropped{$_}; - #} }; eval { unlink $context->{file}->get_filename(); }; die($err); + } else { + push(@{$context->{ama_files}},$context->{file}->get_filename()); + _increment_file_sequence_number($context) if $context->{has_next}; } } +sub _increment_file_sequence_number { + my ($context) = @_; + lock $file_sequence_number; + $file_sequence_number = $file_sequence_number + 1; + _info($context,"file sequence number incremented: $file_sequence_number",1); + $context->{file_sequence_number} = $file_sequence_number; +} + sub _get_transfer_in { my %params = @_; @@ -255,20 +343,21 @@ sub _get_transfer_in { ) = @params{qw/ context /}; + return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9013->new( rewritten => 0, - sensor_id => '008708', # Graz + sensor_id => '438716', # Graz padding => 0, - recording_office_id => '008708', + recording_office_id => '438716', date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($context->{dt}), - connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($context->{dt}), + connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($context->{dt}), # adjacent - file_sequence_number => 1, + file_sequence_number => $context->{file_sequence_number}, ) ); @@ -287,21 +376,31 @@ sub _get_record { call_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::CallType::STATION_PAID, rewritten => 0, - sensor_id => '008708', # Graz + sensor_id => '438716', # Graz padding => 0, - recording_office_id => '008708', + recording_office_id => '438716', #008708 + + #call code 970c + #timing ind 000 + #seervice observed 0c + + #called party off-hook setzen + #unanswered => date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($context->{dt}), service_feature => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ServiceFeature::OTHER, - +#mit 43 originating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($context->{source}), originating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($context->{source}), originating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($context->{source}), - domestic_international => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::get_number_domestic_international($context->{destination}), + domestic_international => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::INTERNATIONAL, #get_number_domestic_international($context->{destination}), +#2c +#destination number mit 43 +#dialed_in , 0er wegstreichen terminating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($context->{destination}), terminating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($context->{destination}), terminating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($context->{destination}), @@ -323,7 +422,7 @@ sub _get_transfer_out { /}; return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014->new( - + @_, rewritten => 0, sensor_id => '008708', # Graz @@ -334,15 +433,17 @@ sub _get_transfer_out { connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($context->{dt}), - #file_sequence_number => 1, + file_sequence_number => $context->{file_sequence_number}, #=> (scalar @records), + + ) ); } -sub _export_cdrs_create_context { +sub _check_export_status_stream { my ($context) = @_; @@ -359,11 +460,47 @@ sub _export_cdrs_create_context { _error($context,"cannot find export stream '$export_cdr_stream'"); $result = 0; } elsif ($export_status) { - _info($context,"export stream '$export_cdr_stream' set"); + _info($context,"using export stream '$export_cdr_stream'"); } + return $result; + +} + +sub _export_cdrs_create_context { + + my ($context) = @_; + + my $result = 1; + + $context->{tid} = threadid(); + + $result &= _check_export_status_stream($context); + $context->{file} = NGCP::BulkProcessor::Projects::Export::Ama::Format::File->new(); $context->{file_cdr_id_map} = {}; + $context->{has_next} = 1; + + my $fsn; + eval { + $fsn = NGCP::BulkProcessor::Dao::Trunk::accounting::mark::get_system_mark(undef, + $export_cdr_stream + ); #load mark... + }; + if ($@) { + _error($context,"cannot get last file sequence number"); + $result = 0; + } else { + if ($fsn < 0) { + $fsn = 0; + } elsif ($fsn >= $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) { + _warn($context,"file sequence number $fsn exceeding limit (" . $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn . ")"); + $result = 0; + } + _info($context,"last file sequence number is $fsn"); + lock $file_sequence_number; + $file_sequence_number = $fsn; + } return $result; } diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Block.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Block.pm index a5fe2bf..400552b 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Block.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Block.pm @@ -37,6 +37,26 @@ sub add_record { } +sub records_fit { + my $self = shift; + my @records = @_; + if (not $self->{padded}) { + my $length = $self->get_length(); + foreach my $record (@records) { + if (not ref $record) { + $length += $record; + } else { + $length += $record->get_length(); + } + } + if ($length <= 2 * $max_block_length) { + return 1; + } + } + return 0; + +} + sub get_hex { my $self = shift; @@ -45,7 +65,7 @@ sub get_hex { $result .= $record->get_hex(); } if ($self->{padded}) { - $result .= 'aa' x (2 * $max_block_length - length($result)); + $result .= 'a' x (2 * $max_block_length - length($result)); } return $result; } diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm index d9701ee..1c6a879 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/FileSequenceNumber.pm @@ -10,13 +10,17 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Field qw($TERMINATOR); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::Field); our @EXPORT_OK = qw( - + $min_fsn + $max_fsn ); my $field_name = "file sequence number"; my $length = 4; my @param_names = qw/file_sequence_number/; +our $min_fsn = 1; +our $max_fsn = 999; + sub new { my $class = shift; @@ -41,7 +45,7 @@ sub get_hex { my $self = shift; my ($file_sequence_number) = $self->_get_params(@_); - die("invalid file sequence number '$file_sequence_number'") if ($file_sequence_number < 1 or $file_sequence_number > 999); + die("invalid file sequence number '$file_sequence_number'") if ($file_sequence_number < $min_fsn or $file_sequence_number > $max_fsn); return sprintf('%03d',$file_sequence_number) . $TERMINATOR; } diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ServiceFeature.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ServiceFeature.pm index fc0c44d..9be2ed6 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ServiceFeature.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Fields/ServiceFeature.pm @@ -56,7 +56,7 @@ sub get_hex { my $self = shift; my ($service_feature) = $self->_get_params(@_); die("invalid service feature '$service_feature'") unless length($service_feature) == 3; - return $service_feature . $TERMINATOR; + return sprintf("%03d",$service_feature) . $TERMINATOR; } diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm index 171b5c5..43d2ad8 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/File.pm @@ -3,7 +3,13 @@ use strict; ## no critic +use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw( + $output_path + $ama_filename_format +); + use NGCP::BulkProcessor::Projects::Export::Ama::Format::Block qw(); +use NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014 qw(); use NGCP::BulkProcessor::Logging qw( getlogger @@ -36,27 +42,44 @@ sub reset { $self->{current_block} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Block->new(); $self->{blocks} = [ $self->{current_block} ]; $self->{record_count} = 0; + $self->_save_transfer_in(undef); + $self->_save_transfer_out(undef); + return; +} +sub get_record_count { + my $self = shift; + return $self->{record_count}; +} + +sub get_block_count { + my $self = shift; + return scalar @{$self->{blocks}}; } sub add_record { my $self = shift; - my ($record,$pad) = shift; + my ($record,$pad) = @_; my $result; - if (not $self->{current_block}->add_record($record)) { - if ((scalar @{$self->{blocks}}) >= $max_blocks) { - $result = 0; + if (not $pad and (scalar @{$self->{blocks}}) >= $max_blocks and not $self->{current_block}->records_fit($record, + $NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014::length)) { + $result = 0; + } else { + if (not $self->{current_block}->add_record($record)) { + if ((scalar @{$self->{blocks}}) >= $max_blocks) { + $result = 0; + } else { + $self->{current_block}->set_padded(1); + $self->{current_block} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Block->new(); + push(@{$self->{blocks}},$self->{current_block}); + $result = $self->{current_block}->add_record($record); + $self->{record_count} += 1; + } } else { - $self->{current_block}->set_padded(1); - $self->{current_block} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Block->new(); - push(@{$self->{blocks}},$self->{current_block}); - $result = $self->{current_block}->add_record($record); $self->{record_count} += 1; + $self->{current_block}->set_padded(1) if $pad; + $result = 1; } - } else { - $self->{record_count} += 1; - $self->{current_block}->set_padded(1) if $pad; - $result = 1; } return $result; @@ -64,7 +87,10 @@ sub add_record { sub get_filename { my $self = shift; - return 'test.ama'; + return sprintf($ama_filename_format, + $output_path, + $self->{transfer_in}->get_structure()->get_file_sequence_number_field()->{file_sequence_number}, + ); } sub flush { @@ -77,17 +103,22 @@ sub flush { /}; #unlink 'test.ama'; if ((scalar @{$self->{blocks}}) > 0 and (my $filename = $self->get_filename())) { - if (open(my $fh,">:raw",$filename)) { - foreach my $block (@{$self->{blocks}}) { - print $fh pack('H*',$block->get_hex()); - } - close $fh; - &$commit_cb(@_) if defined $commit_cb; - #restdebug($self,"$self->{crt_path} saved",getlogger(__PACKAGE__)); - return 1; - } else { - fileerror('failed to open ' . $filename . ": $!",getlogger(__PACKAGE__)); + if (-e $filename) { + fileerror($filename . ' already exists',getlogger(__PACKAGE__)); return 0; + } else { + if (open(my $fh,">:raw",$filename)) { + foreach my $block (@{$self->{blocks}}) { + print $fh pack('H*',$block->get_hex()); + } + close $fh; + &$commit_cb(@_) if defined $commit_cb; + #restdebug($self,"$self->{crt_path} saved",getlogger(__PACKAGE__)); + return 1; + } else { + fileerror('failed to open ' . $filename . ": $!",getlogger(__PACKAGE__)); + return 0; + } } } else { return 0; @@ -107,15 +138,17 @@ sub close { /}; my $result = 0; $self->add_record( - &$get_transfer_out( - + $self->_save_transfer_out(&$get_transfer_out( #file_sequence_number => 1, #=> (scalar @records), @_ - ), + )), 1, ); + # update count fields: + $self->{transfer_out}->get_structure()->get_block_count_field()->_set_params(block_count => $self->get_block_count()); + $self->{transfer_out}->get_structure()->get_record_count_field()->_set_params(record_count => $self->get_record_count()); $result |= $self->flush( commit_cb => $commit_cb, @_ @@ -142,27 +175,28 @@ sub write_record { /}; $self->add_record( - &$get_transfer_in( + $self->_save_transfer_in(&$get_transfer_in( #file_sequence_number => 1, @_, - ), + )), 1 ) unless $self->{record_count} > 0; my $result = 0; my $record = &$get_record(@_); if (not $self->add_record($record)) { + #my $blah="y"; $result |= $self->close( get_transfer_out => $get_transfer_out, commit_cb => $commit_cb, @_ ); $self->add_record( - &$get_transfer_in( + $self->_save_transfer_in(&$get_transfer_in( #file_sequence_number => 1, @_ - ), + )), 1 ); @@ -173,4 +207,26 @@ sub write_record { } +sub _save_transfer_in { + my $self = shift; + my $record = shift; + if (defined $record) { + $self->{transfer_in} = $record; + } else { + undef $self->{transfer_in}; + } + return $record; +} + +sub _save_transfer_out { + my $self = shift; + my $record = shift; + if (defined $record) { + $self->{transfer_out} = $record; + } else { + undef $self->{transfer_out}; + } + return $record; +} + 1; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Modules/Module104.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Modules/Module104.pm new file mode 100644 index 0000000..cf9e89b --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Modules/Module104.pm @@ -0,0 +1,4 @@ + + +TRUNK_FACILITY_ID 129990000C +TRUNK_FACILITY_ID 220140000C \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Record.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Record.pm index 1539c49..392e160 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Record.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Record.pm @@ -23,6 +23,11 @@ sub new { } +sub get_structure { + my $self = shift; + return $self->{structure}; +} + sub get_hex { my $self = shift; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure0510.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure0510.pm index 98565ef..19a066f 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure0510.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure0510.pm @@ -32,7 +32,7 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime qw() require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::FieldSet NGCP::BulkProcessor::Projects::Export::Ama::Format::Structure); our @EXPORT_OK = qw( - + $length ); #get_instance @@ -40,7 +40,7 @@ our @EXPORT_OK = qw( #my $INSTANCE = __PACKAGE__->new(); -my $length = 142; +our $length = 142; sub new { diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm index 2eb8ed7..acb8280 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9013.pm @@ -21,10 +21,10 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumb require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::FieldSet NGCP::BulkProcessor::Projects::Export::Ama::Format::Structure); our @EXPORT_OK = qw( - + $length ); -my $length = 64; +our $length = 64; sub new { @@ -71,9 +71,10 @@ sub new { tracer_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType::TRANSFER_IN, @_, )); - $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new( + $self->{file_sequence_number} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new( @_, - )); + ); + $self->_add_field($self->{file_sequence_number}); return $self; @@ -84,6 +85,11 @@ sub get_structure_code_field { return $self->{structure_code}; } +sub get_file_sequence_number_field { + my $self = shift; + return $self->{file_sequence_number}; +} + #sub get_instance { # return #} diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9014.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9014.pm index c54b236..a7a8157 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9014.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Format/Structures/Structure9014.pm @@ -23,10 +23,10 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::BlockCount qw(); require Exporter; our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::FieldSet NGCP::BulkProcessor::Projects::Export::Ama::Format::Structure); our @EXPORT_OK = qw( - + $length ); -my $length = 78; +our $length = 78; sub new { @@ -70,18 +70,23 @@ sub new { @_, )); $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType->new( - tracer_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType::TRANSFER_IN, + tracer_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType::TRANSFER_OUT, @_, )); - $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new( + $self->{file_sequence_number} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new( @_, - )); - $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::RecordCount->new( + ); + $self->_add_field($self->{file_sequence_number}); + + $self->{record_count} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::RecordCount->new( @_, - )); - $self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::BlockCount->new( + ); + $self->_add_field($self->{record_count}); + + $self->{block_count} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::BlockCount->new( @_, - )); + ); + $self->_add_field($self->{block_count}); return $self; @@ -92,6 +97,21 @@ sub get_structure_code_field { return $self->{structure_code}; } +sub get_file_sequence_number_field { + my $self = shift; + return $self->{file_sequence_number}; +} + +sub get_record_count_field { + my $self = shift; + return $self->{record_count}; +} + +sub get_block_count_field { + my $self = shift; + return $self->{block_count}; +} + #sub get_instance { # return #} diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm index b0c16e8..ae07a6d 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/Settings.pm @@ -46,6 +46,7 @@ our @EXPORT_OK = qw( $force $export_cdr_multithreading + $export_cdr_numofthreads $export_cdr_blocksize $export_cdr_joins $export_cdr_conditions @@ -54,6 +55,8 @@ our @EXPORT_OK = qw( $domestic_destination_pattern $international_destination_pattern + + $ama_filename_format ); #check_dry #$dry @@ -72,6 +75,7 @@ our $force = 0; our $skip_errors = 0; our $export_cdr_multithreading = $enablemultithreading; +our $export_cdr_numofthreads = $cpucount; our $export_cdr_blocksize = undef; our $export_cdr_joins = []; our $export_cdr_conditions = []; @@ -81,6 +85,8 @@ our $export_cdr_stream = undef; our $domestic_destination_pattern = undef; our $international_destination_pattern = undef; +our $ama_filename_format = '%1$s%2$s.ama'; + sub update_settings { my ($data,$configfile) = @_; @@ -106,6 +112,7 @@ sub update_settings { $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; $export_cdr_multithreading = $data->{export_cdr_multithreading} if exists $data->{export_cdr_multithreading}; + $export_cdr_numofthreads = _get_numofthreads($cpucount,$data,'export_cdr_numofthreads'); $export_cdr_blocksize = $data->{export_cdr_blocksize} if exists $data->{export_cdr_blocksize}; my $parse_result; @@ -128,6 +135,8 @@ sub update_settings { ($regexp_result,$international_destination_pattern) = parse_regexp($international_destination_pattern,$configfile); $result &= $regexp_result; + $ama_filename_format = $data->{ama_filename_format} if exists $data->{ama_filename_format}; + return $result; } @@ -196,4 +205,12 @@ sub _parse_export_conditions { return (1,\@conditions); } +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $_numofthreads = $default_value; + $_numofthreads = $data->{$key} if exists $data->{$key}; + $_numofthreads = $cpucount if $_numofthreads > $cpucount; + return $_numofthreads; +} + 1; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl b/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl index af0dc9e..8dcce9e 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/process.pl @@ -67,6 +67,8 @@ use NGCP::BulkProcessor::ConnectorPool qw(destroy_dbs); use NGCP::BulkProcessor::Projects::Export::Ama::CDR qw( export_cdrs + reset_fsn + reset_export_status ); scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet @@ -74,6 +76,8 @@ scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unl my @TASK_OPTS = (); my $tasks = []; +my $from = undef, +my $to = undef; my $cleanup_task_opt = 'cleanup'; push(@TASK_OPTS,$cleanup_task_opt); @@ -83,6 +87,12 @@ push(@TASK_OPTS,$cleanup_all_task_opt); my $export_cdr_task_opt = 'export_cdr'; push(@TASK_OPTS,$export_cdr_task_opt); +my $reset_fsn_task_opt = 'reset_fsn'; +push(@TASK_OPTS,$reset_fsn_task_opt); + +my $reset_export_status_task_opt = 'reset_export_status'; +push(@TASK_OPTS,$reset_export_status_task_opt); + if (init()) { main(); exit(0); @@ -102,6 +112,8 @@ sub init { #"dry" => \$dry, "skip-errors" => \$skip_errors, "force" => \$force, + "from=s" => \$from, + "to=s" => \$to, ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); $tasks = removeduplicates($tasks,1); @@ -138,20 +150,17 @@ sub main() { $completion |= 1; } + } elsif (lc($reset_fsn_task_opt) eq lc($task)) { + if (taskinfo($reset_fsn_task_opt,$result,1)) { + #next unless check_dry(); + $result &= reset_fsn_task(\@messages); + } - #} elsif (lc($provision_subscriber_task_opt) eq lc($task)) { - # if (taskinfo($provision_subscriber_task_opt,$result,1)) { - # next unless check_dry(); - # $result &= provision_subscriber_task(\@messages); - # $completion |= 1; - # } - - #} elsif (lc($generate_cdr_task_opt) eq lc($task)) { - # if (taskinfo($generate_cdr_task_opt,$result,1)) { - # next unless check_dry(); - # $result &= generate_cdr_task(\@messages); - # $completion |= 1; - # } + } elsif (lc($reset_export_status_task_opt) eq lc($task)) { + if (taskinfo($reset_export_status_task_opt,$result,1)) { + #next unless check_dry(); + $result &= reset_export_status_task(\@messages); + } } else { $result = 0; @@ -205,21 +214,61 @@ sub cleanup_task { sub export_cdr_task { + my ($messages) = @_; + my ($result,$warning_count,$ama_files) = (0,0,[]); + eval { + ($result,$warning_count,$ama_files) = export_cdrs(); + }; + my $err = $@; + my $stats = ": " . (scalar @$ama_files) . ' files'; # . ((scalar @$ama_files) > 0 ? "\n " : '') . join("\n ",@$ama_files); + foreach my $ama_file (@$ama_files) { + $stats .= "\n " . $ama_file; + } + #eval { + # #stats .= "\n total CDRs: " . + # # NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows'; + #}; + if ($err or !$result) { + push(@$messages,"export cdrs INCOMPLETE$stats"); + } else { + push(@$messages,"export cdrs completed$stats"); + } + destroy_dbs(); + return $result; + +} + +sub reset_fsn_task { + my ($messages) = @_; my ($result) = (0); eval { - ($result) = export_cdrs(); + ($result) = reset_fsn(); }; my $err = $@; - my $stats = ":"; + if ($err or !$result) { + push(@$messages,"reset file sequence number INCOMPLETE"); + } else { + push(@$messages,"reset file sequence number completed"); + } + destroy_dbs(); + return $result; + +} + +sub reset_export_status_task { + + my ($messages) = @_; + my ($result) = (0); eval { - #stats .= "\n total CDRs: " . - # NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows'; + ($result) = reset_export_status($from,$to); }; + my $err = $@; + my $fromto = 'from ' . ($from ? $from : '-') . ' to ' . ($to ? $to : '-'); if ($err or !$result) { - push(@$messages,"export cdrs INCOMPLETE$stats"); + push(@$messages,"reset export status $fromto INCOMPLETE"); } else { - push(@$messages,"export cdrs completed$stats"); + push(@$messages,"reset export status $fromto completed"); } destroy_dbs(); return $result; diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg index daf3493..e9c37c4 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.cfg @@ -1,10 +1,11 @@ - #skip_errors=0 -providers_yml = providers.yml - export_cdr_multithreading = 1 +export_cdr_numofthreads = 2 export_cdr_blocksize = 1000 export_cdr_joins = { 'billing.voip_subscribers source_voip_subscribers' => { 'source_voip_subscribers.uuid' => 'accounting.cdr.source_user_id' } }, { 'billing.voip_subscribers destination_voip_subscribers' => { 'destination_voip_subscribers.uuid' => 'accounting.cdr.destination_user_id' } }, { 'billing.billing_zones_history source_carrier_bbz' => { 'source_carrier_bbz.id' => 'accounting.cdr.source_carrier_billing_zone_id' } }, { 'billing.billing_zones_history source_reseller_bbz' => { 'source_reseller_bbz.id' => 'accounting.cdr.source_reseller_billing_zone_id' } }, { 'billing.billing_zones_history source_customer_bbz' => { 'source_customer_bbz.id' => 'accounting.cdr.source_customer_billing_zone_id' } }, { 'billing.billing_zones_history destination_carrier_bbz' => { 'destination_carrier_bbz.id' => 'accounting.cdr.destination_carrier_billing_zone_id' } }, { 'billing.billing_zones_history destination_reseller_bbz' => { 'destination_reseller_bbz.id' => 'accounting.cdr.destination_reseller_billing_zone_id' } }, { 'billing.billing_zones_history destination_customer_bbz' => { 'destination_customer_bbz.id' => 'accounting.cdr.destination_customer_billing_zone_id' } } export_cdr_conditions = { 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } export_cdr_limit = 300000 +export_cdr_stream = ama_simple + +ama_filename_format = %1$s%2$03d.debug.ama diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg index e5387cd..9b374a4 100755 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/settings.debug.cfg @@ -1,14 +1,15 @@ - #skip_errors=0 -providers_yml = providers.yml - export_cdr_multithreading = 1 +export_cdr_numofthreads = 2 export_cdr_blocksize = 1000 #export_cdr_joins = { 'accounting.cdr_export_status_data esd' => { 'esd.cdr_id' => 'accounting.cdr.id' } }, { 'accounting.cdr_export_status es' => { 'es.id' => 'esd.status_id' } } #export_cdr_conditions = { 'es.type' => { '=' => '"default"' } }, { 'esd.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } } #, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } } export_cdr_conditions = { 'accounting.cdr.call_status' => { '=' => '"ok"' } } -export_cdr_stream = default +export_cdr_stream = ama_simple +#default #export_cdr_limit = 3500 #300000 + +ama_filename_format = %1$s%2$03d.debug.ama diff --git a/lib/NGCP/BulkProcessor/Projects/Export/Ama/test.pl b/lib/NGCP/BulkProcessor/Projects/Export/Ama/test.pl index cd8ea9e..d062d70 100644 --- a/lib/NGCP/BulkProcessor/Projects/Export/Ama/test.pl +++ b/lib/NGCP/BulkProcessor/Projects/Export/Ama/test.pl @@ -6,6 +6,17 @@ use File::Basename; use Cwd; use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); +use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw( + update_settings +); + +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); + use NGCP::BulkProcessor::Calendar qw(current_local); use NGCP::BulkProcessor::Projects::Export::Ama::Format::File qw(); @@ -22,6 +33,9 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternat use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime qw(); use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime qw(); +load_config('config.cfg'); +load_config('settings.cfg',\&update_settings,$SIMPLE_CONFIG_TYPE); + my $dt = current_local(); my $source = "43011001"; my $destination = "43011002"; @@ -31,79 +45,101 @@ my $file = NGCP::BulkProcessor::Projects::Export::Ama::Format::File->new( ); -my $limit = 1000; +my $limit = 5000; my $i = 0; -while ($i < $limit) { - $file->write_record(sub { - return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( - NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9013->new( +my $file_sequence_number = 123; + +sub get_transfer_in { + return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( + NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9013->new( + + rewritten => 0, + sensor_id => '008708', # Graz + + padding => 0, + recording_office_id => '008708', - rewritten => 0, - sensor_id => '008708', # Graz + date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt), - padding => 0, - recording_office_id => '008708', + connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt), - date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt), + file_sequence_number => $file_sequence_number, + ) + ); +} + +sub get_record { + return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( + NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure0510->new( + call_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::CallType::STATION_PAID, - connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt), + rewritten => 0, + sensor_id => '008708', # Graz - file_sequence_number => 1, - ) - ); + padding => 0, + recording_office_id => '008708', - }, sub { - return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( - NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure0510->new( - call_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::CallType::STATION_PAID, + date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt), - rewritten => 0, - sensor_id => '008708', # Graz + service_feature => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ServiceFeature::OTHER, - padding => 0, - recording_office_id => '008708', + originating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($source), + originating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($source), + originating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($source), - date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt), + domestic_international => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::get_number_domestic_international($destination), - service_feature => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ServiceFeature::OTHER, + terminating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($destination), + terminating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($destination), + terminating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($destination), - originating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($source), - originating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($source), - originating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($source), + connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt), + elapsed_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime::get_elapsed_time($duration), + ) + ); +} - domestic_international => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::get_number_domestic_international($destination), +sub get_transfer_out { - terminating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($destination), - terminating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($destination), - terminating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($destination), + return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( + NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014->new( - connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt), - elapsed_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime::get_elapsed_time($duration), - ) - ); - }, sub { + rewritten => 0, + sensor_id => '008708', # Graz - return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new( - NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014->new( + padding => 0, + recording_office_id => '008708', - rewritten => 0, - sensor_id => '008708', # Graz - - padding => 0, - recording_office_id => '008708', + date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt), - date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt), + connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt), - connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt), + file_sequence_number => $file_sequence_number, - file_sequence_number => 1, + #=> (scalar @records), + ) + ); + +} - => (scalar @records), - ) - ); - }); +sub commit_cb { + print $file_sequence_number . "\n"; + $file_sequence_number++; +} + +while ($i < $limit) { + $file->write_record( + get_transfer_in => \&get_transfer_in, + get_record => \&get_record, + get_transfer_out => \&get_transfer_out, + commit_cb => \&commit_cb, + ); $i++; } +$file->close( + get_transfer_out => \&get_transfer_out, + commit_cb => \&commit_cb, +); #print $test->{structure}->to_string()."\n"; #print $test->get_hex(); \ No newline at end of file diff --git a/lib/NGCP/BulkProcessor/SqlConnector.pm b/lib/NGCP/BulkProcessor/SqlConnector.pm index ac3a695..f8152c5 100755 --- a/lib/NGCP/BulkProcessor/SqlConnector.pm +++ b/lib/NGCP/BulkProcessor/SqlConnector.pm @@ -813,7 +813,9 @@ sub DESTROY { if ($self->{tid} == threadid()) { $self->_db_disconnect(); delete $self->{drh}; - dbdebug($self,(ref $self) . ' connector destroyed',getlogger(__PACKAGE__)); + eval { + dbdebug($self,(ref $self) . ' connector destroyed',getlogger(__PACKAGE__)); + }; #} else { # print "NOT destroyed\n"; } diff --git a/lib/NGCP/BulkProcessor/Table.pm b/lib/NGCP/BulkProcessor/Table.pm index ed7f5cf..5745f25 100755 --- a/lib/NGCP/BulkProcessor/Table.pm +++ b/lib/NGCP/BulkProcessor/Table.pm @@ -21,7 +21,7 @@ sub new { sub _set_data { my $self = shift; - my ($data,$dupecheck) = shift; + my ($data,$dupecheck) = @_; $self->clear(); if (defined $data and ref $data eq 'ARRAY') { if ($dupecheck) { diff --git a/lib/NGCP/BulkProcessor/Utils.pm b/lib/NGCP/BulkProcessor/Utils.pm index 709e651..09c36dc 100755 --- a/lib/NGCP/BulkProcessor/Utils.pm +++ b/lib/NGCP/BulkProcessor/Utils.pm @@ -124,7 +124,7 @@ sub float_equal { sub round { - my ($number) = shift; + my ($number) = @_; return int($number + .5 * ($number <=> 0)); }