diff --git a/NGCP/CDR/Exporter.pm b/NGCP/CDR/Exporter.pm new file mode 100644 index 0000000..ce60008 --- /dev/null +++ b/NGCP/CDR/Exporter.pm @@ -0,0 +1,366 @@ +package NGCP::CDR::Exporter; + +use strict; +use warnings; +use v5.14; + +use Config::Simple; +use DBI; +use Digest::MD5; +use NGCP::CDR::Export; +use File::Temp; +use File::Copy; +use File::Path; +use NGCP::CDR::Transfer; +use Data::Dumper; + +BEGIN { + require Exporter; + our @ISA = qw(Exporter); + our @EXPORT = qw(DEBUG confval write_reseller write_reseller_id update_export_status); +} + +our $debug = 0; +our $collid = "exporter"; + +our @admin_fields; +our @reseller_fields; +my @joins; +my @conditions; +my $dbh; +my $q; +my $sth; +my %reseller_names; +my %reseller_ids; +my %reseller_lines; +my %mark; +my $dname; +my $tempdir; +my $file_ts; +my @reseller_positions; + +# default config values +my %config = ( + 'default.FILTER_FLAPPING' => 0, + 'default.MERGE_UPDATE' => 0, + 'default.PREFIX' => 'ngcp', + 'default.VERSION' => '007', + 'default.SUFFIX' => 'cdr', + 'default.FILES_OWNER' => 'cdrexport', + 'default.FILES_GROUP' => 'cdrexport', + 'default.FILES_MASK' => '022', + 'default.TRANSFER_TYPE' => "none", + 'default.TRANSFER_PORT' => 22, + 'default.TRANSFER_USER' => "cdrexport", + 'default.TRANSFER_KEY' => "/root/.ssh/id_rsa", + 'default.TRANSFER_REMOTE' => "/home/jail/home/cdrexport" +); + +sub DEBUG { + say join (' ', @_); +} + +my @config_paths = (qw# + /etc/ngcp-cdr-exporter/ + . +#); + +sub config2array { + my $config_key = shift; + my $val = confval($config_key); + ref($val) eq 'ARRAY' and return @$val; + return $val; +} + +sub get_config { + my ($coll, $cf, $conf_upd) = @_; + + $collid = $coll; + + for my $key (%$conf_upd) { + $config{'default.' . $key} = $$conf_upd{$key}; + } + + my $config_file; + foreach my $cp(@config_paths) { + if(-f "$cp/$cf") { + $config_file = "$cp/$cf"; + last; + } + } + die "Config file $cf not found in path " . (join " or ", @config_paths) . "\n" + unless $config_file; + + Config::Simple->import_from("$config_file" , \%config) or + die "Couldn't open the configuration file '$config_file'.\n"; + + # backwards compat + $config{'default.DESTDIR'} //= $config{'default.CDRDIR'} // $config{'default.EDRDIR'}; + + die "Invalid destination directory '".$config{'default.DESTDIR'}."'\n" + unless(-d $config{'default.DESTDIR'}); + + foreach my $f(config2array('ADMIN_EXPORT_FIELDS')) { + $f =~ s/^#.+//; next unless($f); + $f =~ s/^\'//; $f =~ s/\'$//; + push @admin_fields, $f; + } + foreach my $f(config2array('RESELLER_EXPORT_FIELDS')) { + $f =~ s/^#.+//; next unless($f); + $f =~ s/^\'//; $f =~ s/\'$//; + push @reseller_fields, $f; + } + + foreach my $f(@{confval('EXPORT_JOINS')}) { + $f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/; + my ($a, $b) = split('\s*=>\s*{\s*', $f); + $a =~ s/^\s*\'//; $a =~ s/\'$//g; + $b =~ s/\s*\}\s*$//; + my ($c, $d) = split('\s*=>\s*', $b); + $c =~ s/^\s*\'//g; $c =~ s/\'\s*//; + $d =~ s/^\s*\'//g; $d =~ s/\'\s*//; + push @joins, { $a => { $c => $d } }; + } + + foreach my $f(@{confval('EXPORT_CONDITIONS')}) { + next unless($f); + $f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/; + my ($a, $b) = split('\s*=>\s*{\s*', $f); + $a =~ s/^\s*\'//; $a =~ s/\'$//g; + $b =~ s/\s*\}\s*$//; + + my ($c, $d) = split('\s*=>\s*', $b); + $c =~ s/^\s*\'//g; $c =~ s/\'\s*//; + $d =~ s/^\s*\'//g; $d =~ s/\'\s*//; + push @conditions, { $a => { $c => $d } }; + } +} + + +sub confval { + my ($val) = @_; + return $config{'default.' . $val}; +} + +sub prepare_dbh { + my ($trailer, $table) = @_; + + $dbh = DBI->connect("dbi:mysql:" . confval('DBDB') . + ";host=".confval('DBHOST'), + confval('DBUSER'), confval('DBPASS')) + or die "failed to connect to db: $DBI::errstr"; + + $dbh->{mysql_auto_reconnect} = 1; + $dbh->{AutoCommit} = 0; + + my @intjoins = (); + foreach my $f(@joins) { + my ($table, $keys) = %{ $f }; + my ($foreign_key, $own_key) = %{ $keys }; + push @intjoins, "left outer join $table on $foreign_key = $own_key"; + } + my @conds = (); + foreach my $f(@conditions) { + my ($field, $match) = %{ $f }; + my ($op, $val) = %{ $match }; + push @conds, "$field $op $val"; + } + my @trail = (); + foreach my $f(@$trailer) { + my ($key, $val) = %{ $f }; + push @trail, "$key $val"; + } + + # extract positions of reseller fields from admin fields + my %reseller_index; + @reseller_index{@admin_fields} = (0..$#admin_fields); + for(my $i = 0; $i < @reseller_fields; $i++) { + my $name = $reseller_fields[$i]; + unless(exists $reseller_index{$name}) { + die "Invalid RESELLER_EXPORT_FIELDS element '$name', not available in ADMIN_EXPORT_FIELDS!"; + } + push @reseller_positions, $reseller_index{$name}; + } + + $q = "select " . + join(", ", @admin_fields) . " from $table " . + join(" ", @intjoins) . " " . + "where " . join(" and ", @conds) . " " . + join(" ", @trail); + + DEBUG $q if $debug; + +} + +sub prepare_output { + my $tempfh = File::Temp->newdir(undef, CLEANUP => 1); + $tempdir = $tempfh->dirname; + + my $now = time(); + my @now = localtime($now); + $file_ts = NGCP::CDR::Export::get_ts_for_filename(\@now); + + my $full_name = (defined confval('FULL_NAMES') && confval('FULL_NAMES') eq "yes" ? 1 : 0); + my $monthly_dir = (defined confval('MONTHLY_DIR') && confval('MONTHLY_DIR') eq "yes" ? 1 : 0); + my $daily_dir = (defined confval('DAILY_DIR') && confval('DAILY_DIR') eq "yes" ? 1 : 0); + $dname = ""; + if($monthly_dir && !$daily_dir) { + $dname .= sprintf("%04i%02i", $now[5] + 1900, $now[4] + 1); + $full_name or $file_ts = sprintf("%02i%02i%02i%02i", @now[3,2,1,0]); + } elsif(!$monthly_dir && $daily_dir) { + $dname .= sprintf("%04i%02i%02i", $now[5] + 1900, $now[4] + 1, $now[3]); + $full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]); + } elsif($monthly_dir && $daily_dir) { + $dname .= sprintf("%04i%02i/%02i", $now[5] + 1900, $now[4] + 1, $now[3]); + $full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]); + } +} + +sub run { + my ($cb) = @_; + + my $sth = $dbh->prepare($q); + $sth->execute(); + while(my $row = $sth->fetchrow_arrayref) { + my @res_row = @$row[@reseller_positions]; + $cb->($row, \@res_row); + } +} + +sub write_reseller { + my ($reseller, $line) = @_; + push(@{$reseller_lines{$reseller}}, $line); + write_wrap($reseller); +} + +sub write_reseller_id { + my ($id, $line) = @_; + if(!exists $reseller_names{$id}) { + $reseller_names{$id} = NGCP::CDR::Export::get_reseller_name($dbh, $id); + $reseller_ids{$reseller_names{$id}} = $id; + } + write_reseller($reseller_names{$id}, $line); +} + +sub write_wrap { + my ($reseller, $force) = @_; + $force //= 0; + $reseller_lines{$reseller} //= []; + my $vals = $reseller_lines{$reseller}; + my $rec_idx = @$vals; + my $max = confval('MAX_ROWS_PER_FILE') // $rec_idx; + ($force == 0 && $rec_idx <= $max) and return; + ($force == 1 && $rec_idx == 0) and return; + my $reseller_contract_id = ""; + my $mark_query = undef; + unless($reseller eq "system") { + $reseller_contract_id = "-".$reseller_ids{$reseller}; + $mark_query = [ $reseller_ids{$reseller} ]; + } + if (!defined($mark{"lastseq".$reseller_contract_id})) { + my $tmpmark = NGCP::CDR::Export::get_mark($dbh, $collid, $mark_query); + %mark = ( %mark, %$tmpmark ); + $mark{"lastseq".$reseller_contract_id} //= 0; + } + my $file_idx = $mark{"lastseq".$reseller_contract_id} // 0; + my $reseller_dname = $reseller . "/" . $dname; + if($reseller ne "system") { + $reseller_dname = "resellers/$reseller_dname"; + } + my $reseller_tempdir = $tempdir . "/" . $reseller_dname; + + do { + my $recs = ($rec_idx > $max) ? $max : $rec_idx; + + $file_idx++; + my @filevals = @$vals[0 .. $recs-1]; + @$vals = @$vals[$recs .. @$vals-1]; # modified $reseller_lines + + my $err; + -d $reseller_tempdir || File::Path::make_path($reseller_tempdir, {error => \$err}); + if(defined $err && @$err) { + DEBUG "!!! failed to create directory $reseller_tempdir: " . Dumper $err; + } + + NGCP::CDR::Export::write_file( + \@filevals, $reseller_tempdir, confval('PREFIX'), + confval('VERSION'), $file_ts, $file_idx, confval('SUFFIX'), + ); + $rec_idx -= $recs; + + } while($rec_idx > 0); + + opendir(my $fh, $reseller_tempdir); + foreach my $file(readdir($fh)) { + my $src = "$reseller_tempdir/$file"; + my $dst = confval('DESTDIR') . "/$reseller_dname/$file"; + if(-f $src) { + DEBUG "### moving $src to $dst\n"; + my $err; + -d confval('DESTDIR') . "/$reseller_dname" || + File::Path::make_path(confval('DESTDIR') . "/$reseller_dname", {error => \$err}); + if(defined $err && @$err) { + DEBUG "!!! failed to create directory $reseller_dname: " . Dumper $err; + } + unless(copy($src, $dst)) { + DEBUG "!!! failed to move $src to $dst: $!\n"; + } else { + DEBUG "### successfully moved $src to final destination $dst\n"; + } + NGCP::CDR::Export::chownmod($dst, confval('FILES_OWNER'), + confval('FILES_GROUP'), oct(666), + confval('FILES_MASK')); + if(confval('TRANSFER_TYPE') eq "sftp") { + NGCP::CDR::Transfer::sftp( + $dst, confval('TRANSFER_HOST'), + confval('TRANSFER_PORT'), + confval('TRANSFER_REMOTE'), + confval('TRANSFER_USER'), + confval('TRANSFER_PASS'), + ); + } elsif(confval('TRANSFER_TYPE') eq "sftp-sh") { + NGCP::CDR::Transfer::sftp_sh( + $dst, confval('TRANSFER_HOST'), + confval('TRANSFER_PORT'), + confval('TRANSFER_REMOTE'), + confval('TRANSFER_USER'), + confval('TRANSFER_KEY'), + ); + } + } + } + $mark{"lastseq".$reseller_contract_id} = $file_idx; + NGCP::CDR::Export::set_mark($dbh, $collid, { "lastseq$reseller_contract_id" => $file_idx }); + close($fh); +} + +sub finish { + my @resellers = keys %reseller_lines; + for my $reseller (@resellers) { + write_wrap($reseller, 1); + } + + # we write empty cdrs for resellers which didn't have a call during this + # export run, so get them into the list + my $missing_resellers = NGCP::CDR::Export::get_missing_resellers($dbh, [ keys %reseller_names ]); + for(my $i = 0; $i < @{ $missing_resellers->{names} }; ++$i) { + my $name = $missing_resellers->{names}->[$i]; + my $id = $missing_resellers->{ids}->[$i]; + push @resellers, $name; + $reseller_ids{$name} = $id; + $reseller_names{$id} = $name; + write_wrap($name, 2); + } +} + +sub update_export_status { + NGCP::CDR::Export::update_export_status($dbh, @_); +} + +sub commit { + $dbh->commit or die("failed to commit db changes: " . $dbh->errstr); +} + +1; + +# vim: set tabstop=4 expandtab: diff --git a/cdr-exporter.conf b/cdr-exporter.conf index c5fbf99..67cd4af 100644 --- a/cdr-exporter.conf +++ b/cdr-exporter.conf @@ -6,8 +6,8 @@ DBUSER=exporter DBPASS=exporter DBDB=accounting -#CDRDIR=/home/jail/home/cdrexport -CDRDIR=/tmp/cdr +#DESTDIR=/home/jail/home/cdrexport +DESTDIR=/tmp/cdr MONTHLY_DIR=yes DAILY_DIR=yes diff --git a/cdr-exporter.pl b/cdr-exporter.pl index b5165cc..260c2d3 100755 --- a/cdr-exporter.pl +++ b/cdr-exporter.pl @@ -4,194 +4,54 @@ use strict; use warnings; use v5.14; -use Config::Simple; -use DBI; -use Digest::MD5; -use NGCP::CDR::Export; -use File::Temp; -use File::Copy; -use File::Path; - -my $debug = 1; -my $collid = "exporter"; - -# default config values -my $config = { - 'default.PREFIX' => 'ngcp', - 'default.VERSION' => '007', - 'default.SUFFIX' => 'cdr', - 'default.FILES_OWNER' => 'cdrexport', - 'default.FILES_GROUP' => 'cdrexport', - 'default.FILES_MASK' => '022', - 'default.TRANSFER_TYPE' => "none", - 'default.TRANSFER_PORT' => 22, - 'default.TRANSFER_USER' => "cdrexport", - 'default.TRANSFER_REMOTE' => "/home/jail/home/cdrexport" -}; - -sub DEBUG { - say join (' ', @_); -} +use NGCP::CDR::Exporter; -my @config_paths = (qw# - /etc/ngcp-cdr-exporter/ - . -#); -my $cf = 'cdr-exporter.conf'; -my $config_file; -foreach my $cp(@config_paths) { - if(-f "$cp/$cf") { - $config_file = "$cp/$cf"; - last; - } -} -die "Config file $cf not found in path " . (join " or ", @config_paths) . "\n" - unless $config_file; -Config::Simple->import_from("$config_file" , \%{$config}) or - die "Couldn't open the configuration file '$config_file'.\n"; - -die "Invalid destination directory '".$config->{'default.CDRDIR'}."'\n" - unless(-d $config->{'default.CDRDIR'}); - -my $now = time(); -my @now = localtime($now); -my $file_ts = NGCP::CDR::Export::get_ts_for_filename(\@now); - -my @admin_fields = (); -foreach my $f(@{$config->{'default.ADMIN_EXPORT_FIELDS'}}) { - $f =~ s/^#.+//; next unless($f); - $f =~ s/^\'//; $f =~ s/\'$//; - push @admin_fields, $f; -} -my @reseller_fields = (); -foreach my $f(@{$config->{'default.RESELLER_EXPORT_FIELDS'}}) { - $f =~ s/^#.+//; next unless($f); - $f =~ s/^\'//; $f =~ s/\'$//; - push @reseller_fields, $f; -} +# $NGCP::CDR::Exporter::debug = 1; +# my $collid = "exporter"; -my @joins = (); -foreach my $f(@{$config->{'default.EXPORT_JOINS'}}) { - $f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/; - my ($a, $b) = split('\s*=>\s*{\s*', $f); - $a =~ s/^\s*\'//; $a =~ s/\'$//g; - $b =~ s/\s*\}\s*$//; - my ($c, $d) = split('\s*=>\s*', $b); - $c =~ s/^\s*\'//g; $c =~ s/\'\s*//; - $d =~ s/^\s*\'//g; $d =~ s/\'\s*//; - push @joins, { $a => { $c => $d } }; -} +NGCP::CDR::Exporter::get_config('exporter', 'cdr-exporter.conf'); -my @conditions = (); -foreach my $f(@{$config->{'default.EXPORT_CONDITIONS'}}) { - next unless($f); - $f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/; - my ($a, $b) = split('\s*=>\s*{\s*', $f); - $a =~ s/^\s*\'//; $a =~ s/\'$//g; - $b =~ s/\s*\}\s*$//; - - my ($c, $d) = split('\s*=>\s*', $b); - $c =~ s/^\s*\'//g; $c =~ s/\'\s*//; - $d =~ s/^\s*\'//g; $d =~ s/\'\s*//; - push @conditions, { $a => { $c => $d } }; -} -my @trailer = ( - { 'order by' => 'accounting.cdr.id' }, -); -my $dbh = DBI->connect("dbi:mysql:" . $config->{'default.DBDB'} . - ";host=".$config->{'default.DBHOST'}, - $config->{'default.DBUSER'}, $config->{'default.DBPASS'}) - or die "failed to connect to db: $DBI::errstr"; +print("+++ Start run with DB " . (confval('DBUSER') || "(undef)") . + "\@".confval('DBDB')." to ".confval('PREFIX')."\n"); -$dbh->{mysql_auto_reconnect} = 1; -$dbh->{AutoCommit} = 0; - -my @intjoins = (); -foreach my $f(@joins) { - my ($table, $keys) = %{ $f }; - my ($foreign_key, $own_key) = %{ $keys }; - push @intjoins, "left outer join $table on $foreign_key = $own_key"; -} -my @conds = (); -foreach my $f(@conditions) { - my ($field, $match) = %{ $f }; - my ($op, $val) = %{ $match }; - push @conds, "$field $op $val"; -} -my @trail = (); -foreach my $f(@trailer) { - my ($key, $val) = %{ $f }; - push @trail, "$key $val"; -} - -print("+++ Start run with DB " . ($config->{'default.DBUSER'} || "(undef)") . - "\@".$config->{'default.DBDB'}." to ".$config->{'default.PREFIX'}."\n"); - -# extract positions of reseller fields from admin fields -my @reseller_positions = (); -my %reseller_index; -@reseller_index{@admin_fields} = (0..$#admin_fields); -for(my $i = 0; $i < @reseller_fields; $i++) { - my $name = $reseller_fields[$i]; - unless(exists $reseller_index{$name}) { - die "Invalid RESELLER_EXPORT_FIELDS element '$name', not available in ADMIN_EXPORT_FIELDS!"; - } - push @reseller_positions, $reseller_index{$name}; -} # add fields we definitely need, will be removed during processing -unshift @admin_fields, qw/ +unshift @NGCP::CDR::Exporter::admin_fields, qw/ accounting.cdr.id accounting.cdr.source_user_id accounting.cdr.destination_user_id accounting.cdr.source_provider_id accounting.cdr.destination_provider_id /; +my @trailer = ( + { 'order by' => 'accounting.cdr.id' }, +); -my $q = "select " . - join(", ", @admin_fields) . " from accounting.cdr " . - join(" ", @intjoins) . " " . - "where " . join(" and ", @conds) . " " . - join(" ", @trail); -#DEBUG $q if $debug; +NGCP::CDR::Exporter::prepare_dbh(\@trailer, 'accounting.cdr'); -my $tempfh = File::Temp->newdir(undef, CLEANUP => 1); -my $tempdir = $tempfh->dirname; -my $sth = $dbh->prepare($q); -$sth->execute(); +NGCP::CDR::Exporter::prepare_output(); -my $written = 0; -my @ignored_ids = (); -my $reseller_names = {}; -my $reseller_ids = {}; -my $reseller_lines = {}; + +NGCP::CDR::Exporter::run(\&callback); + + + + +my @ignored_ids = (); my @ids; -my %mark; - -my $full_name = (defined $config->{'default.FULL_NAMES'} && $config->{'default.FULL_NAMES'} eq "yes" ? 1 : 0); -my $monthly_dir = (defined $config->{'default.MONTHLY_DIR'} && $config->{'default.MONTHLY_DIR'} eq "yes" ? 1 : 0); -my $daily_dir = (defined $config->{'default.DAILY_DIR'} && $config->{'default.DAILY_DIR'} eq "yes" ? 1 : 0); -my $dname = ""; -if($monthly_dir && !$daily_dir) { - $dname .= sprintf("%04i%02i", $now[5] + 1900, $now[4] + 1); - $full_name or $file_ts = sprintf("%02i%02i%02i%02i", @now[3,2,1,0]); -} elsif(!$monthly_dir && $daily_dir) { - $dname .= sprintf("%04i%02i%02i", $now[5] + 1900, $now[4] + 1, $now[3]); - $full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]); -} elsif($monthly_dir && $daily_dir) { - $dname .= sprintf("%04i%02i/%02i", $now[5] + 1900, $now[4] + 1, $now[3]); - $full_name or $file_ts = sprintf("%02i%02i%02i", @now[2,1,0]); -} -while(my $row = $sth->fetchrow_arrayref) { + +sub callback { + my ($row, $res_row) = @_; + my @fields = @{ $row }; my $id = shift @fields; my $src_uuid = shift @fields; @@ -200,37 +60,25 @@ while(my $row = $sth->fetchrow_arrayref) { my $dst_provid = shift @fields; @fields = map { defined $_ ? "'$_'" : "''" } (@fields); - if($config->{'default.EXPORT_INCOMING'} eq "no" && $src_uuid eq "0") { + if(confval('EXPORT_INCOMING') eq "no" && $src_uuid eq "0") { push @ignored_ids, $id; next; } my $line = join ",", @fields; - push(@{$reseller_lines->{'system'}}, $line); - write_wrap('system'); + write_reseller('system', $line); push(@ids, $id); - my @reseller_fields = @fields[@reseller_positions]; - my $reseller_line = join ",", @reseller_fields; + my $reseller_line = join ",", @$res_row; if($src_uuid ne "0") { - if(!exists $reseller_names->{$src_provid}) { - $reseller_names->{$src_provid} = NGCP::CDR::Export::get_reseller_name($dbh, $src_provid); - $reseller_ids->{$reseller_names->{$src_provid}} = $src_provid; - } - push(@{$reseller_lines->{$reseller_names->{$src_provid}}}, $reseller_line); - write_wrap($reseller_names->{$src_provid}); + write_reseller_id($src_provid, $reseller_line); } if($dst_uuid ne "0") { - if($config->{'default.EXPORT_INCOMING'} eq "no" && $src_provid ne $dst_provid) { + if(confval('EXPORT_INCOMING') eq "no" && $src_provid ne $dst_provid) { # don't store incoming call to this reseller } else { - if(!exists $reseller_names->{$dst_provid}) { - $reseller_names->{$dst_provid} = NGCP::CDR::Export::get_reseller_name($dbh, $dst_provid); - $reseller_ids->{$reseller_names->{$dst_provid}} = $dst_provid; - } - push(@{$reseller_lines->{$reseller_names->{$dst_provid}}}, $reseller_line); - write_wrap($reseller_names->{$dst_provid}); + write_reseller_id($dst_provid, $reseller_line); } } } @@ -238,110 +86,15 @@ while(my $row = $sth->fetchrow_arrayref) { #DEBUG "ignoring cdr ids " . (join ",", @ignored_ids); -my @resellers = keys %$reseller_lines; -for my $reseller (@resellers) { - write_wrap($reseller, 1); -} -# we write empty cdrs for resellers which didn't have a call during this -# export run, so get them into the list -my $missing_resellers = NGCP::CDR::Export::get_missing_resellers($dbh, [ keys %$reseller_names ]); -for(my $i = 0; $i < @{ $missing_resellers->{names} }; ++$i) { - my $name = $missing_resellers->{names}->[$i]; - my $id = $missing_resellers->{ids}->[$i]; - push @resellers, $name; - $reseller_ids->{$name} = $id; - $reseller_names->{$id} = $name; - write_wrap($name, 2); -} - -sub write_wrap { - my ($reseller, $force) = @_; - $force //= 0; - $reseller_lines->{$reseller} //= []; - my $vals = $reseller_lines->{$reseller}; - my $rec_idx = @$vals; - my $max = $config->{'default.MAX_ROWS_PER_FILE'} // $rec_idx; - ($force == 0 && $rec_idx < $max) and return; - ($force == 1 && $rec_idx == 0) and return; - my $reseller_contract_id = ""; - my $mark_query = undef; - unless($reseller eq "system") { - $reseller_contract_id = "-".$reseller_ids->{$reseller}; - $mark_query = [ $reseller_ids->{$reseller} ]; - } - if (!defined($mark{"lastseq".$reseller_contract_id})) { - my $tmpmark = NGCP::CDR::Export::get_mark($dbh, $collid, $mark_query); - %mark = ( %mark, %$tmpmark ); - $mark{"lastseq".$reseller_contract_id} //= 0; - } - my $file_idx = $mark{"lastseq".$reseller_contract_id} // 0; - my $reseller_dname = $reseller . "/" . $dname; - if($reseller ne "system") { - $reseller_dname = "resellers/$reseller_dname"; - } - my $reseller_tempdir = $tempdir . "/" . $reseller_dname; - do { - my $recs = ($rec_idx > $max) ? $max : $rec_idx; +NGCP::CDR::Exporter::finish(); - $file_idx++; - my @filevals = @$vals[0 .. $recs-1]; - @$vals = @$vals[$recs .. @$vals-1]; # modified $reseller_lines - my $err; - -d $reseller_tempdir || File::Path::make_path($reseller_tempdir, {error => \$err}); - if(defined $err && @$err) { - DEBUG "!!! failed to create directory $reseller_tempdir: " . Dumper $err; - } - NGCP::CDR::Export::write_file( - \@filevals, $reseller_tempdir, $config->{'default.PREFIX'}, - $config->{'default.VERSION'}, $file_ts, $file_idx, $config->{'default.SUFFIX'}, - ); - $rec_idx -= $recs; - - } while($rec_idx > 0); - - opendir(my $fh, $reseller_tempdir); - foreach my $file(readdir($fh)) { - my $src = "$reseller_tempdir/$file"; - my $dst = $config->{'default.CDRDIR'} . "/$reseller_dname/$file"; - if(-f $src) { - DEBUG "### moving $src to $dst\n"; - my $err; - -d $config->{'default.CDRDIR'} . "/$reseller_dname" || - File::Path::make_path($config->{'default.CDRDIR'} . "/$reseller_dname", {error => \$err}); - if(defined $err && @$err) { - DEBUG "!!! failed to create directory $reseller_dname: " . Dumper $err; - } - unless(copy($src, $dst)) { - DEBUG "!!! failed to move $src to $dst: $!\n"; - } else { - DEBUG "### successfully moved $src to final destination $dst\n"; - } - NGCP::CDR::Export::chownmod($dst, $config->{'default.FILES_OWNER'}, - $config->{'default.FILES_GROUP'}, oct(666), - $config->{'default.FILES_MASK'}); - if($config->{'default.TRANSFER_TYPE'} eq "sftp") { - NGCP::CDR::Transfer::sftp( - $dst, $config->{'default.TRANSFER_HOST'}, - $config->{'default.TRANSFER_PORT'}, - $config->{'default.TRANSFER_REMOTE'}, - $config->{'default.TRANSFER_USER'}, - $config->{'default.TRANSFER_PASS'}, - ); - } - } - } - $mark{"lastseq".$reseller_contract_id} = $file_idx; - NGCP::CDR::Export::set_mark($dbh, $collid, { "lastseq$reseller_contract_id" => $file_idx }); - close($fh); -} -NGCP::CDR::Export::update_export_status($dbh, "accounting.cdr", \@ids, "ok"); +update_export_status("accounting.cdr", \@ids, "ok"); # TODO: should be tagged as ignored/skipped/whatever -NGCP::CDR::Export::update_export_status($dbh, "accounting.cdr", \@ignored_ids, "ok"); - -$dbh->commit or die("failed to commit db changes: " . $dbh->errstr); +update_export_status("accounting.cdr", \@ignored_ids, "ok"); +NGCP::CDR::Exporter::commit(); diff --git a/event-exporter.conf b/event-exporter.conf index 2df6764..c96751c 100644 --- a/event-exporter.conf +++ b/event-exporter.conf @@ -3,8 +3,8 @@ DBUSER=exporter DBPASS=exporter DBDB=accounting -#EDRDIR=/home/jail/home/cdrexport -EDRDIR=/tmp +#DESTDIR=/home/jail/home/cdrexport +DESTDIR=/tmp # comment out to export all into one file per run MAX_ROWS_PER_FILE=5000 @@ -31,7 +31,9 @@ TRANSFER_PASS=export!!! TRANSFER_KEY=/path/to/priv-key TRANSFER_REMOTE=/home/jail/home/cdrexport -EXPORT_FIELDS = 'accounting.events.id', 'accounting.events.type', 'billing.contracts.external_id', 'billing.contacts.company', 'billing.voip_subscribers.external_id', '(select username from provisioning.voip_dbaliases tmp where tmp.subscriber_id = provisioning.voip_subscribers.id order by is_primary, id limit 1)', #'accounting.events.old_status', 'old_profile.name', #'accounting.events.new_status', 'new_profile.name', 'from_unixtime(accounting.events.timestamp)' +ADMIN_EXPORT_FIELDS = 'accounting.events.id', 'accounting.events.type', 'billing.contracts.external_id', 'billing.contacts.company', 'billing.voip_subscribers.external_id', '(select username from provisioning.voip_dbaliases tmp where tmp.subscriber_id = provisioning.voip_subscribers.id order by is_primary, id limit 1)', #'accounting.events.old_status', 'old_profile.name', #'accounting.events.new_status', 'new_profile.name', 'from_unixtime(accounting.events.timestamp)', 'accounting.events.reseller_id' + +RESELLER_EXPORT_FIELDS = 'accounting.events.id', 'accounting.events.type', 'billing.contracts.external_id', 'billing.contacts.company', 'billing.voip_subscribers.external_id', '(select username from provisioning.voip_dbaliases tmp where tmp.subscriber_id = provisioning.voip_subscribers.id order by is_primary, id limit 1)', #'accounting.events.old_status', 'old_profile.name', #'accounting.events.new_status', 'new_profile.name', 'from_unixtime(accounting.events.timestamp)' EXPORT_JOINS = { 'billing.voip_subscribers' => { 'billing.voip_subscribers.id' => 'accounting.events.subscriber_id' } }, { 'billing.contracts' => { 'billing.contracts.id' => 'billing.voip_subscribers.contract_id' } }, { 'billing.contacts' => { 'billing.contacts.id' => 'billing.contracts.contact_id' } }, { 'provisioning.voip_subscribers' => { 'provisioning.voip_subscribers.uuid' => 'billing.voip_subscribers.uuid' } }, { 'provisioning.voip_subscriber_profiles as old_profile' => { 'old_profile.id' => 'accounting.events.old_status' } }, { 'provisioning.voip_subscriber_profiles as new_profile' => { 'new_profile.id' => 'accounting.events.new_status' } } diff --git a/event-exporter.pl b/event-exporter.pl index eddf7b6..38483f4 100755 --- a/event-exporter.pl +++ b/event-exporter.pl @@ -1,169 +1,63 @@ -#!/usr/bin/perl -w +#!/usr/bin/perl + use strict; +use warnings; use v5.14; -use Config::Simple; -use DBI; -use File::Temp; -use File::Copy; -use NGCP::CDR::Export; -use NGCP::CDR::Transfer; -use Data::Dumper; - -my $collid = "eventexporter"; -my $debug = 0; -# default config values -my $config = { - 'default.FILTER_FLAPPING' => 0, - 'default.MERGE_UPDATE' => 0, - 'default.PREFIX' => 'sipwise', - 'default.VERSION' => '001', - 'default.SUFFIX' => 'edr', - 'default.FILES_OWNER' => 'cdrexport', - 'default.FILES_GROUP' => 'cdrexport', - 'default.FILES_MASK' => '022', - 'default.TRANSFER_TYPE' => "none", - 'default.TRANSFER_PORT' => 22, - 'default.TRANSFER_USER' => "cdrexport", - 'default.TRANSFER_KEY' => "/root/.ssh/id_rsa", - 'default.TRANSFER_REMOTE' => "/home/jail/home/cdrexport" -}; - -sub DEBUG { - say join (' ', @_); -} - -my @config_paths = (qw# - /etc/ngcp-cdr-exporter/ - . -#); - - -my $cf = 'event-exporter.conf'; -my $config_file; -foreach my $cp(@config_paths) { - if(-f "$cp/$cf") { - $config_file = "$cp/$cf"; - last; - } -} -die "Config file $cf not found in path " . (join " or ", @config_paths) . "\n" - unless $config_file; - -Config::Simple->import_from("$config_file" , \%{$config}) or - die "Couldn't open the configuration file '$config_file'.\n"; - -die "Invalid destination directory '".$config->{'default.EDRDIR'}."'\n" - unless(-d $config->{'default.EDRDIR'}); - +use NGCP::CDR::Exporter; -my @fields = (); -foreach my $f(@{$config->{'default.EXPORT_FIELDS'}}) { - $f =~ s/^#.+//; next unless($f); - $f =~ s/^\'//; $f =~ s/\'$//; - push @fields, $f; -} +# $NGCP::CDR::Exporter::debug = 1; +# my $collid = "eventexporter"; -my @joins = (); -sub config2array { - my $config_key = shift; - return ('ARRAY' eq ref $config->{$config_key}) ? $config->{$config_key} : [$config->{$config_key}]; -} -foreach my $f( @{config2array('default.EXPORT_JOINS')} ) { - next unless($f); - $f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/; - my ($a, $b) = split('\s*=>\s*{\s*', $f); - $a =~ s/^\s*\'//; $a =~ s/\'$//g; - $b =~ s/\s*\}\s*$//; - - my ($c, $d) = split('\s*=>\s*', $b); - $c =~ s/^\s*\'//g; $c =~ s/\'\s*//; - $d =~ s/^\s*\'//g; $d =~ s/\'\s*//; - push @joins, { $a => { $c => $d } }; -} +# default config values overrides +my $config = { + 'PREFIX' => 'sipwise', + 'VERSION' => '001', + 'SUFFIX' => 'edr', +}; -my @conditions = (); -foreach my $f(@{config2array('default.EXPORT_CONDITIONS')}) { - next unless($f); - $f =~ s/^\s*\{?\s*//; $f =~ s/\}\s*\}\s*$/}/; - my ($a, $b) = split('\s*=>\s*{\s*', $f); - $a =~ s/^\s*\'//; $a =~ s/\'$//g; - $b =~ s/\s*\}\s*$//; - - my ($c, $d) = split('\s*=>\s*', $b); - $c =~ s/^\s*\'//g; $c =~ s/\'\s*//; - $d =~ s/^\s*\'//g; $d =~ s/\'\s*//; - push @conditions, { $a => { $c => $d } }; -} +NGCP::CDR::Exporter::get_config('eventexporter', 'event-exporter.conf', $config); -my $dbh = DBI->connect("dbi:mysql:" . $config->{'default.DBDB'} . - ";host=".$config->{'default.DBHOST'}, - $config->{'default.DBUSER'}, $config->{'default.DBPASS'}) - or die "failed to connect to db: $DBI::errstr"; -$dbh->{mysql_auto_reconnect} = 1; -$dbh->{AutoCommit} = 0; -my @trailer = ( - { 'order by' => 'accounting.events.id' }, -); - # make sure we always select id, subscriber_id, type, old and new; # if you change it, make sure to adapt slice in the loop too! -unshift @fields, (qw/ +unshift @NGCP::CDR::Exporter::admin_fields, (qw/ accounting.events.id accounting.events.subscriber_id accounting.events.reseller_id accounting.events.type accounting.events.old_status accounting.events.new_status /); +my @trailer = ( + { 'order by' => 'accounting.events.id' }, +); -my @intjoins = (); -foreach my $f(@joins) { - my ($table, $keys) = %{ $f }; - my ($foreign_key, $own_key) = %{ $keys }; - push @intjoins, "left outer join $table on $foreign_key = $own_key"; -} -my @conds = (); -foreach my $f(@conditions) { - my ($field, $match) = %{ $f }; - my ($op, $val) = %{ $match }; - push @conds, "$field $op $val"; -} -my @trail = (); -foreach my $f(@trailer) { - my ($key, $val) = %{ $f }; - push @trail, "$key $val"; -} - -my $file_ts = NGCP::CDR::Export::get_ts_for_filename; -my $mark = NGCP::CDR::Export::get_mark($dbh, $collid); - -my $q = "select " . - join(", ", @fields) . " from accounting.events " . - join(" ", @intjoins) . " " . - "where " . join(" and ", @conds) . " " . - join(" ", @trail); +NGCP::CDR::Exporter::prepare_dbh(\@trailer, 'accounting.events'); -DEBUG $q if $debug; -my $tempfh = File::Temp->newdir(undef, CLEANUP => 1); -my $tempdir = $tempfh->dirname; +NGCP::CDR::Exporter::prepare_output(); -my $sth = $dbh->prepare($q); -$sth->execute(); -my ($rec_idx, $file_idx) = (0, $mark->{lastseq}); -my $written = 0; my %lines = (); -my $rows = $sth->fetchall_arrayref(); +my %res_lines; my %filter = (); my @filter_ids = (); -while(my $row = shift @{ $rows }) { +NGCP::CDR::Exporter::run(\&callback); + + + + + +sub callback { + my ($row, $res_row) = @_; + my @head = @{ $row }[0 .. 5]; my ($id, $sub_id, $res_id, $type, $old, $new) = @head; my @fields = map { defined $_ ? "\"$_\"" : '""' } (@{ $row }[6 .. @{ $row }-1]); + my $line = join ",", @fields; + my $reseller_line = join ",", @$res_row; - if($config->{'default.FILTER_FLAPPING'}) { + if(confval('FILTER_FLAPPING')) { if($type =~ /^start_(.+)$/) { my $t = $1; my $k = "$sub_id;$t;$new"; @@ -172,10 +66,9 @@ while(my $row = shift @{ $rows }) { } else { push @{ $filter{$k} }, $id; } - my $line = join ",", @fields; $lines{$id} = $line; - $rec_idx++; - } elsif($config->{'default.MERGE_UPDATE'} && $type =~ /^update_(.+)$/) { + $res_id and $res_lines{$res_id}{$id} = $reseller_line; + } elsif(confval('MERGE_UPDATE') && $type =~ /^update_(.+)$/) { my $t = $1; my $k = "$sub_id;$t;$old"; my $ids = $filter{$k} // []; @@ -183,18 +76,19 @@ while(my $row = shift @{ $rows }) { my $old_id = pop @{ $ids }; say "... id $id is an update event of id $old_id, merge"; delete $lines{$old_id}; + $res_id and delete $res_lines{$res_id}{$old_id}; push @filter_ids, $old_id; - my $line = join ",", @fields; $line =~ s/\"update_/\"start_/; + $reseller_line =~ s/\"update_/\"start_/; $lines{$id} = $line; + $res_id and $res_lines{$res_id}{$id} = $reseller_line; delete $filter{$k}; $k = "$sub_id;$t;$new"; push @{ $ids }, ($old_id, $id); $filter{$k} = $ids; } else { - my $line = join ",", @fields; $lines{$id} = $line; - $rec_idx++; + $res_id and $res_lines{$res_id}{$id} = $reseller_line; } } elsif($type =~ /^(?:stop|end)_(.+)$/) { my $t = $1; @@ -205,79 +99,52 @@ while(my $row = shift @{ $rows }) { say "... id $id is an end event of id $old_id, filter"; push @filter_ids, ($id, $old_id); delete $lines{$old_id}; - $rec_idx--; + $res_id and delete $res_lines{$res_id}{$old_id}; $filter{$k} = $ids; } else { - my $line = join ",", @fields; $lines{$id} = $line; - $rec_idx++; + $res_id and $res_lines{$res_id}{$id} = $reseller_line; } } else { - my $line = join ",", @fields; $lines{$id} = $line; - $rec_idx++; + $res_id and $res_lines{$res_id}{$id} = $reseller_line; } } else { - my $line = join ",", @fields; $lines{$id} = $line; - $rec_idx++; + $res_id and $res_lines{$res_id}{$id} = $reseller_line; } } my @vals = map { $lines{$_} } sort { int($a) <=> int($b) } keys %lines; -my @ids = keys %lines; -my $max = $config->{'default.MAX_ROWS_PER_FILE'} // $rec_idx; -do { - my $recs = ($rec_idx > $max) ? $max : $rec_idx; - - $file_idx++; - my @filevals = @vals[0 .. $recs-1]; - @vals = @vals[$recs .. @vals-1]; - NGCP::CDR::Export::write_file( - \@filevals, $tempdir, $config->{'default.PREFIX'}, - $config->{'default.VERSION'}, $file_ts, $file_idx, $config->{'default.SUFFIX'}, - ); - $rec_idx -= $recs; - -} while($rec_idx > 0); - -NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@filter_ids, "filtered"); -NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@ids, "ok"); -NGCP::CDR::Export::set_mark($dbh, $collid, { lastseq => $file_idx }); - -$dbh->commit or die("failed to commit db changes: " . $dbh->errstr); - -opendir(my $fh, $tempdir); -foreach my $file(readdir($fh)) { - my $src = "$tempdir/$file"; - my $dst = $config->{'default.EDRDIR'}."/$file"; - if(-f $src) { - DEBUG "### moving $src to $dst\n"; - copy($src, $dst); - NGCP::CDR::Export::chownmod($dst, $config->{'default.FILES_OWNER'}, - $config->{'default.FILES_GROUP'}, oct(666), - $config->{'default.FILES_MASK'}); - if($config->{'default.TRANSFER_TYPE'} eq "sftp") { - NGCP::CDR::Transfer::sftp( - $dst, $config->{'default.TRANSFER_HOST'}, - $config->{'default.TRANSFER_PORT'}, - $config->{'default.TRANSFER_REMOTE'}, - $config->{'default.TRANSFER_USER'}, - $config->{'default.TRANSFER_PASS'}, - ); - } elsif($config->{'default.TRANSFER_TYPE'} eq "sftp-sh") { - NGCP::CDR::Transfer::sftp_sh( - $dst, $config->{'default.TRANSFER_HOST'}, - $config->{'default.TRANSFER_PORT'}, - $config->{'default.TRANSFER_REMOTE'}, - $config->{'default.TRANSFER_USER'}, - $config->{'default.TRANSFER_KEY'}, - ); - } - } +for my $val (@vals) { + write_reseller('system', $val); +} +for my $res (keys(%res_lines)) { + my $res_lines = $res_lines{$res}; + my @ids = values(%$res_lines); + @ids = sort {$a <=> $b} (@ids); + for my $id (@ids) { + my $val = $lines{$id}; + $val or next; + write_reseller_id($res, $val); + } } -close($fh); + + + + +NGCP::CDR::Exporter::finish(); + + + +my @ids = keys %lines; + +update_export_status("accounting.events", \@filter_ids, "filtered"); +update_export_status("accounting.events", \@ids, "ok"); + + +NGCP::CDR::Exporter::commit(); # vim: set tabstop=4 expandtab: