MT#12385 unify logic from cdr/event-exporter into a perl module

Squashed commit of the following:

commit 0ca0e801b83fbea43103041f2ba0d10e68caea24
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Tue Apr 7 12:18:22 2015 -0400

    support reseller export

commit 0170f79ad3440b3fcf85efb10a90b44e039f68a2
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Wed Apr 1 11:46:18 2015 -0400

    recursion fix

commit 1dfbb5857bccafb9778a44a96c97fae946bc3063
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Wed Apr 1 11:41:34 2015 -0400

    default export missing

commit db29662a668a9a4a8e98643b826b9aa7140f477c
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Wed Apr 1 11:06:59 2015 -0400

    MT#12385 unify logic from cdr/event-exporter into a perl module

    untested!
changes/49/1849/1
Richard Fuchs 11 years ago
parent e0c5fd1021
commit 0376ae2fa5

@ -0,0 +1,366 @@
package NGCP::CDR::Exporter;
use strict;
use warnings;
use v5.14;
use Config::Simple;
use DBI;
use Digest::MD5;
use NGCP::CDR::Export;
use File::Temp;
use File::Copy;
use File::Path;
use NGCP::CDR::Transfer;
use Data::Dumper;
BEGIN {
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(DEBUG confval write_reseller write_reseller_id update_export_status);
}
our $debug = 0;
our $collid = "exporter";
our @admin_fields;
our @reseller_fields;
my @joins;
my @conditions;
my $dbh;
my $q;
my $sth;
my %reseller_names;
my %reseller_ids;
my %reseller_lines;
my %mark;
my $dname;
my $tempdir;
my $file_ts;
my @reseller_positions;
# default config values
my %config = (
'default.FILTER_FLAPPING' => 0,
'default.MERGE_UPDATE' => 0,
'default.PREFIX' => 'ngcp',
'default.VERSION' => '007',
'default.SUFFIX' => 'cdr',
'default.FILES_OWNER' => 'cdrexport',
'default.FILES_GROUP' => 'cdrexport',
'default.FILES_MASK' => '022',
'default.TRANSFER_TYPE' => "none",
'default.TRANSFER_PORT' => 22,
'default.TRANSFER_USER' => "cdrexport",
'default.TRANSFER_KEY' => "/root/.ssh/id_rsa",
'default.TRANSFER_REMOTE' => "/home/jail/home/cdrexport"
);
sub DEBUG {
say join (' ', @_);
}
my @config_paths = (qw#
/etc/ngcp-cdr-exporter/
.
#);
sub config2array {
my $config_key = shift;
my $val = confval($config_key);
ref($val) eq 'ARRAY' and return @$val;
return $val;
}
sub get_config {
my ($coll, $cf, $conf_upd) = @_;
$collid = $coll;
for my $key (%$conf_upd) {
$config{'default.' . $key} = $$conf_upd{$key};
}
my $config_file;
foreach my $cp(@config_paths) {
if(-f "$cp/$cf") {
$config_file = "$cp/$cf";
last;
}
}
die "Config file $cf not found in path " . (join " or ", @config_paths) . "\n"
unless $config_file;
Config::Simple->import_from("$config_file" , \%config) or
die "Couldn't open the configuration file '$config_file'.\n";
# backwards compat
$config{'default.DESTDIR'} //= $config{'default.CDRDIR'} // $config{'default.EDRDIR'};
die "Invalid destination directory '".$config{'default.DESTDIR'}."'\n"
unless(-d $config{'default.DESTDIR'});
foreach my $f(config2array('ADMIN_EXPORT_FIELDS')) {
$f =~ s/^#.+//; next unless($f);
$f =~ s/^\'//; $f =~ s/\'$//;
push @admin_fields, $f;
}
foreach my $f(config2array('RESELLER_EXPORT_FIELDS')) {
$f =~ s/^#.+//; next unless($f);
$f =~ s/^\'//; $f =~ s/\'$//;
push @reseller_fields, $f;
}
foreach my $f(@{confval('EXPORT_JOINS')}) {
$f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split('\s*=>\s*{\s*', $f);
$a =~ s/^\s*\'//; $a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split('\s*=>\s*', $b);
$c =~ s/^\s*\'//g; $c =~ s/\'\s*//;
$d =~ s/^\s*\'//g; $d =~ s/\'\s*//;
push @joins, { $a => { $c => $d } };
}
foreach my $f(@{confval('EXPORT_CONDITIONS')}) {
next unless($f);
$f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split('\s*=>\s*{\s*', $f);
$a =~ s/^\s*\'//; $a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split('\s*=>\s*', $b);
$c =~ s/^\s*\'//g; $c =~ s/\'\s*//;
$d =~ s/^\s*\'//g; $d =~ s/\'\s*//;
push @conditions, { $a => { $c => $d } };
}
}
sub confval {
my ($val) = @_;
return $config{'default.' . $val};
}
sub prepare_dbh {
my ($trailer, $table) = @_;
$dbh = DBI->connect("dbi:mysql:" . confval('DBDB') .
";host=".confval('DBHOST'),
confval('DBUSER'), confval('DBPASS'))
or die "failed to connect to db: $DBI::errstr";
$dbh->{mysql_auto_reconnect} = 1;
$dbh->{AutoCommit} = 0;
my @intjoins = ();
foreach my $f(@joins) {
my ($table, $keys) = %{ $f };
my ($foreign_key, $own_key) = %{ $keys };
push @intjoins, "left outer join $table on $foreign_key = $own_key";
}
my @conds = ();
foreach my $f(@conditions) {
my ($field, $match) = %{ $f };
my ($op, $val) = %{ $match };
push @conds, "$field $op $val";
}
my @trail = ();
foreach my $f(@$trailer) {
my ($key, $val) = %{ $f };
push @trail, "$key $val";
}
# extract positions of reseller fields from admin fields
my %reseller_index;
@reseller_index{@admin_fields} = (0..$#admin_fields);
for(my $i = 0; $i < @reseller_fields; $i++) {
my $name = $reseller_fields[$i];
unless(exists $reseller_index{$name}) {
die "Invalid RESELLER_EXPORT_FIELDS element '$name', not available in ADMIN_EXPORT_FIELDS!";
}
push @reseller_positions, $reseller_index{$name};
}
$q = "select " .
join(", ", @admin_fields) . " from $table " .
join(" ", @intjoins) . " " .
"where " . join(" and ", @conds) . " " .
join(" ", @trail);
DEBUG $q if $debug;
}
sub prepare_output {
my $tempfh = File::Temp->newdir(undef, CLEANUP => 1);
$tempdir = $tempfh->dirname;
my $now = time();
my @now = localtime($now);
$file_ts = NGCP::CDR::Export::get_ts_for_filename(\@now);
my $full_name = (defined confval('FULL_NAMES') && confval('FULL_NAMES') eq "yes" ? 1 : 0);
my $monthly_dir = (defined confval('MONTHLY_DIR') && confval('MONTHLY_DIR') eq "yes" ? 1 : 0);
my $daily_dir = (defined confval('DAILY_DIR') && confval('DAILY_DIR') eq "yes" ? 1 : 0);
$dname = "";
if($monthly_dir && !$daily_dir) {
$dname .= sprintf("%04i%02i", $now[5] + 1900, $now[4] + 1);
$full_name or $file_ts = sprintf("%02i%02i%02i%02i", @now[3,2,1,0]);
} elsif(!$monthly_dir && $daily_dir) {
$dname .= sprintf("%04i%02i%02i", $now[5] + 1900, $now[4] + 1, $now[3]);
$full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]);
} elsif($monthly_dir && $daily_dir) {
$dname .= sprintf("%04i%02i/%02i", $now[5] + 1900, $now[4] + 1, $now[3]);
$full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]);
}
}
sub run {
my ($cb) = @_;
my $sth = $dbh->prepare($q);
$sth->execute();
while(my $row = $sth->fetchrow_arrayref) {
my @res_row = @$row[@reseller_positions];
$cb->($row, \@res_row);
}
}
sub write_reseller {
my ($reseller, $line) = @_;
push(@{$reseller_lines{$reseller}}, $line);
write_wrap($reseller);
}
sub write_reseller_id {
my ($id, $line) = @_;
if(!exists $reseller_names{$id}) {
$reseller_names{$id} = NGCP::CDR::Export::get_reseller_name($dbh, $id);
$reseller_ids{$reseller_names{$id}} = $id;
}
write_reseller($reseller_names{$id}, $line);
}
sub write_wrap {
my ($reseller, $force) = @_;
$force //= 0;
$reseller_lines{$reseller} //= [];
my $vals = $reseller_lines{$reseller};
my $rec_idx = @$vals;
my $max = confval('MAX_ROWS_PER_FILE') // $rec_idx;
($force == 0 && $rec_idx <= $max) and return;
($force == 1 && $rec_idx == 0) and return;
my $reseller_contract_id = "";
my $mark_query = undef;
unless($reseller eq "system") {
$reseller_contract_id = "-".$reseller_ids{$reseller};
$mark_query = [ $reseller_ids{$reseller} ];
}
if (!defined($mark{"lastseq".$reseller_contract_id})) {
my $tmpmark = NGCP::CDR::Export::get_mark($dbh, $collid, $mark_query);
%mark = ( %mark, %$tmpmark );
$mark{"lastseq".$reseller_contract_id} //= 0;
}
my $file_idx = $mark{"lastseq".$reseller_contract_id} // 0;
my $reseller_dname = $reseller . "/" . $dname;
if($reseller ne "system") {
$reseller_dname = "resellers/$reseller_dname";
}
my $reseller_tempdir = $tempdir . "/" . $reseller_dname;
do {
my $recs = ($rec_idx > $max) ? $max : $rec_idx;
$file_idx++;
my @filevals = @$vals[0 .. $recs-1];
@$vals = @$vals[$recs .. @$vals-1]; # modified $reseller_lines
my $err;
-d $reseller_tempdir || File::Path::make_path($reseller_tempdir, {error => \$err});
if(defined $err && @$err) {
DEBUG "!!! failed to create directory $reseller_tempdir: " . Dumper $err;
}
NGCP::CDR::Export::write_file(
\@filevals, $reseller_tempdir, confval('PREFIX'),
confval('VERSION'), $file_ts, $file_idx, confval('SUFFIX'),
);
$rec_idx -= $recs;
} while($rec_idx > 0);
opendir(my $fh, $reseller_tempdir);
foreach my $file(readdir($fh)) {
my $src = "$reseller_tempdir/$file";
my $dst = confval('DESTDIR') . "/$reseller_dname/$file";
if(-f $src) {
DEBUG "### moving $src to $dst\n";
my $err;
-d confval('DESTDIR') . "/$reseller_dname" ||
File::Path::make_path(confval('DESTDIR') . "/$reseller_dname", {error => \$err});
if(defined $err && @$err) {
DEBUG "!!! failed to create directory $reseller_dname: " . Dumper $err;
}
unless(copy($src, $dst)) {
DEBUG "!!! failed to move $src to $dst: $!\n";
} else {
DEBUG "### successfully moved $src to final destination $dst\n";
}
NGCP::CDR::Export::chownmod($dst, confval('FILES_OWNER'),
confval('FILES_GROUP'), oct(666),
confval('FILES_MASK'));
if(confval('TRANSFER_TYPE') eq "sftp") {
NGCP::CDR::Transfer::sftp(
$dst, confval('TRANSFER_HOST'),
confval('TRANSFER_PORT'),
confval('TRANSFER_REMOTE'),
confval('TRANSFER_USER'),
confval('TRANSFER_PASS'),
);
} elsif(confval('TRANSFER_TYPE') eq "sftp-sh") {
NGCP::CDR::Transfer::sftp_sh(
$dst, confval('TRANSFER_HOST'),
confval('TRANSFER_PORT'),
confval('TRANSFER_REMOTE'),
confval('TRANSFER_USER'),
confval('TRANSFER_KEY'),
);
}
}
}
$mark{"lastseq".$reseller_contract_id} = $file_idx;
NGCP::CDR::Export::set_mark($dbh, $collid, { "lastseq$reseller_contract_id" => $file_idx });
close($fh);
}
sub finish {
my @resellers = keys %reseller_lines;
for my $reseller (@resellers) {
write_wrap($reseller, 1);
}
# we write empty cdrs for resellers which didn't have a call during this
# export run, so get them into the list
my $missing_resellers = NGCP::CDR::Export::get_missing_resellers($dbh, [ keys %reseller_names ]);
for(my $i = 0; $i < @{ $missing_resellers->{names} }; ++$i) {
my $name = $missing_resellers->{names}->[$i];
my $id = $missing_resellers->{ids}->[$i];
push @resellers, $name;
$reseller_ids{$name} = $id;
$reseller_names{$id} = $name;
write_wrap($name, 2);
}
}
sub update_export_status {
NGCP::CDR::Export::update_export_status($dbh, @_);
}
sub commit {
$dbh->commit or die("failed to commit db changes: " . $dbh->errstr);
}
1;
# vim: set tabstop=4 expandtab:

@ -6,8 +6,8 @@ DBUSER=exporter
DBPASS=exporter
DBDB=accounting
#CDRDIR=/home/jail/home/cdrexport
CDRDIR=/tmp/cdr
#DESTDIR=/home/jail/home/cdrexport
DESTDIR=/tmp/cdr
MONTHLY_DIR=yes
DAILY_DIR=yes

@ -4,194 +4,54 @@ use strict;
use warnings;
use v5.14;
use Config::Simple;
use DBI;
use Digest::MD5;
use NGCP::CDR::Export;
use File::Temp;
use File::Copy;
use File::Path;
my $debug = 1;
my $collid = "exporter";
# default config values
my $config = {
'default.PREFIX' => 'ngcp',
'default.VERSION' => '007',
'default.SUFFIX' => 'cdr',
'default.FILES_OWNER' => 'cdrexport',
'default.FILES_GROUP' => 'cdrexport',
'default.FILES_MASK' => '022',
'default.TRANSFER_TYPE' => "none",
'default.TRANSFER_PORT' => 22,
'default.TRANSFER_USER' => "cdrexport",
'default.TRANSFER_REMOTE' => "/home/jail/home/cdrexport"
};
sub DEBUG {
say join (' ', @_);
}
use NGCP::CDR::Exporter;
my @config_paths = (qw#
/etc/ngcp-cdr-exporter/
.
#);
my $cf = 'cdr-exporter.conf';
my $config_file;
foreach my $cp(@config_paths) {
if(-f "$cp/$cf") {
$config_file = "$cp/$cf";
last;
}
}
die "Config file $cf not found in path " . (join " or ", @config_paths) . "\n"
unless $config_file;
Config::Simple->import_from("$config_file" , \%{$config}) or
die "Couldn't open the configuration file '$config_file'.\n";
die "Invalid destination directory '".$config->{'default.CDRDIR'}."'\n"
unless(-d $config->{'default.CDRDIR'});
my $now = time();
my @now = localtime($now);
my $file_ts = NGCP::CDR::Export::get_ts_for_filename(\@now);
my @admin_fields = ();
foreach my $f(@{$config->{'default.ADMIN_EXPORT_FIELDS'}}) {
$f =~ s/^#.+//; next unless($f);
$f =~ s/^\'//; $f =~ s/\'$//;
push @admin_fields, $f;
}
my @reseller_fields = ();
foreach my $f(@{$config->{'default.RESELLER_EXPORT_FIELDS'}}) {
$f =~ s/^#.+//; next unless($f);
$f =~ s/^\'//; $f =~ s/\'$//;
push @reseller_fields, $f;
}
# $NGCP::CDR::Exporter::debug = 1;
# my $collid = "exporter";
my @joins = ();
foreach my $f(@{$config->{'default.EXPORT_JOINS'}}) {
$f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split('\s*=>\s*{\s*', $f);
$a =~ s/^\s*\'//; $a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split('\s*=>\s*', $b);
$c =~ s/^\s*\'//g; $c =~ s/\'\s*//;
$d =~ s/^\s*\'//g; $d =~ s/\'\s*//;
push @joins, { $a => { $c => $d } };
}
NGCP::CDR::Exporter::get_config('exporter', 'cdr-exporter.conf');
my @conditions = ();
foreach my $f(@{$config->{'default.EXPORT_CONDITIONS'}}) {
next unless($f);
$f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split('\s*=>\s*{\s*', $f);
$a =~ s/^\s*\'//; $a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split('\s*=>\s*', $b);
$c =~ s/^\s*\'//g; $c =~ s/\'\s*//;
$d =~ s/^\s*\'//g; $d =~ s/\'\s*//;
push @conditions, { $a => { $c => $d } };
}
my @trailer = (
{ 'order by' => 'accounting.cdr.id' },
);
my $dbh = DBI->connect("dbi:mysql:" . $config->{'default.DBDB'} .
";host=".$config->{'default.DBHOST'},
$config->{'default.DBUSER'}, $config->{'default.DBPASS'})
or die "failed to connect to db: $DBI::errstr";
print("+++ Start run with DB " . (confval('DBUSER') || "(undef)") .
"\@".confval('DBDB')." to ".confval('PREFIX')."\n");
$dbh->{mysql_auto_reconnect} = 1;
$dbh->{AutoCommit} = 0;
my @intjoins = ();
foreach my $f(@joins) {
my ($table, $keys) = %{ $f };
my ($foreign_key, $own_key) = %{ $keys };
push @intjoins, "left outer join $table on $foreign_key = $own_key";
}
my @conds = ();
foreach my $f(@conditions) {
my ($field, $match) = %{ $f };
my ($op, $val) = %{ $match };
push @conds, "$field $op $val";
}
my @trail = ();
foreach my $f(@trailer) {
my ($key, $val) = %{ $f };
push @trail, "$key $val";
}
print("+++ Start run with DB " . ($config->{'default.DBUSER'} || "(undef)") .
"\@".$config->{'default.DBDB'}." to ".$config->{'default.PREFIX'}."\n");
# extract positions of reseller fields from admin fields
my @reseller_positions = ();
my %reseller_index;
@reseller_index{@admin_fields} = (0..$#admin_fields);
for(my $i = 0; $i < @reseller_fields; $i++) {
my $name = $reseller_fields[$i];
unless(exists $reseller_index{$name}) {
die "Invalid RESELLER_EXPORT_FIELDS element '$name', not available in ADMIN_EXPORT_FIELDS!";
}
push @reseller_positions, $reseller_index{$name};
}
# add fields we definitely need, will be removed during processing
unshift @admin_fields, qw/
unshift @NGCP::CDR::Exporter::admin_fields, qw/
accounting.cdr.id
accounting.cdr.source_user_id
accounting.cdr.destination_user_id
accounting.cdr.source_provider_id
accounting.cdr.destination_provider_id
/;
my @trailer = (
{ 'order by' => 'accounting.cdr.id' },
);
my $q = "select " .
join(", ", @admin_fields) . " from accounting.cdr " .
join(" ", @intjoins) . " " .
"where " . join(" and ", @conds) . " " .
join(" ", @trail);
#DEBUG $q if $debug;
NGCP::CDR::Exporter::prepare_dbh(\@trailer, 'accounting.cdr');
my $tempfh = File::Temp->newdir(undef, CLEANUP => 1);
my $tempdir = $tempfh->dirname;
my $sth = $dbh->prepare($q);
$sth->execute();
NGCP::CDR::Exporter::prepare_output();
my $written = 0;
my @ignored_ids = ();
my $reseller_names = {};
my $reseller_ids = {};
my $reseller_lines = {};
NGCP::CDR::Exporter::run(\&callback);
my @ignored_ids = ();
my @ids;
my %mark;
my $full_name = (defined $config->{'default.FULL_NAMES'} && $config->{'default.FULL_NAMES'} eq "yes" ? 1 : 0);
my $monthly_dir = (defined $config->{'default.MONTHLY_DIR'} && $config->{'default.MONTHLY_DIR'} eq "yes" ? 1 : 0);
my $daily_dir = (defined $config->{'default.DAILY_DIR'} && $config->{'default.DAILY_DIR'} eq "yes" ? 1 : 0);
my $dname = "";
if($monthly_dir && !$daily_dir) {
$dname .= sprintf("%04i%02i", $now[5] + 1900, $now[4] + 1);
$full_name or $file_ts = sprintf("%02i%02i%02i%02i", @now[3,2,1,0]);
} elsif(!$monthly_dir && $daily_dir) {
$dname .= sprintf("%04i%02i%02i", $now[5] + 1900, $now[4] + 1, $now[3]);
$full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]);
} elsif($monthly_dir && $daily_dir) {
$dname .= sprintf("%04i%02i/%02i", $now[5] + 1900, $now[4] + 1, $now[3]);
$full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]);
}
while(my $row = $sth->fetchrow_arrayref) {
sub callback {
my ($row, $res_row) = @_;
my @fields = @{ $row };
my $id = shift @fields;
my $src_uuid = shift @fields;
@ -200,37 +60,25 @@ while(my $row = $sth->fetchrow_arrayref) {
my $dst_provid = shift @fields;
@fields = map { defined $_ ? "'$_'" : "''" } (@fields);
if($config->{'default.EXPORT_INCOMING'} eq "no" && $src_uuid eq "0") {
if(confval('EXPORT_INCOMING') eq "no" && $src_uuid eq "0") {
push @ignored_ids, $id;
next;
}
my $line = join ",", @fields;
push(@{$reseller_lines->{'system'}}, $line);
write_wrap('system');
write_reseller('system', $line);
push(@ids, $id);
my @reseller_fields = @fields[@reseller_positions];
my $reseller_line = join ",", @reseller_fields;
my $reseller_line = join ",", @$res_row;
if($src_uuid ne "0") {
if(!exists $reseller_names->{$src_provid}) {
$reseller_names->{$src_provid} = NGCP::CDR::Export::get_reseller_name($dbh, $src_provid);
$reseller_ids->{$reseller_names->{$src_provid}} = $src_provid;
}
push(@{$reseller_lines->{$reseller_names->{$src_provid}}}, $reseller_line);
write_wrap($reseller_names->{$src_provid});
write_reseller_id($src_provid, $reseller_line);
}
if($dst_uuid ne "0") {
if($config->{'default.EXPORT_INCOMING'} eq "no" && $src_provid ne $dst_provid) {
if(confval('EXPORT_INCOMING') eq "no" && $src_provid ne $dst_provid) {
# don't store incoming call to this reseller
} else {
if(!exists $reseller_names->{$dst_provid}) {
$reseller_names->{$dst_provid} = NGCP::CDR::Export::get_reseller_name($dbh, $dst_provid);
$reseller_ids->{$reseller_names->{$dst_provid}} = $dst_provid;
}
push(@{$reseller_lines->{$reseller_names->{$dst_provid}}}, $reseller_line);
write_wrap($reseller_names->{$dst_provid});
write_reseller_id($dst_provid, $reseller_line);
}
}
}
@ -238,110 +86,15 @@ while(my $row = $sth->fetchrow_arrayref) {
#DEBUG "ignoring cdr ids " . (join ",", @ignored_ids);
my @resellers = keys %$reseller_lines;
for my $reseller (@resellers) {
write_wrap($reseller, 1);
}
# we write empty cdrs for resellers which didn't have a call during this
# export run, so get them into the list
my $missing_resellers = NGCP::CDR::Export::get_missing_resellers($dbh, [ keys %$reseller_names ]);
for(my $i = 0; $i < @{ $missing_resellers->{names} }; ++$i) {
my $name = $missing_resellers->{names}->[$i];
my $id = $missing_resellers->{ids}->[$i];
push @resellers, $name;
$reseller_ids->{$name} = $id;
$reseller_names->{$id} = $name;
write_wrap($name, 2);
}
sub write_wrap {
my ($reseller, $force) = @_;
$force //= 0;
$reseller_lines->{$reseller} //= [];
my $vals = $reseller_lines->{$reseller};
my $rec_idx = @$vals;
my $max = $config->{'default.MAX_ROWS_PER_FILE'} // $rec_idx;
($force == 0 && $rec_idx < $max) and return;
($force == 1 && $rec_idx == 0) and return;
my $reseller_contract_id = "";
my $mark_query = undef;
unless($reseller eq "system") {
$reseller_contract_id = "-".$reseller_ids->{$reseller};
$mark_query = [ $reseller_ids->{$reseller} ];
}
if (!defined($mark{"lastseq".$reseller_contract_id})) {
my $tmpmark = NGCP::CDR::Export::get_mark($dbh, $collid, $mark_query);
%mark = ( %mark, %$tmpmark );
$mark{"lastseq".$reseller_contract_id} //= 0;
}
my $file_idx = $mark{"lastseq".$reseller_contract_id} // 0;
my $reseller_dname = $reseller . "/" . $dname;
if($reseller ne "system") {
$reseller_dname = "resellers/$reseller_dname";
}
my $reseller_tempdir = $tempdir . "/" . $reseller_dname;
do {
my $recs = ($rec_idx > $max) ? $max : $rec_idx;
NGCP::CDR::Exporter::finish();
$file_idx++;
my @filevals = @$vals[0 .. $recs-1];
@$vals = @$vals[$recs .. @$vals-1]; # modified $reseller_lines
my $err;
-d $reseller_tempdir || File::Path::make_path($reseller_tempdir, {error => \$err});
if(defined $err && @$err) {
DEBUG "!!! failed to create directory $reseller_tempdir: " . Dumper $err;
}
NGCP::CDR::Export::write_file(
\@filevals, $reseller_tempdir, $config->{'default.PREFIX'},
$config->{'default.VERSION'}, $file_ts, $file_idx, $config->{'default.SUFFIX'},
);
$rec_idx -= $recs;
} while($rec_idx > 0);
opendir(my $fh, $reseller_tempdir);
foreach my $file(readdir($fh)) {
my $src = "$reseller_tempdir/$file";
my $dst = $config->{'default.CDRDIR'} . "/$reseller_dname/$file";
if(-f $src) {
DEBUG "### moving $src to $dst\n";
my $err;
-d $config->{'default.CDRDIR'} . "/$reseller_dname" ||
File::Path::make_path($config->{'default.CDRDIR'} . "/$reseller_dname", {error => \$err});
if(defined $err && @$err) {
DEBUG "!!! failed to create directory $reseller_dname: " . Dumper $err;
}
unless(copy($src, $dst)) {
DEBUG "!!! failed to move $src to $dst: $!\n";
} else {
DEBUG "### successfully moved $src to final destination $dst\n";
}
NGCP::CDR::Export::chownmod($dst, $config->{'default.FILES_OWNER'},
$config->{'default.FILES_GROUP'}, oct(666),
$config->{'default.FILES_MASK'});
if($config->{'default.TRANSFER_TYPE'} eq "sftp") {
NGCP::CDR::Transfer::sftp(
$dst, $config->{'default.TRANSFER_HOST'},
$config->{'default.TRANSFER_PORT'},
$config->{'default.TRANSFER_REMOTE'},
$config->{'default.TRANSFER_USER'},
$config->{'default.TRANSFER_PASS'},
);
}
}
}
$mark{"lastseq".$reseller_contract_id} = $file_idx;
NGCP::CDR::Export::set_mark($dbh, $collid, { "lastseq$reseller_contract_id" => $file_idx });
close($fh);
}
NGCP::CDR::Export::update_export_status($dbh, "accounting.cdr", \@ids, "ok");
update_export_status("accounting.cdr", \@ids, "ok");
# TODO: should be tagged as ignored/skipped/whatever
NGCP::CDR::Export::update_export_status($dbh, "accounting.cdr", \@ignored_ids, "ok");
$dbh->commit or die("failed to commit db changes: " . $dbh->errstr);
update_export_status("accounting.cdr", \@ignored_ids, "ok");
NGCP::CDR::Exporter::commit();

@ -3,8 +3,8 @@ DBUSER=exporter
DBPASS=exporter
DBDB=accounting
#EDRDIR=/home/jail/home/cdrexport
EDRDIR=/tmp
#DESTDIR=/home/jail/home/cdrexport
DESTDIR=/tmp
# comment out to export all into one file per run
MAX_ROWS_PER_FILE=5000
@ -31,7 +31,9 @@ TRANSFER_PASS=export!!!
TRANSFER_KEY=/path/to/priv-key
TRANSFER_REMOTE=/home/jail/home/cdrexport
EXPORT_FIELDS = 'accounting.events.id', 'accounting.events.type', 'billing.contracts.external_id', 'billing.contacts.company', 'billing.voip_subscribers.external_id', '(select username from provisioning.voip_dbaliases tmp where tmp.subscriber_id = provisioning.voip_subscribers.id order by is_primary, id limit 1)', #'accounting.events.old_status', 'old_profile.name', #'accounting.events.new_status', 'new_profile.name', 'from_unixtime(accounting.events.timestamp)'
ADMIN_EXPORT_FIELDS = 'accounting.events.id', 'accounting.events.type', 'billing.contracts.external_id', 'billing.contacts.company', 'billing.voip_subscribers.external_id', '(select username from provisioning.voip_dbaliases tmp where tmp.subscriber_id = provisioning.voip_subscribers.id order by is_primary, id limit 1)', #'accounting.events.old_status', 'old_profile.name', #'accounting.events.new_status', 'new_profile.name', 'from_unixtime(accounting.events.timestamp)', 'accounting.events.reseller_id'
RESELLER_EXPORT_FIELDS = 'accounting.events.id', 'accounting.events.type', 'billing.contracts.external_id', 'billing.contacts.company', 'billing.voip_subscribers.external_id', '(select username from provisioning.voip_dbaliases tmp where tmp.subscriber_id = provisioning.voip_subscribers.id order by is_primary, id limit 1)', #'accounting.events.old_status', 'old_profile.name', #'accounting.events.new_status', 'new_profile.name', 'from_unixtime(accounting.events.timestamp)'
EXPORT_JOINS = { 'billing.voip_subscribers' => { 'billing.voip_subscribers.id' => 'accounting.events.subscriber_id' } }, { 'billing.contracts' => { 'billing.contracts.id' => 'billing.voip_subscribers.contract_id' } }, { 'billing.contacts' => { 'billing.contacts.id' => 'billing.contracts.contact_id' } }, { 'provisioning.voip_subscribers' => { 'provisioning.voip_subscribers.uuid' => 'billing.voip_subscribers.uuid' } }, { 'provisioning.voip_subscriber_profiles as old_profile' => { 'old_profile.id' => 'accounting.events.old_status' } }, { 'provisioning.voip_subscriber_profiles as new_profile' => { 'new_profile.id' => 'accounting.events.new_status' } }

@ -1,169 +1,63 @@
#!/usr/bin/perl -w
#!/usr/bin/perl
use strict;
use warnings;
use v5.14;
use Config::Simple;
use DBI;
use File::Temp;
use File::Copy;
use NGCP::CDR::Export;
use NGCP::CDR::Transfer;
use Data::Dumper;
my $collid = "eventexporter";
my $debug = 0;
# default config values
my $config = {
'default.FILTER_FLAPPING' => 0,
'default.MERGE_UPDATE' => 0,
'default.PREFIX' => 'sipwise',
'default.VERSION' => '001',
'default.SUFFIX' => 'edr',
'default.FILES_OWNER' => 'cdrexport',
'default.FILES_GROUP' => 'cdrexport',
'default.FILES_MASK' => '022',
'default.TRANSFER_TYPE' => "none",
'default.TRANSFER_PORT' => 22,
'default.TRANSFER_USER' => "cdrexport",
'default.TRANSFER_KEY' => "/root/.ssh/id_rsa",
'default.TRANSFER_REMOTE' => "/home/jail/home/cdrexport"
};
sub DEBUG {
say join (' ', @_);
}
my @config_paths = (qw#
/etc/ngcp-cdr-exporter/
.
#);
my $cf = 'event-exporter.conf';
my $config_file;
foreach my $cp(@config_paths) {
if(-f "$cp/$cf") {
$config_file = "$cp/$cf";
last;
}
}
die "Config file $cf not found in path " . (join " or ", @config_paths) . "\n"
unless $config_file;
Config::Simple->import_from("$config_file" , \%{$config}) or
die "Couldn't open the configuration file '$config_file'.\n";
die "Invalid destination directory '".$config->{'default.EDRDIR'}."'\n"
unless(-d $config->{'default.EDRDIR'});
use NGCP::CDR::Exporter;
my @fields = ();
foreach my $f(@{$config->{'default.EXPORT_FIELDS'}}) {
$f =~ s/^#.+//; next unless($f);
$f =~ s/^\'//; $f =~ s/\'$//;
push @fields, $f;
}
# $NGCP::CDR::Exporter::debug = 1;
# my $collid = "eventexporter";
my @joins = ();
sub config2array {
my $config_key = shift;
return ('ARRAY' eq ref $config->{$config_key}) ? $config->{$config_key} : [$config->{$config_key}];
}
foreach my $f( @{config2array('default.EXPORT_JOINS')} ) {
next unless($f);
$f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split('\s*=>\s*{\s*', $f);
$a =~ s/^\s*\'//; $a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split('\s*=>\s*', $b);
$c =~ s/^\s*\'//g; $c =~ s/\'\s*//;
$d =~ s/^\s*\'//g; $d =~ s/\'\s*//;
push @joins, { $a => { $c => $d } };
}
# default config values overrides
my $config = {
'PREFIX' => 'sipwise',
'VERSION' => '001',
'SUFFIX' => 'edr',
};
my @conditions = ();
foreach my $f(@{config2array('default.EXPORT_CONDITIONS')}) {
next unless($f);
$f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/;
my ($a, $b) = split('\s*=>\s*{\s*', $f);
$a =~ s/^\s*\'//; $a =~ s/\'$//g;
$b =~ s/\s*\}\s*$//;
my ($c, $d) = split('\s*=>\s*', $b);
$c =~ s/^\s*\'//g; $c =~ s/\'\s*//;
$d =~ s/^\s*\'//g; $d =~ s/\'\s*//;
push @conditions, { $a => { $c => $d } };
}
NGCP::CDR::Exporter::get_config('eventexporter', 'event-exporter.conf', $config);
my $dbh = DBI->connect("dbi:mysql:" . $config->{'default.DBDB'} .
";host=".$config->{'default.DBHOST'},
$config->{'default.DBUSER'}, $config->{'default.DBPASS'})
or die "failed to connect to db: $DBI::errstr";
$dbh->{mysql_auto_reconnect} = 1;
$dbh->{AutoCommit} = 0;
my @trailer = (
{ 'order by' => 'accounting.events.id' },
);
# make sure we always select id, subscriber_id, type, old and new;
# if you change it, make sure to adapt slice in the loop too!
unshift @fields, (qw/
unshift @NGCP::CDR::Exporter::admin_fields, (qw/
accounting.events.id accounting.events.subscriber_id accounting.events.reseller_id
accounting.events.type accounting.events.old_status accounting.events.new_status
/);
my @trailer = (
{ 'order by' => 'accounting.events.id' },
);
my @intjoins = ();
foreach my $f(@joins) {
my ($table, $keys) = %{ $f };
my ($foreign_key, $own_key) = %{ $keys };
push @intjoins, "left outer join $table on $foreign_key = $own_key";
}
my @conds = ();
foreach my $f(@conditions) {
my ($field, $match) = %{ $f };
my ($op, $val) = %{ $match };
push @conds, "$field $op $val";
}
my @trail = ();
foreach my $f(@trailer) {
my ($key, $val) = %{ $f };
push @trail, "$key $val";
}
my $file_ts = NGCP::CDR::Export::get_ts_for_filename;
my $mark = NGCP::CDR::Export::get_mark($dbh, $collid);
my $q = "select " .
join(", ", @fields) . " from accounting.events " .
join(" ", @intjoins) . " " .
"where " . join(" and ", @conds) . " " .
join(" ", @trail);
NGCP::CDR::Exporter::prepare_dbh(\@trailer, 'accounting.events');
DEBUG $q if $debug;
my $tempfh = File::Temp->newdir(undef, CLEANUP => 1);
my $tempdir = $tempfh->dirname;
NGCP::CDR::Exporter::prepare_output();
my $sth = $dbh->prepare($q);
$sth->execute();
my ($rec_idx, $file_idx) = (0, $mark->{lastseq});
my $written = 0;
my %lines = ();
my $rows = $sth->fetchall_arrayref();
my %res_lines;
my %filter = ();
my @filter_ids = ();
while(my $row = shift @{ $rows }) {
NGCP::CDR::Exporter::run(\&callback);
sub callback {
my ($row, $res_row) = @_;
my @head = @{ $row }[0 .. 5];
my ($id, $sub_id, $res_id, $type, $old, $new) = @head;
my @fields = map { defined $_ ? "\"$_\"" : '""' } (@{ $row }[6 .. @{ $row }-1]);
my $line = join ",", @fields;
my $reseller_line = join ",", @$res_row;
if($config->{'default.FILTER_FLAPPING'}) {
if(confval('FILTER_FLAPPING')) {
if($type =~ /^start_(.+)$/) {
my $t = $1;
my $k = "$sub_id;$t;$new";
@ -172,10 +66,9 @@ while(my $row = shift @{ $rows }) {
} else {
push @{ $filter{$k} }, $id;
}
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
} elsif($config->{'default.MERGE_UPDATE'} && $type =~ /^update_(.+)$/) {
$res_id and $res_lines{$res_id}{$id} = $reseller_line;
} elsif(confval('MERGE_UPDATE') && $type =~ /^update_(.+)$/) {
my $t = $1;
my $k = "$sub_id;$t;$old";
my $ids = $filter{$k} // [];
@ -183,18 +76,19 @@ while(my $row = shift @{ $rows }) {
my $old_id = pop @{ $ids };
say "... id $id is an update event of id $old_id, merge";
delete $lines{$old_id};
$res_id and delete $res_lines{$res_id}{$old_id};
push @filter_ids, $old_id;
my $line = join ",", @fields;
$line =~ s/\"update_/\"start_/;
$reseller_line =~ s/\"update_/\"start_/;
$lines{$id} = $line;
$res_id and $res_lines{$res_id}{$id} = $reseller_line;
delete $filter{$k};
$k = "$sub_id;$t;$new";
push @{ $ids }, ($old_id, $id);
$filter{$k} = $ids;
} else {
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
$res_id and $res_lines{$res_id}{$id} = $reseller_line;
}
} elsif($type =~ /^(?:stop|end)_(.+)$/) {
my $t = $1;
@ -205,79 +99,52 @@ while(my $row = shift @{ $rows }) {
say "... id $id is an end event of id $old_id, filter";
push @filter_ids, ($id, $old_id);
delete $lines{$old_id};
$rec_idx--;
$res_id and delete $res_lines{$res_id}{$old_id};
$filter{$k} = $ids;
} else {
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
$res_id and $res_lines{$res_id}{$id} = $reseller_line;
}
} else {
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
$res_id and $res_lines{$res_id}{$id} = $reseller_line;
}
} else {
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
$res_id and $res_lines{$res_id}{$id} = $reseller_line;
}
}
my @vals = map { $lines{$_} } sort { int($a) <=> int($b) } keys %lines;
my @ids = keys %lines;
my $max = $config->{'default.MAX_ROWS_PER_FILE'} // $rec_idx;
do {
my $recs = ($rec_idx > $max) ? $max : $rec_idx;
$file_idx++;
my @filevals = @vals[0 .. $recs-1];
@vals = @vals[$recs .. @vals-1];
NGCP::CDR::Export::write_file(
\@filevals, $tempdir, $config->{'default.PREFIX'},
$config->{'default.VERSION'}, $file_ts, $file_idx, $config->{'default.SUFFIX'},
);
$rec_idx -= $recs;
} while($rec_idx > 0);
NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@filter_ids, "filtered");
NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@ids, "ok");
NGCP::CDR::Export::set_mark($dbh, $collid, { lastseq => $file_idx });
$dbh->commit or die("failed to commit db changes: " . $dbh->errstr);
opendir(my $fh, $tempdir);
foreach my $file(readdir($fh)) {
my $src = "$tempdir/$file";
my $dst = $config->{'default.EDRDIR'}."/$file";
if(-f $src) {
DEBUG "### moving $src to $dst\n";
copy($src, $dst);
NGCP::CDR::Export::chownmod($dst, $config->{'default.FILES_OWNER'},
$config->{'default.FILES_GROUP'}, oct(666),
$config->{'default.FILES_MASK'});
if($config->{'default.TRANSFER_TYPE'} eq "sftp") {
NGCP::CDR::Transfer::sftp(
$dst, $config->{'default.TRANSFER_HOST'},
$config->{'default.TRANSFER_PORT'},
$config->{'default.TRANSFER_REMOTE'},
$config->{'default.TRANSFER_USER'},
$config->{'default.TRANSFER_PASS'},
);
} elsif($config->{'default.TRANSFER_TYPE'} eq "sftp-sh") {
NGCP::CDR::Transfer::sftp_sh(
$dst, $config->{'default.TRANSFER_HOST'},
$config->{'default.TRANSFER_PORT'},
$config->{'default.TRANSFER_REMOTE'},
$config->{'default.TRANSFER_USER'},
$config->{'default.TRANSFER_KEY'},
);
}
}
for my $val (@vals) {
write_reseller('system', $val);
}
for my $res (keys(%res_lines)) {
my $res_lines = $res_lines{$res};
my @ids = values(%$res_lines);
@ids = sort {$a <=> $b} (@ids);
for my $id (@ids) {
my $val = $lines{$id};
$val or next;
write_reseller_id($res, $val);
}
}
close($fh);
NGCP::CDR::Exporter::finish();
my @ids = keys %lines;
update_export_status("accounting.events", \@filter_ids, "filtered");
update_export_status("accounting.events", \@ids, "ok");
NGCP::CDR::Exporter::commit();
# vim: set tabstop=4 expandtab:

Loading…
Cancel
Save