TT#47532 ama: persisting file sequence number

+ control command to reset fsn
+ control command to reset a export stream's export_status
+ produces intact .ama files now

Change-Id: I3b9456416851d2caa3273b54d9e835097663c98b
(cherry picked from commit 72c17907e4)
changes/96/26796/1
Rene Krenn 7 years ago
parent 8e50c98a1d
commit d28dc6badf

@ -41,6 +41,8 @@ our @EXPORT_OK = qw(
findby_callidprefix
process_unexported
get_callidprefix
);
#process_records
#delete_ids
@ -207,6 +209,15 @@ sub countby_ratingstatus {
}
sub get_callidprefix {
my ($call_id) = @_;
my $suffixre = '(' . join('|', map { quotemeta($_); } @callid_suffixes) . ')+$';
$call_id =~ s/$suffixre//g;
return $call_id
}
sub findby_callidprefix {
my ($xa_db,$call_id,$joins,$conditions,$load_recursive) = @_;
@ -216,8 +227,7 @@ sub findby_callidprefix {
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $suffixre = '(' . join('|', map { quotemeta($_); } @callid_suffixes) . ')+$';
$call_id =~ s/$suffixre//g;
$call_id = get_callidprefix($call_id);
$call_id =~ s/%/\\%/g;
my @conditions = @{$conditions // []};

@ -5,7 +5,7 @@ use strict;
use NGCP::BulkProcessor::Logging qw(
getlogger
rowsdeleted
rowsupdated
rowinserted
rowupserted
rowupdated
@ -35,6 +35,8 @@ our @EXPORT_OK = qw(
insert_row
upsert_row
update_export_status
$UNEXPORTED
$EXPORTED
);
@ -165,6 +167,37 @@ sub upsert_row {
}
sub update_export_status {
my ($status_id,$export_status,$start_time_from,$start_time_to) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'UPDATE ' . $table . ' SET ' . $db->columnidentifier('export_status') . ' = ?' .
' WHERE ' . $db->columnidentifier('status_id') . ' = ? AND ' . $db->columnidentifier('export_status') . ' != ?';
my @params = ($export_status,$status_id,$export_status);
if (defined $start_time_from) {
$stmt .= ' AND ' . $db->columnidentifier('start_time') . ' >= UNIX_TIMESTAMP(?)';
push(@params,$start_time_from);
}
if (defined $start_time_to) {
$stmt .= ' AND ' . $db->columnidentifier('start_time') . ' < UNIX_TIMESTAMP(?)';
push(@params,$start_time_to);
}
my $count;
if ($count = $db->db_do($stmt,@params)) {
rowsupdated($db,$tablename,$count,getlogger(__PACKAGE__));
return $count;
} else {
rowsupdated($db,$tablename,0,getlogger(__PACKAGE__));
return 0;
}
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;

@ -0,0 +1,256 @@
package NGCP::BulkProcessor::Dao::Trunk::accounting::mark;
use strict;
## no critic
use NGCP::BulkProcessor::Logging qw(
getlogger
rowsdeleted
);
use NGCP::BulkProcessor::ConnectorPool qw(
get_accounting_db
destroy_dbs
);
use NGCP::BulkProcessor::SqlProcessor qw(
checktableinfo
copy_row
insert_record
update_record
);
use NGCP::BulkProcessor::SqlRecord qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord);
our @EXPORT_OK = qw(
gettablename
check_table
get_system_mark
get_reseller_mark
set_system_mark
set_reseller_mark
insert_system_mark
insert_reseller_mark
cleanup_system_marks
cleanup_reseller_marks
);
my $tablename = 'mark';
my $get_db = \&get_accounting_db;
my $expected_fieldnames = [
"id",
"collector",
"acc_id",
];
my $indexes = {};
my $insert_unique_fields = [];
my $system_collector_format = '%s-lastseq';
my $reseller_collector_format = '%s-lastseq-%d';
sub new {
my $class = shift;
my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db,
$tablename,$expected_fieldnames,$indexes);
copy_row($self,shift,$expected_fieldnames);
return $self;
}
sub get_system_mark {
my ($xa_db,$stream) = @_;
return _get_mark($xa_db,sprintf($system_collector_format,$stream));
}
sub get_reseller_mark {
my ($xa_db,$stream,$reseller_id) = @_;
return _get_mark($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // ''));
}
sub _get_mark {
my ($xa_db,$collector) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT MAX(' . $db->columnidentifier('acc_id') . ') FROM ' . $table . ' WHERE ' .
$db->columnidentifier('collector') . ' = ?';
my @params = ($collector);
my $mark = $xa_db->db_get_value($stmt,@params);
return (defined $mark ? $mark : '0');
}
sub set_system_mark {
my ($xa_db,$stream,$mark) = @_;
return _set_mark($xa_db,sprintf($system_collector_format,$stream),$mark,0);
}
sub set_reseller_mark {
my ($xa_db,$stream,$reseller_id,$mark) = @_;
return _set_mark($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // ''),$mark,0);
}
sub insert_system_mark {
my ($xa_db,$stream,$mark) = @_;
return _set_mark($xa_db,sprintf($system_collector_format,$stream),$mark,1);
}
sub insert_reseller_mark {
my ($xa_db,$stream,$reseller_id,$mark) = @_;
return _set_mark($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // ''),$mark,1);
}
sub _set_mark {
my ($xa_db,$collector,$mark,$force_insert) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $id;
unless ($force_insert) {
my $stmt = 'SELECT MAX(t1.id) FROM ' . $table . ' t1 LEFT JOIN ' . $table . ' t2' .
' ON t1.collector = t2.collector and t2.acc_id > t1.acc_id'.
' WHERE t2.collector IS NULL AND t1.collector = ?';
my @params = ($collector);
$id = $xa_db->db_get_value($stmt,@params);
}
if (defined $id) {
return update_record($get_db,$xa_db,__PACKAGE__,{
id => $id,
acc_id => $mark,
});
} else {
if (insert_record($db,$xa_db,__PACKAGE__,{
collector => $collector,
acc_id => $mark
},0,$insert_unique_fields)) {
return $xa_db->db_last_insert_id();
}
return undef;
}
}
sub cleanup_system_marks {
my ($xa_db,$stream) = @_;
return _cleanup_marks($xa_db,sprintf($system_collector_format,$stream));
}
sub cleanup_reseller_marks {
my ($xa_db,$stream,$reseller_id) = @_;
return _cleanup_marks($xa_db,sprintf($reseller_collector_format,$stream,$reseller_id // ''));
}
sub _cleanup_marks {
my ($xa_db,$collector) = @_;
check_table();
my $db = &$get_db();
$xa_db //= $db;
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT MAX(t1.id) FROM ' . $table . ' t1 LEFT JOIN ' . $table . ' t2' .
' ON t1.collector = t2.collector and t2.acc_id > t1.acc_id'.
' WHERE t2.collector IS NULL AND t1.collector = ?';
my @params = ($collector);
my $id = $xa_db->db_get_value($stmt,@params);
if (defined $id) {
$stmt = 'DELETE FROM ' . $table . ' WHERE collector = ? AND id != ?';
push(@params,$id);
if ($xa_db->db_do($stmt,@params)) {
rowsdeleted($db,$tablename,1,1,getlogger(__PACKAGE__));
return 1;
} else {
rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__));
return 0;
}
}
return 0;
}
sub buildrecords_fromrows {
my ($rows,$load_recursive) = @_;
my @records = ();
my $record;
if (defined $rows and ref $rows eq 'ARRAY') {
foreach my $row (@$rows) {
$record = __PACKAGE__->new($row);
# transformations go here ...
push @records,$record;
}
}
return \@records;
}
sub gettablename {
return $tablename;
}
sub check_table {
return checktableinfo($get_db,
__PACKAGE__,$tablename,
$expected_fieldnames,
$indexes);
}
1;

@ -55,6 +55,7 @@ our @EXPORT_OK = qw(
rowupserted
rowupdated
rowsdeleted
rowsupdated
totalrowsdeleted
rowinsertskipped
rowupdateskipped
@ -498,6 +499,17 @@ sub rowupdated {
}
sub rowsupdated {
my ($db,$tablename,$rowcount,$logger) = @_;
if (defined $logger) {
$logger->debug(_getsqlconnectorinstanceprefix($db) . $rowcount . ' row(s) updated');
}
}
sub rowsdeleted {
my ($db,$tablename,$rowcount,$initial_rowcount,$logger) = @_;

@ -4,33 +4,19 @@ use strict;
## no critic
use threads::shared qw();
#use Time::HiRes qw(sleep);
#use String::MkPasswd qw();
#use List::Util qw();
#use Data::Rmap qw();
#use Tie::IxHash;
#use NGCP::BulkProcessor::Globals qw(
# $enablemultithreading
#);
use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw(
$skip_errors
$export_cdr_multithreading
$export_cdr_numofthreads
$export_cdr_blocksize
$export_cdr_joins
$export_cdr_conditions
$export_cdr_limit
$export_cdr_stream
);
#$dry
#$deadlock_retries
#@providers
#$generate_cdr_numofthreads
#$generate_cdr_count
use NGCP::BulkProcessor::Logging qw (
getlogger
@ -46,14 +32,7 @@ use NGCP::BulkProcessor::LogError qw(
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr qw();
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status qw();
use NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::voip_subscribers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::domains qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::resellers qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contacts qw();
#use NGCP::BulkProcessor::Dao::Trunk::billing::contracts qw();
#use NGCP::BulkProcessor::Dao::Trunk::provisioning::voip_subscribers qw();
use NGCP::BulkProcessor::Dao::Trunk::accounting::mark qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Format::File qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Record qw();
@ -68,17 +47,15 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigit
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber qw();
use NGCP::BulkProcessor::ConnectorPool qw(
get_xa_db
destroy_dbs
ping_dbs
);
#ping_dbs
#use NGCP::BulkProcessor::Utils qw(threadid timestamp); # stringtobool check_ipnet trim);
##use NGCP::BulkProcessor::DSSorter qw(sort_by_configs);
##use NGCP::BulkProcessor::RandomString qw(createtmpstring);
#use NGCP::BulkProcessor::Array qw(array_to_map);
use NGCP::BulkProcessor::Utils qw(threadid kbytes2gigs); # stringtobool check_ipnet trim);
use NGCP::BulkProcessor::Calendar qw(current_local);
@ -86,9 +63,71 @@ require Exporter;
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(
export_cdrs
reset_fsn
reset_export_status
);
my $file_sequence_number : shared = 0;
sub reset_export_status {
my ($from,$to) = @_;
my $result = 1;
my $context = { tid => threadid(), warning_count => 0, error_count => 0, };
$result &= _check_export_status_stream($context);
my $updated;
eval {
$updated = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::update_export_status($context->{export_status_id},
$NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED,$from,$to);
};
if ($@) {
if ($skip_errors) {
_warn($context,"problem with export status reset: " . $@);
} else {
_error($context,"problem with export status reset: " . $@);
}
$result = 0;
} else {
_info($context,"$updated export states reset");
}
return $result;
}
sub reset_fsn {
my $result = 1;
my $context = { tid => threadid(), warning_count => 0, error_count => 0, };
$result &= _check_export_status_stream($context);
my $fsn;
eval {
NGCP::BulkProcessor::Dao::Trunk::accounting::mark::cleanup_system_marks(undef,
$export_cdr_stream,
);
NGCP::BulkProcessor::Dao::Trunk::accounting::mark::set_system_mark(undef,
$export_cdr_stream,
'0' #$NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::min_fsn
);
$fsn = NGCP::BulkProcessor::Dao::Trunk::accounting::mark::get_system_mark(undef,
$export_cdr_stream
); #load mark...
};
if ($@) {
if ($skip_errors) {
_warn($context,"problem with file sequence number reset: " . $@);
} else {
_error($context,"problem with file sequence number reset: " . $@);
}
$result = 0;
} else {
_info($context,"file sequence number reset to $fsn")
}
return $result;
}
sub export_cdrs {
my $static_context = {};
@ -96,27 +135,26 @@ sub export_cdrs {
destroy_dbs();
my $warning_count :shared = 0;
return ($result && NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::process_unexported(
my @ama_files : shared = ();
$result &= NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::process_unexported(
static_context => $static_context,
process_code => sub {
my ($context,$records,$row_offset) = @_;
$context->{rownum} = $row_offset;
$context->{block_cdr_id_map} = { map { $_->[0] => $_->[1]; } @$records };
$context->{block_call_id_map} = {};
foreach my $record (@$records) {
return 0 if (defined $export_cdr_limit and $context->{rownum} >= $export_cdr_limit);
my ($id,$call_id) = @$record;
# skip if the cdr belongs to a call already done in this block:
next unless exists $context->{block_cdr_id_map}->{$id};
# skip if the cdr is pending for flushing to file:
next if exists $context->{file_cdr_id_map}->{$id};
# skip if call legs/data is incomplete:
next unless _export_cdrs_init_context($context,$call_id);
# go ahead:
foreach my $cdr (@{$context->{cdrs}}) {
$context->{file_cdr_id_map}->{$cdr->{id}} = $cdr->{start_time};
delete $context->{cdr_id_map}->{$cdr->{id}};
$context->{rownum} += 1;
my $call_id_prefix = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::get_callidprefix($call_id);
if (exists $context->{block_call_id_map}->{$call_id_prefix}) {
$context->{block_call_id_map}->{$call_id_prefix} += 1;
} else {
$context->{block_call_id_map}->{$call_id_prefix} = 1;
}
}
foreach my $record (@$records) {
return 0 if (defined $export_cdr_limit and $context->{rownum} >= $export_cdr_limit);
return 0 if $context->{file_sequence_number} > $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn;
my ($id,$call_id) = @$record;
next unless _export_cdrs_init_context($context,$id,$call_id);
eval {
$context->{file}->write_record(
get_transfer_in => \&_get_transfer_in,
@ -134,7 +172,7 @@ sub export_cdrs {
}
}
}
ping_dbs();
return 1;
},
init_process_context_code => sub {
@ -142,58 +180,96 @@ sub export_cdrs {
$context->{db} = &get_xa_db();
$context->{error_count} = 0;
$context->{warning_count} = 0;
# below is not mandatory..
#_check_insert_tables();
$context->{ama_files} = [];
$context->{has_next} = 1;
$context->{rownum} = 0;
_increment_file_sequence_number($context);
},
uninit_process_context_code => sub {
my ($context)= @_;
eval {
$context->{file}->close(
get_transfer_out => \&_get_transfer_out,
commit_cb => \&_commit_export_status,
context => $context,
);
};
if ($@) {
if ($skip_errors) {
_warn($context,"problem while closing " . $context->{file}->get_file_name() . ": " . $@);
} else {
_error($context,"problem while exporting " . $context->{file}->get_file_name() . ": " . $@);
$context->{has_next} = 0; #do not reserve another file sequence number
if ($context->{file_sequence_number} <= $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) {
eval {
$context->{file}->close(
get_transfer_out => \&_get_transfer_out,
commit_cb => \&_commit_export_status,
context => $context,
);
};
if ($@) {
if ($skip_errors) {
_warn($context,"problem while closing " . $context->{file}->get_filename() . ": " . $@);
} else {
_error($context,"problem while closing " . $context->{file}->get_filename() . ": " . $@);
}
}
}
undef $context->{db};
destroy_all_dbs();
destroy_dbs();
{
lock $warning_count;
$warning_count += $context->{warning_count};
push(@ama_files,@{$context->{ama_files}});
}
},
load_recursive => 0,
blocksize => $export_cdr_blocksize,
multithreading => $export_cdr_multithreading,
numofthreads => 1,
numofthreads => $export_cdr_numofthreads,
joins => $export_cdr_joins,
conditions => $export_cdr_conditions,
#sort => [{ column => 'id', numeric => 1, dir => 1 }],
limit => $export_cdr_limit,
),$warning_count);
);
eval {
NGCP::BulkProcessor::Dao::Trunk::accounting::mark::cleanup_system_marks(undef,
$export_cdr_stream);
};
if ($@) {
if ($skip_errors) {
_warn($static_context,"problem with file sequence number cleanup: " . $@);
} else {
_error($static_context,"problem with file sequence number cleanup: " . $@);
}
$result = 0;
} else {
_info($static_context,"file sequence numbers cleaned up");
}
return ($result,$warning_count,\@ama_files);
}
sub _export_cdrs_init_context {
my ($context,$call_id) = @_;
my $result = 1;
my ($context,$cdr_id,$call_id) = @_;
my $result = 0;
$context->{cdrs} = undef;
$context->{call_id} = $call_id;
$context->{cdrs} = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::findby_callidprefix($context->{db},
$call_id,$export_cdr_joins,$export_cdr_conditions);
$result &= ((scalar @{$context->{cdrs}}) > 0 ? 1 : 0);
#$result &= ((scalar @{$context->{cdrs}}) == 4 ? 1 : 0);
if (not exists $context->{file_cdr_id_map}->{$cdr_id}) {
my $call_id_prefix = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::get_callidprefix($call_id);
if (exists $context->{block_call_id_map}->{$call_id_prefix}) {
$context->{cdrs} = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::findby_callidprefix($context->{db},
$call_id,$export_cdr_joins,$export_cdr_conditions);
my $cdrs_in_block = delete $context->{block_call_id_map}->{$call_id_prefix};
if ((scalar @{$context->{cdrs}}) == $cdrs_in_block) {
foreach my $cdr (@{$context->{cdrs}}) {
$context->{file_cdr_id_map}->{$cdr->{id}} = $cdr; #->{start_time};
$context->{rownum} += 1;
}
$result = 1;
}
}
}
# todo: prepare the fields from the call's CDRs:
$context->{dt} = current_local();
$context->{source} = "43011001";
@ -212,41 +288,53 @@ sub _commit_export_status {
) = @params{qw/
context
/};
#my %dropped = ();
_info($context,"file " . $context->{file}->get_filename() . " (" . kbytes2gigs(int((-s $context->{file}->get_filename()) / 1024)) . ") written (" . $context->{file}->get_record_count() . " records in " . $context->{file}->get_block_count() . " blocks)");
eval {
ping_dbs();
$context->{db}->db_begin();
foreach my $id (keys %{$context->{block_cdr_id_map}}) {
foreach my $id (keys %{$context->{file_cdr_id_map}}) {
#mark exported
NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::upsert_row($context->{db},
cdr_id => $id,
status_id => $context->{export_status_id},
export_status => $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::EXPORTED,
cdr_start_time => $context->{block_cdr_id_map}->{$id}->{start_time},
cdr_start_time => $context->{file_cdr_id_map}->{$id}->{start_time},
);
_info($context,"export_status set for cdr id $id",1);
#$dropped{$cdr_id} = delete $context->{file_cdrs}->{$cdr_id};
}
NGCP::BulkProcessor::Dao::Trunk::accounting::mark::insert_system_mark($context->{db},
$export_cdr_stream,
$context->{file_sequence_number},
); #set mark...
_info($context,"file sequence number $context->{file_sequence_number} saved");
$context->{db}->db_commit();
};
$context->{block_cdr_id_map} = {};
$context->{file_cdr_id_map} = {};
my $err = $@;
if ($err) {
eval {
$context->{db}->db_rollback(1);
#foreach (keys %dropped) {
# $cdr_id_map{$_} = $dropped{$_};
#}
};
eval {
unlink $context->{file}->get_filename();
};
die($err);
} else {
push(@{$context->{ama_files}},$context->{file}->get_filename());
_increment_file_sequence_number($context) if $context->{has_next};
}
}
sub _increment_file_sequence_number {
my ($context) = @_;
lock $file_sequence_number;
$file_sequence_number = $file_sequence_number + 1;
_info($context,"file sequence number incremented: $file_sequence_number",1);
$context->{file_sequence_number} = $file_sequence_number;
}
sub _get_transfer_in {
my %params = @_;
@ -255,20 +343,21 @@ sub _get_transfer_in {
) = @params{qw/
context
/};
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9013->new(
rewritten => 0,
sensor_id => '008708', # Graz
sensor_id => '438716', # Graz
padding => 0,
recording_office_id => '008708',
recording_office_id => '438716',
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($context->{dt}),
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($context->{dt}),
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($context->{dt}), # adjacent
file_sequence_number => 1,
file_sequence_number => $context->{file_sequence_number},
)
);
@ -287,21 +376,31 @@ sub _get_record {
call_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::CallType::STATION_PAID,
rewritten => 0,
sensor_id => '008708', # Graz
sensor_id => '438716', # Graz
padding => 0,
recording_office_id => '008708',
recording_office_id => '438716', #008708
#call code 970c
#timing ind 000
#seervice observed 0c
#called party off-hook setzen
#unanswered =>
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($context->{dt}),
service_feature => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ServiceFeature::OTHER,
#mit 43
originating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($context->{source}),
originating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($context->{source}),
originating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($context->{source}),
domestic_international => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::get_number_domestic_international($context->{destination}),
domestic_international => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::INTERNATIONAL, #get_number_domestic_international($context->{destination}),
#2c
#destination number mit 43
#dialed_in , 0er wegstreichen
terminating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($context->{destination}),
terminating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($context->{destination}),
terminating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($context->{destination}),
@ -323,7 +422,7 @@ sub _get_transfer_out {
/};
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014->new(
@_,
rewritten => 0,
sensor_id => '008708', # Graz
@ -334,15 +433,17 @@ sub _get_transfer_out {
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($context->{dt}),
#file_sequence_number => 1,
file_sequence_number => $context->{file_sequence_number},
#=> (scalar @records),
)
);
}
sub _export_cdrs_create_context {
sub _check_export_status_stream {
my ($context) = @_;
@ -359,11 +460,47 @@ sub _export_cdrs_create_context {
_error($context,"cannot find export stream '$export_cdr_stream'");
$result = 0;
} elsif ($export_status) {
_info($context,"export stream '$export_cdr_stream' set");
_info($context,"using export stream '$export_cdr_stream'");
}
return $result;
}
sub _export_cdrs_create_context {
my ($context) = @_;
my $result = 1;
$context->{tid} = threadid();
$result &= _check_export_status_stream($context);
$context->{file} = NGCP::BulkProcessor::Projects::Export::Ama::Format::File->new();
$context->{file_cdr_id_map} = {};
$context->{has_next} = 1;
my $fsn;
eval {
$fsn = NGCP::BulkProcessor::Dao::Trunk::accounting::mark::get_system_mark(undef,
$export_cdr_stream
); #load mark...
};
if ($@) {
_error($context,"cannot get last file sequence number");
$result = 0;
} else {
if ($fsn < 0) {
$fsn = 0;
} elsif ($fsn >= $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn) {
_warn($context,"file sequence number $fsn exceeding limit (" . $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber::max_fsn . ")");
$result = 0;
}
_info($context,"last file sequence number is $fsn");
lock $file_sequence_number;
$file_sequence_number = $fsn;
}
return $result;
}

@ -37,6 +37,26 @@ sub add_record {
}
sub records_fit {
my $self = shift;
my @records = @_;
if (not $self->{padded}) {
my $length = $self->get_length();
foreach my $record (@records) {
if (not ref $record) {
$length += $record;
} else {
$length += $record->get_length();
}
}
if ($length <= 2 * $max_block_length) {
return 1;
}
}
return 0;
}
sub get_hex {
my $self = shift;
@ -45,7 +65,7 @@ sub get_hex {
$result .= $record->get_hex();
}
if ($self->{padded}) {
$result .= 'aa' x (2 * $max_block_length - length($result));
$result .= 'a' x (2 * $max_block_length - length($result));
}
return $result;
}

@ -10,13 +10,17 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Field qw($TERMINATOR);
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::Field);
our @EXPORT_OK = qw(
$min_fsn
$max_fsn
);
my $field_name = "file sequence number";
my $length = 4;
my @param_names = qw/file_sequence_number/;
our $min_fsn = 1;
our $max_fsn = 999;
sub new {
my $class = shift;
@ -41,7 +45,7 @@ sub get_hex {
my $self = shift;
my ($file_sequence_number) = $self->_get_params(@_);
die("invalid file sequence number '$file_sequence_number'") if ($file_sequence_number < 1 or $file_sequence_number > 999);
die("invalid file sequence number '$file_sequence_number'") if ($file_sequence_number < $min_fsn or $file_sequence_number > $max_fsn);
return sprintf('%03d',$file_sequence_number) . $TERMINATOR;
}

@ -56,7 +56,7 @@ sub get_hex {
my $self = shift;
my ($service_feature) = $self->_get_params(@_);
die("invalid service feature '$service_feature'") unless length($service_feature) == 3;
return $service_feature . $TERMINATOR;
return sprintf("%03d",$service_feature) . $TERMINATOR;
}

@ -3,7 +3,13 @@ use strict;
## no critic
use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw(
$output_path
$ama_filename_format
);
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Block qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014 qw();
use NGCP::BulkProcessor::Logging qw(
getlogger
@ -36,27 +42,44 @@ sub reset {
$self->{current_block} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Block->new();
$self->{blocks} = [ $self->{current_block} ];
$self->{record_count} = 0;
$self->_save_transfer_in(undef);
$self->_save_transfer_out(undef);
return;
}
sub get_record_count {
my $self = shift;
return $self->{record_count};
}
sub get_block_count {
my $self = shift;
return scalar @{$self->{blocks}};
}
sub add_record {
my $self = shift;
my ($record,$pad) = shift;
my ($record,$pad) = @_;
my $result;
if (not $self->{current_block}->add_record($record)) {
if ((scalar @{$self->{blocks}}) >= $max_blocks) {
$result = 0;
if (not $pad and (scalar @{$self->{blocks}}) >= $max_blocks and not $self->{current_block}->records_fit($record,
$NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014::length)) {
$result = 0;
} else {
if (not $self->{current_block}->add_record($record)) {
if ((scalar @{$self->{blocks}}) >= $max_blocks) {
$result = 0;
} else {
$self->{current_block}->set_padded(1);
$self->{current_block} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Block->new();
push(@{$self->{blocks}},$self->{current_block});
$result = $self->{current_block}->add_record($record);
$self->{record_count} += 1;
}
} else {
$self->{current_block}->set_padded(1);
$self->{current_block} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Block->new();
push(@{$self->{blocks}},$self->{current_block});
$result = $self->{current_block}->add_record($record);
$self->{record_count} += 1;
$self->{current_block}->set_padded(1) if $pad;
$result = 1;
}
} else {
$self->{record_count} += 1;
$self->{current_block}->set_padded(1) if $pad;
$result = 1;
}
return $result;
@ -64,7 +87,10 @@ sub add_record {
sub get_filename {
my $self = shift;
return 'test.ama';
return sprintf($ama_filename_format,
$output_path,
$self->{transfer_in}->get_structure()->get_file_sequence_number_field()->{file_sequence_number},
);
}
sub flush {
@ -77,17 +103,22 @@ sub flush {
/};
#unlink 'test.ama';
if ((scalar @{$self->{blocks}}) > 0 and (my $filename = $self->get_filename())) {
if (open(my $fh,">:raw",$filename)) {
foreach my $block (@{$self->{blocks}}) {
print $fh pack('H*',$block->get_hex());
}
close $fh;
&$commit_cb(@_) if defined $commit_cb;
#restdebug($self,"$self->{crt_path} saved",getlogger(__PACKAGE__));
return 1;
} else {
fileerror('failed to open ' . $filename . ": $!",getlogger(__PACKAGE__));
if (-e $filename) {
fileerror($filename . ' already exists',getlogger(__PACKAGE__));
return 0;
} else {
if (open(my $fh,">:raw",$filename)) {
foreach my $block (@{$self->{blocks}}) {
print $fh pack('H*',$block->get_hex());
}
close $fh;
&$commit_cb(@_) if defined $commit_cb;
#restdebug($self,"$self->{crt_path} saved",getlogger(__PACKAGE__));
return 1;
} else {
fileerror('failed to open ' . $filename . ": $!",getlogger(__PACKAGE__));
return 0;
}
}
} else {
return 0;
@ -107,15 +138,17 @@ sub close {
/};
my $result = 0;
$self->add_record(
&$get_transfer_out(
$self->_save_transfer_out(&$get_transfer_out(
#file_sequence_number => 1,
#=> (scalar @records),
@_
),
)),
1,
);
# update count fields:
$self->{transfer_out}->get_structure()->get_block_count_field()->_set_params(block_count => $self->get_block_count());
$self->{transfer_out}->get_structure()->get_record_count_field()->_set_params(record_count => $self->get_record_count());
$result |= $self->flush(
commit_cb => $commit_cb,
@_
@ -142,27 +175,28 @@ sub write_record {
/};
$self->add_record(
&$get_transfer_in(
$self->_save_transfer_in(&$get_transfer_in(
#file_sequence_number => 1,
@_,
),
)),
1
) unless $self->{record_count} > 0;
my $result = 0;
my $record = &$get_record(@_);
if (not $self->add_record($record)) {
#my $blah="y";
$result |= $self->close(
get_transfer_out => $get_transfer_out,
commit_cb => $commit_cb,
@_
);
$self->add_record(
&$get_transfer_in(
$self->_save_transfer_in(&$get_transfer_in(
#file_sequence_number => 1,
@_
),
)),
1
);
@ -173,4 +207,26 @@ sub write_record {
}
sub _save_transfer_in {
my $self = shift;
my $record = shift;
if (defined $record) {
$self->{transfer_in} = $record;
} else {
undef $self->{transfer_in};
}
return $record;
}
sub _save_transfer_out {
my $self = shift;
my $record = shift;
if (defined $record) {
$self->{transfer_out} = $record;
} else {
undef $self->{transfer_out};
}
return $record;
}
1;

@ -0,0 +1,4 @@
TRUNK_FACILITY_ID 129990000C
TRUNK_FACILITY_ID 220140000C

@ -23,6 +23,11 @@ sub new {
}
sub get_structure {
my $self = shift;
return $self->{structure};
}
sub get_hex {
my $self = shift;

@ -32,7 +32,7 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime qw()
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::FieldSet NGCP::BulkProcessor::Projects::Export::Ama::Format::Structure);
our @EXPORT_OK = qw(
$length
);
#get_instance
@ -40,7 +40,7 @@ our @EXPORT_OK = qw(
#my $INSTANCE = __PACKAGE__->new();
my $length = 142;
our $length = 142;
sub new {

@ -21,10 +21,10 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumb
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::FieldSet NGCP::BulkProcessor::Projects::Export::Ama::Format::Structure);
our @EXPORT_OK = qw(
$length
);
my $length = 64;
our $length = 64;
sub new {
@ -71,9 +71,10 @@ sub new {
tracer_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType::TRANSFER_IN,
@_,
));
$self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new(
$self->{file_sequence_number} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new(
@_,
));
);
$self->_add_field($self->{file_sequence_number});
return $self;
@ -84,6 +85,11 @@ sub get_structure_code_field {
return $self->{structure_code};
}
sub get_file_sequence_number_field {
my $self = shift;
return $self->{file_sequence_number};
}
#sub get_instance {
# return
#}

@ -23,10 +23,10 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::BlockCount qw();
require Exporter;
our @ISA = qw(Exporter NGCP::BulkProcessor::Projects::Export::Ama::Format::FieldSet NGCP::BulkProcessor::Projects::Export::Ama::Format::Structure);
our @EXPORT_OK = qw(
$length
);
my $length = 78;
our $length = 78;
sub new {
@ -70,18 +70,23 @@ sub new {
@_,
));
$self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType->new(
tracer_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType::TRANSFER_IN,
tracer_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::TracerType::TRANSFER_OUT,
@_,
));
$self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new(
$self->{file_sequence_number} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::FileSequenceNumber->new(
@_,
));
$self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::RecordCount->new(
);
$self->_add_field($self->{file_sequence_number});
$self->{record_count} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::RecordCount->new(
@_,
));
$self->_add_field(NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::BlockCount->new(
);
$self->_add_field($self->{record_count});
$self->{block_count} = NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::BlockCount->new(
@_,
));
);
$self->_add_field($self->{block_count});
return $self;
@ -92,6 +97,21 @@ sub get_structure_code_field {
return $self->{structure_code};
}
sub get_file_sequence_number_field {
my $self = shift;
return $self->{file_sequence_number};
}
sub get_record_count_field {
my $self = shift;
return $self->{record_count};
}
sub get_block_count_field {
my $self = shift;
return $self->{block_count};
}
#sub get_instance {
# return
#}

@ -46,6 +46,7 @@ our @EXPORT_OK = qw(
$force
$export_cdr_multithreading
$export_cdr_numofthreads
$export_cdr_blocksize
$export_cdr_joins
$export_cdr_conditions
@ -54,6 +55,8 @@ our @EXPORT_OK = qw(
$domestic_destination_pattern
$international_destination_pattern
$ama_filename_format
);
#check_dry
#$dry
@ -72,6 +75,7 @@ our $force = 0;
our $skip_errors = 0;
our $export_cdr_multithreading = $enablemultithreading;
our $export_cdr_numofthreads = $cpucount;
our $export_cdr_blocksize = undef;
our $export_cdr_joins = [];
our $export_cdr_conditions = [];
@ -81,6 +85,8 @@ our $export_cdr_stream = undef;
our $domestic_destination_pattern = undef;
our $international_destination_pattern = undef;
our $ama_filename_format = '%1$s%2$s.ama';
sub update_settings {
my ($data,$configfile) = @_;
@ -106,6 +112,7 @@ sub update_settings {
$skip_errors = $data->{skip_errors} if exists $data->{skip_errors};
$export_cdr_multithreading = $data->{export_cdr_multithreading} if exists $data->{export_cdr_multithreading};
$export_cdr_numofthreads = _get_numofthreads($cpucount,$data,'export_cdr_numofthreads');
$export_cdr_blocksize = $data->{export_cdr_blocksize} if exists $data->{export_cdr_blocksize};
my $parse_result;
@ -128,6 +135,8 @@ sub update_settings {
($regexp_result,$international_destination_pattern) = parse_regexp($international_destination_pattern,$configfile);
$result &= $regexp_result;
$ama_filename_format = $data->{ama_filename_format} if exists $data->{ama_filename_format};
return $result;
}
@ -196,4 +205,12 @@ sub _parse_export_conditions {
return (1,\@conditions);
}
sub _get_numofthreads {
my ($default_value,$data,$key) = @_;
my $_numofthreads = $default_value;
$_numofthreads = $data->{$key} if exists $data->{$key};
$_numofthreads = $cpucount if $_numofthreads > $cpucount;
return $_numofthreads;
}
1;

@ -67,6 +67,8 @@ use NGCP::BulkProcessor::ConnectorPool qw(destroy_dbs);
use NGCP::BulkProcessor::Projects::Export::Ama::CDR qw(
export_cdrs
reset_fsn
reset_export_status
);
scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
@ -74,6 +76,8 @@ scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unl
my @TASK_OPTS = ();
my $tasks = [];
my $from = undef,
my $to = undef;
my $cleanup_task_opt = 'cleanup';
push(@TASK_OPTS,$cleanup_task_opt);
@ -83,6 +87,12 @@ push(@TASK_OPTS,$cleanup_all_task_opt);
my $export_cdr_task_opt = 'export_cdr';
push(@TASK_OPTS,$export_cdr_task_opt);
my $reset_fsn_task_opt = 'reset_fsn';
push(@TASK_OPTS,$reset_fsn_task_opt);
my $reset_export_status_task_opt = 'reset_export_status';
push(@TASK_OPTS,$reset_export_status_task_opt);
if (init()) {
main();
exit(0);
@ -102,6 +112,8 @@ sub init {
#"dry" => \$dry,
"skip-errors" => \$skip_errors,
"force" => \$force,
"from=s" => \$from,
"to=s" => \$to,
); # or scripterror('error in command line arguments',getlogger(getscriptpath()));
$tasks = removeduplicates($tasks,1);
@ -138,20 +150,17 @@ sub main() {
$completion |= 1;
}
} elsif (lc($reset_fsn_task_opt) eq lc($task)) {
if (taskinfo($reset_fsn_task_opt,$result,1)) {
#next unless check_dry();
$result &= reset_fsn_task(\@messages);
}
#} elsif (lc($provision_subscriber_task_opt) eq lc($task)) {
# if (taskinfo($provision_subscriber_task_opt,$result,1)) {
# next unless check_dry();
# $result &= provision_subscriber_task(\@messages);
# $completion |= 1;
# }
#} elsif (lc($generate_cdr_task_opt) eq lc($task)) {
# if (taskinfo($generate_cdr_task_opt,$result,1)) {
# next unless check_dry();
# $result &= generate_cdr_task(\@messages);
# $completion |= 1;
# }
} elsif (lc($reset_export_status_task_opt) eq lc($task)) {
if (taskinfo($reset_export_status_task_opt,$result,1)) {
#next unless check_dry();
$result &= reset_export_status_task(\@messages);
}
} else {
$result = 0;
@ -205,21 +214,61 @@ sub cleanup_task {
sub export_cdr_task {
my ($messages) = @_;
my ($result,$warning_count,$ama_files) = (0,0,[]);
eval {
($result,$warning_count,$ama_files) = export_cdrs();
};
my $err = $@;
my $stats = ": " . (scalar @$ama_files) . ' files'; # . ((scalar @$ama_files) > 0 ? "\n " : '') . join("\n ",@$ama_files);
foreach my $ama_file (@$ama_files) {
$stats .= "\n " . $ama_file;
}
#eval {
# #stats .= "\n total CDRs: " .
# # NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows';
#};
if ($err or !$result) {
push(@$messages,"export cdrs INCOMPLETE$stats");
} else {
push(@$messages,"export cdrs completed$stats");
}
destroy_dbs();
return $result;
}
sub reset_fsn_task {
my ($messages) = @_;
my ($result) = (0);
eval {
($result) = export_cdrs();
($result) = reset_fsn();
};
my $err = $@;
my $stats = ":";
if ($err or !$result) {
push(@$messages,"reset file sequence number INCOMPLETE");
} else {
push(@$messages,"reset file sequence number completed");
}
destroy_dbs();
return $result;
}
sub reset_export_status_task {
my ($messages) = @_;
my ($result) = (0);
eval {
#stats .= "\n total CDRs: " .
# NGCP::BulkProcessor::Dao::Trunk::accounting::cdr::countby_ratingstatus(undef) . ' rows';
($result) = reset_export_status($from,$to);
};
my $err = $@;
my $fromto = 'from ' . ($from ? $from : '-') . ' to ' . ($to ? $to : '-');
if ($err or !$result) {
push(@$messages,"export cdrs INCOMPLETE$stats");
push(@$messages,"reset export status $fromto INCOMPLETE");
} else {
push(@$messages,"export cdrs completed$stats");
push(@$messages,"reset export status $fromto completed");
}
destroy_dbs();
return $result;

@ -1,10 +1,11 @@
#skip_errors=0
providers_yml = providers.yml
export_cdr_multithreading = 1
export_cdr_numofthreads = 2
export_cdr_blocksize = 1000
export_cdr_joins = { 'billing.voip_subscribers source_voip_subscribers' => { 'source_voip_subscribers.uuid' => 'accounting.cdr.source_user_id' } }, { 'billing.voip_subscribers destination_voip_subscribers' => { 'destination_voip_subscribers.uuid' => 'accounting.cdr.destination_user_id' } }, { 'billing.billing_zones_history source_carrier_bbz' => { 'source_carrier_bbz.id' => 'accounting.cdr.source_carrier_billing_zone_id' } }, { 'billing.billing_zones_history source_reseller_bbz' => { 'source_reseller_bbz.id' => 'accounting.cdr.source_reseller_billing_zone_id' } }, { 'billing.billing_zones_history source_customer_bbz' => { 'source_customer_bbz.id' => 'accounting.cdr.source_customer_billing_zone_id' } }, { 'billing.billing_zones_history destination_carrier_bbz' => { 'destination_carrier_bbz.id' => 'accounting.cdr.destination_carrier_billing_zone_id' } }, { 'billing.billing_zones_history destination_reseller_bbz' => { 'destination_reseller_bbz.id' => 'accounting.cdr.destination_reseller_billing_zone_id' } }, { 'billing.billing_zones_history destination_customer_bbz' => { 'destination_customer_bbz.id' => 'accounting.cdr.destination_customer_billing_zone_id' } }
export_cdr_conditions = { 'accounting.cdr.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } }
export_cdr_limit = 300000
export_cdr_stream = ama_simple
ama_filename_format = %1$s%2$03d.debug.ama

@ -1,14 +1,15 @@
#skip_errors=0
providers_yml = providers.yml
export_cdr_multithreading = 1
export_cdr_numofthreads = 2
export_cdr_blocksize = 1000
#export_cdr_joins = { 'accounting.cdr_export_status_data esd' => { 'esd.cdr_id' => 'accounting.cdr.id' } }, { 'accounting.cdr_export_status es' => { 'es.id' => 'esd.status_id' } }
#export_cdr_conditions = { 'es.type' => { '=' => '"default"' } }, { 'esd.export_status' => { '=' => '"unexported"' } }, { 'accounting.cdr.call_status' => { '=' => '"ok"' } }
#, { 'accounting.cdr.rating_status' => { '=' => '"ok"' } }
export_cdr_conditions = { 'accounting.cdr.call_status' => { '=' => '"ok"' } }
export_cdr_stream = default
export_cdr_stream = ama_simple
#default
#export_cdr_limit = 3500
#300000
ama_filename_format = %1$s%2$03d.debug.ama

@ -6,6 +6,17 @@ use File::Basename;
use Cwd;
use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../');
use NGCP::BulkProcessor::Projects::Export::Ama::Settings qw(
update_settings
);
use NGCP::BulkProcessor::LoadConfig qw(
load_config
$SIMPLE_CONFIG_TYPE
$YAML_CONFIG_TYPE
$ANY_CONFIG_TYPE
);
use NGCP::BulkProcessor::Calendar qw(current_local);
use NGCP::BulkProcessor::Projects::Export::Ama::Format::File qw();
@ -22,6 +33,9 @@ use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternat
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime qw();
use NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime qw();
load_config('config.cfg');
load_config('settings.cfg',\&update_settings,$SIMPLE_CONFIG_TYPE);
my $dt = current_local();
my $source = "43011001";
my $destination = "43011002";
@ -31,79 +45,101 @@ my $file = NGCP::BulkProcessor::Projects::Export::Ama::Format::File->new(
);
my $limit = 1000;
my $limit = 5000;
my $i = 0;
while ($i < $limit) {
$file->write_record(sub {
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9013->new(
my $file_sequence_number = 123;
sub get_transfer_in {
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9013->new(
rewritten => 0,
sensor_id => '008708', # Graz
padding => 0,
recording_office_id => '008708',
rewritten => 0,
sensor_id => '008708', # Graz
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt),
padding => 0,
recording_office_id => '008708',
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt),
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt),
file_sequence_number => $file_sequence_number,
)
);
}
sub get_record {
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure0510->new(
call_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::CallType::STATION_PAID,
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt),
rewritten => 0,
sensor_id => '008708', # Graz
file_sequence_number => 1,
)
);
padding => 0,
recording_office_id => '008708',
}, sub {
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure0510->new(
call_type => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::CallType::STATION_PAID,
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt),
rewritten => 0,
sensor_id => '008708', # Graz
service_feature => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ServiceFeature::OTHER,
padding => 0,
recording_office_id => '008708',
originating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($source),
originating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($source),
originating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($source),
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt),
domestic_international => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::get_number_domestic_international($destination),
service_feature => $NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ServiceFeature::OTHER,
terminating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($destination),
terminating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($destination),
terminating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($destination),
originating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($source),
originating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($source),
originating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($source),
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt),
elapsed_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime::get_elapsed_time($duration),
)
);
}
domestic_international => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::DomesticInternational::get_number_domestic_international($destination),
sub get_transfer_out {
terminating_significant_digits => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_length($destination),
terminating_open_digits_1 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_1($destination),
terminating_open_digits_2 => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::SignificantDigitsNextField::get_number_digits_2($destination),
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014->new(
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt),
elapsed_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ElapsedTime::get_elapsed_time($duration),
)
);
}, sub {
rewritten => 0,
sensor_id => '008708', # Graz
return NGCP::BulkProcessor::Projects::Export::Ama::Format::Record->new(
NGCP::BulkProcessor::Projects::Export::Ama::Format::Structures::Structure9014->new(
padding => 0,
recording_office_id => '008708',
rewritten => 0,
sensor_id => '008708', # Graz
padding => 0,
recording_office_id => '008708',
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt),
date => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::Date::get_ama_date($dt),
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt),
connect_time => NGCP::BulkProcessor::Projects::Export::Ama::Format::Fields::ConnectTime::get_connect_time($dt),
file_sequence_number => $file_sequence_number,
file_sequence_number => 1,
#=> (scalar @records),
)
);
}
=> (scalar @records),
)
);
});
sub commit_cb {
print $file_sequence_number . "\n";
$file_sequence_number++;
}
while ($i < $limit) {
$file->write_record(
get_transfer_in => \&get_transfer_in,
get_record => \&get_record,
get_transfer_out => \&get_transfer_out,
commit_cb => \&commit_cb,
);
$i++;
}
$file->close(
get_transfer_out => \&get_transfer_out,
commit_cb => \&commit_cb,
);
#print $test->{structure}->to_string()."\n";
#print $test->get_hex();

@ -813,7 +813,9 @@ sub DESTROY {
if ($self->{tid} == threadid()) {
$self->_db_disconnect();
delete $self->{drh};
dbdebug($self,(ref $self) . ' connector destroyed',getlogger(__PACKAGE__));
eval {
dbdebug($self,(ref $self) . ' connector destroyed',getlogger(__PACKAGE__));
};
#} else {
# print "NOT destroyed\n";
}

@ -21,7 +21,7 @@ sub new {
sub _set_data {
my $self = shift;
my ($data,$dupecheck) = shift;
my ($data,$dupecheck) = @_;
$self->clear();
if (defined $data and ref $data eq 'ARRAY') {
if ($dupecheck) {

@ -124,7 +124,7 @@ sub float_equal {
sub round {
my ($number) = shift;
my ($number) = @_;
return int($number + .5 * ($number <=> 0));
}

Loading…
Cancel
Save