TT#66561 exporter for intermediate cdrs

int-cdr-exporter.pl is added to export intermediate
cdr records in the same format configured for regular
cdrs. the files are written to the same folders and
therefor have a distinctive extension suffix and
separated file sequence numbers.

Change-Id: I9801bc1a4221f22a169bd64ed7956b75fc52a3f2
changes/33/33233/7
Rene Krenn 6 years ago
parent f6baf825d0
commit 0f5405a824

@ -10,6 +10,8 @@ install:
$(INSTALL_PROGRAM) cdr-md5.sh $(DESTDIR)/usr/sbin/ngcp-cdr-md5
$(INSTALL_PROGRAM) cdr-exporter.pl $(DESTDIR)/usr/sbin/ngcp-cdr-exporter
$(INSTALL_PROGRAM) event-exporter.pl $(DESTDIR)/usr/sbin/ngcp-event-exporter
$(INSTALL_PROGRAM) int-cdr-exporter.pl $(DESTDIR)/usr/sbin/ngcp-int-cdr-exporter
$(INSTALL_DIR) $(DESTDIR)/etc/ngcp-cdr-exporter
$(INSTALL_DATA) cdr-exporter.conf $(DESTDIR)/etc/ngcp-cdr-exporter/cdr-exporter.conf
$(INSTALL_DATA) event-exporter.conf $(DESTDIR)/etc/ngcp-cdr-exporter/event-exporter.conf
$(INSTALL_DATA) cdr-exporter.conf $(DESTDIR)/etc/ngcp-cdr-exporter/int-cdr-exporter.conf

@ -52,11 +52,11 @@ sub update_export_status {
}
sub upsert_export_status {
my ($dbh, $stream, $ids, $status) = @_;
my ($dbh, $stream, $tbl, $estbl, $ids, $status) = @_;
return unless(@{ $ids });
while (my @chunk = splice @$ids, 0, 10000) {
my $sth = $dbh->prepare("insert into accounting.cdr_export_status_data " .
"select _cdr.id,_cesc.id,now(),\"$status\",_cdr.start_time from accounting.cdr _cdr " .
my $sth = $dbh->prepare("insert into $estbl " .
"select _cdr.id,_cesc.id,now(),\"$status\",_cdr.start_time from $tbl _cdr " .
"join (select * from accounting.cdr_export_status where type = \"$stream\") as _cesc " .
"where _cdr.id in (" . substr(',?' x scalar @chunk,1) . ") " .
"on duplicate key update export_status = \"$status\", exported_at = now()");

@ -68,7 +68,8 @@ my $stream = "default";
my %config = (
'default.FILTER_FLAPPING' => 0,
'default.MERGE_UPDATE' => 0,
'default.ENABLED' => 1,
'default.ENABLED' => "yes",
'default.INTERMEDIATE' => "no",
'default.PREFIX' => 'ngcp',
'default.VERSION' => '007',
'default.SUFFIX' => 'cdr',
@ -95,12 +96,22 @@ my $rewrite_rule_sets = {};
my $field_positions = {
source_cli => {
aliases => [ qw(source_cli accounting.cdr.source_cli cdr.source_cli) ],
aliases => [ qw(source_cli
accounting.cdr.source_cli
cdr.source_cli
accounting.int_cdr.source_cli
int_cdr.source_cli
base_table.source_cli) ],
admin_positions => undef,
reseller_positions => undef,
},
destination_user_in => {
aliases => [ qw(destination_user_in accounting.cdr.destination_user_in cdr.destination_user_in) ],
aliases => [ qw(destination_user_in
accounting.cdr.destination_user_in
cdr.destination_user_in
accounting.int_cdr.destination_user_in
int_cdr.destination_user_in
base_table.destination_user_in) ],
admin_positions => undef,
reseller_positions => undef,
},
@ -188,8 +199,13 @@ sub prepare_config {
$stream //= 'default';
if (defined $conf_upd) {
for my $key (%$conf_upd) {
$config{$stream . '.' . $key} = $$conf_upd{$key};
for my $key (keys %$conf_upd) {
my $upd = $$conf_upd{$key};
if ('CODE' eq ref $upd) {
$config{$stream . '.' . $key} = &$upd($config{$stream . '.' . $key});
} else {
$config{$stream . '.' . $key} = $$conf_upd{$key};
}
}
}
@ -384,7 +400,7 @@ sub build_query {
}
$q = "select " .
join(", ", @admin_fields) . " from $table " .
join(", ", @admin_fields) . " from $table base_table " .
join(" ", @intjoins) . " " .
"where " . join(" and ", @conds) . " " .
join(" ", @trail);

@ -11,7 +11,7 @@ die("$0 already running") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on
my $stream_limit = 300000;
my @trailer = (
{ 'order by' => 'accounting.cdr.id' },
{ 'order by' => 'base_table.id' },
);
my @ignored_ids;
@ -19,18 +19,18 @@ my @ids;
foreach my $stream (NGCP::CDR::Exporter::import_config('cdr-exporter.conf')) {
#next if $stream eq 'default';
next unless confval('ENABLED');
next unless (confval("ENABLED") // 'no') eq 'yes';
NGCP::CDR::Exporter::prepare_config('exporter', $stream);
NGCP::CDR::Exporter::DEBUG("+++ Start stream '$stream' with DB " .
NGCP::CDR::Exporter::DEBUG("+++ Start cdr export stream '$stream' with DB " .
(confval('DBUSER') || "(undef)") .
"\@".confval('DBDB')." to ".confval('PREFIX')."\n");
"\@".confval('DBDB')." to ".confval('DESTDIR')."\n");
# add fields we definitely need, will be removed during processing
unshift @NGCP::CDR::Exporter::admin_fields, qw/
accounting.cdr.id
accounting.cdr.source_user_id
accounting.cdr.destination_user_id
accounting.cdr.source_provider_id
accounting.cdr.destination_provider_id
base_table.id
base_table.source_user_id
base_table.destination_user_id
base_table.source_provider_id
base_table.destination_provider_id
/;
@ignored_ids = ();
@ids = ();
@ -55,8 +55,8 @@ foreach my $stream (NGCP::CDR::Exporter::import_config('cdr-exporter.conf')) {
$sth->finish();
push @$joins, "left join accounting.cdr_export_status_data as __cesd" .
" on __cesd.cdr_id = accounting.cdr.id and __cesd.status_id = " . $export_status_id;
push @$conds, "accounting.cdr.id <= $last_cdr_id";
" on __cesd.cdr_id = base_table.id and __cesd.status_id = " . $export_status_id;
push @$conds, "base_table.id <= $last_cdr_id";
push @$conds, "__cesd.export_status = 'unexported'";
}
});
@ -67,7 +67,7 @@ foreach my $stream (NGCP::CDR::Exporter::import_config('cdr-exporter.conf')) {
if ('default' ne $stream and $limit > 0) {
NGCP::CDR::Exporter::build_query([ @trailer, { 'limit' => $limit }, ], 'accounting.cdr', sub {
my ($dbh,$joins,$conds) = @_;
push @$conds, "accounting.cdr.id > $last_cdr_id";
push @$conds, "base_table.id > $last_cdr_id";
});
NGCP::CDR::Exporter::run(\&callback);
}
@ -81,9 +81,9 @@ foreach my $stream (NGCP::CDR::Exporter::import_config('cdr-exporter.conf')) {
# TODO: should be tagged as ignored/skipped/whatever
update_export_status("accounting.cdr", \@ignored_ids, "ok");
} else {
upsert_export_status(\@ids, "ok");
upsert_export_status("accounting.cdr", "accounting.cdr_export_status_data", \@ids, "ok");
# TODO: should be tagged as ignored/skipped/whatever
upsert_export_status(\@ignored_ids, "ok");
upsert_export_status("accounting.cdr", "accounting.cdr_export_status_data", \@ignored_ids, "ok");
}
NGCP::CDR::Exporter::commit();

@ -0,0 +1 @@
*/5 * * * * root . /etc/default/ngcp-roles; if /usr/sbin/ngcp-check-active -q && [ "$NGCP_IS_MGMT" = "yes" ] ; then /usr/sbin/ngcp-int-cdr-exporter >/dev/null; fi

1
debian/rules vendored

@ -8,3 +8,4 @@
override_dh_installcron:
dh_installcron
dh_installcron --name=ngcp-event-exporter
dh_installcron --name=ngcp-int-cdr-exporter

@ -21,14 +21,22 @@ my $config = {
NGCP::CDR::Exporter::import_config('event-exporter.conf');
NGCP::CDR::Exporter::prepare_config('eventexporter', undef, $config);
NGCP::CDR::Exporter::DEBUG("+++ Start event export with DB " .
(confval('DBUSER') || "(undef)") .
"\@".confval('DBDB')." to ".confval('DESTDIR')."\n");
# make sure we always select id, subscriber_id, type, old and new;
# if you change it, make sure to adapt slice in the loop too!
unshift @NGCP::CDR::Exporter::admin_fields, (qw/
accounting.events.id accounting.events.subscriber_id accounting.events.reseller_id
accounting.events.type accounting.events.old_status accounting.events.new_status
base_table.id
base_table.subscriber_id
base_table.reseller_id
base_table.type
base_table.old_status
base_table.new_status
/);
my @trailer = (
{ 'order by' => 'accounting.events.id' },
{ 'order by' => 'base_table.id' },
{ 'limit' => '3000' },
);

@ -0,0 +1,161 @@
#!/usr/bin/perl
use strict;
use warnings;
use v5.14;
use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::CDR::Exporter;
die("$0 already running") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
my $stream_limit = 50000;
my @trailer = (
{ 'order by' => 'base_table.id' },
);
my $config = {
'SUFFIX' => sub { return 'intermediate.' . shift; },
};
my @ignored_ids;
my @ids;
foreach my $stream (NGCP::CDR::Exporter::import_config('int-cdr-exporter.conf')) {
#next if $stream eq 'default';
next unless (confval("ENABLED") // 'no') eq 'yes';
next unless (confval("INTERMEDIATE") // 'no') eq 'yes';
NGCP::CDR::Exporter::prepare_config('intexporter', $stream, $config);
NGCP::CDR::Exporter::DEBUG("+++ Start intermediate cdr export stream '$stream' with DB " .
(confval('DBUSER') || "(undef)") .
"\@".confval('DBDB')." to ".confval('DESTDIR')."\n");
# add fields we definitely need, will be removed during processing
unshift @NGCP::CDR::Exporter::admin_fields, qw/
base_table.id
base_table.source_user_id
base_table.destination_user_id
base_table.source_provider_id
base_table.destination_provider_id
/;
@ignored_ids = ();
@ids = ();
my $last_cdr_id = 0;
my $limit = $stream_limit;
NGCP::CDR::Exporter::build_query([ @trailer, { 'limit' => $limit }, ] , 'accounting.int_cdr', sub {
my ($dbh,$joins,$conds) = @_;
if ('default' ne $stream) {
my $stmt = "insert into accounting.cdr_export_status (id,type) values (null,?)" .
" on duplicate key update id = last_insert_id(id)";
$dbh->do($stmt, undef, $stream) or die "Failed to register stream '$stream'";
my $export_status_id = $dbh->{'mysql_insertid'};
$stmt = "select coalesce(max(cdr_id),0) from accounting.int_cdr_export_status_data" .
" where status_id = ?";
my $sth = $dbh->prepare($stmt);
$sth->execute($export_status_id) or die "Failed to obtain last processed cdr id of stream '$stream'";
($last_cdr_id) = $sth->fetchrow_array();
$sth->finish();
push @$joins, "left join accounting.int_cdr_export_status_data as __cesd" .
" on __cesd.cdr_id = base_table.id and __cesd.status_id = " . $export_status_id;
push @$conds, "base_table.id <= $last_cdr_id";
push @$conds, "__cesd.export_status = 'unexported'";
}
});
NGCP::CDR::Exporter::load_preferences();
NGCP::CDR::Exporter::prepare_output();
$limit = $limit - NGCP::CDR::Exporter::run(\&callback);
if ('default' ne $stream and $limit > 0) {
NGCP::CDR::Exporter::build_query([ @trailer, { 'limit' => $limit }, ], 'accounting.int_cdr', sub {
my ($dbh,$joins,$conds) = @_;
push @$conds, "base_table.id > $last_cdr_id";
});
NGCP::CDR::Exporter::run(\&callback);
}
#DEBUG "ignoring cdr ids " . (join "$sep", @ignored_ids);
NGCP::CDR::Exporter::finish();
if ('default' eq $stream) {
update_export_status("accounting.int_cdr", \@ids, "ok");
# TODO: should be tagged as ignored/skipped/whatever
update_export_status("accounting.int_cdr", \@ignored_ids, "ok");
} else {
upsert_export_status("accounting.int_cdr", "accounting.int_cdr_export_status_data", \@ids, "ok");
# TODO: should be tagged as ignored/skipped/whatever
upsert_export_status("accounting.int_cdr", "accounting.int_cdr_export_status_data", \@ignored_ids, "ok");
}
NGCP::CDR::Exporter::commit();
}
sub filestats_callback {
my ($data_row, $ref) = @_;
my $out = $$ref || [0, 0, 0, 0, 0];
for my $i (0 .. 4) { $$data_row[$i] //= 0 }
($$data_row[0] lt $$out[0] || !$$out[0]) and $$out[0] = $$data_row[0]; # min call start
$$out[1] += $$data_row[1]; # sum duration
($$data_row[0] gt $$out[2] || !$$out[2]) and $$out[2] = $$data_row[0]; # max call start
$$out[3] += $$data_row[2]; # sum carrier cost
$$out[4] += $$data_row[3]; # sum customer cost
$$ref = $out;
}
sub callback {
my ($row, $res_row, $data_row) = @_;
#my $quotes = confval('QUOTES');
#my $sep = prefval() || confval('CSV_SEP');
my @fields = @{ $row };
my $id = shift @fields;
my $src_uuid = shift @fields;
my $dst_uuid = shift @fields;
my $src_provid = shift @fields;
my $dst_provid = shift @fields;
#@fields = map { quote_field($_); } (@fields);
if(confval('EXPORT_INCOMING') eq "no" && $src_uuid eq "0") {
push @ignored_ids, $id;
return;
}
my $sep = prefval('system','cdr_export_field_separator') // confval('CSV_SEP');
my $quotes = confval('QUOTES');
my $escape_symbol = confval('CSV_ESC');
my $line = join($sep, map { quote_field($_,$sep,$quotes,$escape_symbol); }
apply_sclidui_rwrs('system',\@fields,scalar @fields - scalar @$row));
write_reseller('system', $line, \&filestats_callback, $data_row);
push(@ids, $id);
if($src_uuid ne "0") {
$sep = prefval($src_provid,'cdr_export_field_separator') // confval('CSV_SEP');
$quotes = confval('QUOTES');
$escape_symbol = confval('CSV_ESC');
$line = join($sep, map { quote_field($_,$sep,$quotes,$escape_symbol); } apply_sclidui_rwrs($src_provid,$res_row));
write_reseller_id($src_provid, $line, \&filestats_callback, $data_row);
}
if($dst_uuid ne "0") {
if(confval('EXPORT_INCOMING') eq "no" && $src_provid ne $dst_provid) {
# don't store incoming call to this reseller
} else {
if ($src_uuid ne '0' && $src_provid eq $dst_provid) {
# skip duplicate entries
} else {
$sep = prefval($dst_provid,'cdr_export_field_separator') // confval('CSV_SEP');
$quotes = confval('QUOTES');
$escape_symbol = confval('CSV_ESC');
$line = join($sep, map { quote_field($_,$sep,$quotes,$escape_symbol); } apply_sclidui_rwrs($dst_provid,$res_row));
write_reseller_id($dst_provid, $line, \&filestats_callback, $data_row);
}
}
}
}
__DATA__
This exists to allow the locking code at the beginning of the file to work.
DO NOT REMOVE THESE LINES!
Loading…
Cancel
Save