TT#102100 export field transformations

support perl code snippets (in config.yml) to
transform values of cdr export fields.

Change-Id: I42eeae538d11c8c75cdd8f6a3f91fc22ae80f34c
mr9.2
Rene Krenn 4 years ago
parent baba02c198
commit 3e9eb3a148

@ -17,7 +17,7 @@ sub get_mark {
push @ids, "lastseq-$id";
}
for my $mk(@ids) {
$s->execute("$name-$mk") or die($dbh->errstr);
$s->execute("$name-$mk") or die($dbh->errstr . "\n");
my $r = $s->fetch;
$marks{$mk} = ($r && $r->[0]) ? $r->[0] : 0;
}
@ -30,7 +30,7 @@ sub set_mark {
my $i = $dbh->prepare("insert into accounting.mark (collector, acc_id) values(?,?)");
my $u = $dbh->prepare("update accounting.mark set acc_id = ? where collector = ?");
for my $mk (keys %{ $mark }) {
$s->execute("$name-$mk") or die($dbh->errstr);
$s->execute("$name-$mk") or die($dbh->errstr . "\n");
my $r = $s->fetch;
if($r && defined $r->[0]) {
$u->execute($mark->{$mk}, "$name-$mk");
@ -46,7 +46,7 @@ sub update_export_status {
while (my @chunk = splice @$ids, 0, 10000) {
my $sth = $dbh->prepare("update $tbl set export_status=?, exported_at=now()" .
" where id in (" . substr(',?' x scalar @chunk,1) . ")");
$sth->execute($status, @chunk) or die($dbh->errstr);
$sth->execute($status, @chunk) or die($dbh->errstr . "\n");
$sth->finish();
}
}
@ -60,7 +60,7 @@ sub upsert_export_status {
"join (select * from accounting.cdr_export_status where type = \"$stream\") as _cesc " .
"where _cdr.id in (" . substr(',?' x scalar @chunk,1) . ") " .
"on duplicate key update export_status = \"$status\", exported_at = now()");
$sth->execute(@chunk) or die($dbh->errstr);
$sth->execute(@chunk) or die($dbh->errstr . "\n");
$sth->finish();
}
}
@ -133,7 +133,7 @@ sub write_file {
my $fn = sprintf('%s/%s_%s_%s_%010i.%s', $dircomp, $prefix, $version, $ts, $lastseq, $suffix);
my $tfn = sprintf('%s/%s_%s_%s_%010i.%s.'.$$, $dircomp, $prefix, $version, $ts, $lastseq, $suffix);
my $fd;
open($fd, ">", $tfn) or die("failed to open tmp-file $tfn ($!), stop");
open($fd, ">", $tfn) or die("failed to open tmp-file $tfn ($!), stop\n");
my $ctx = Digest::MD5->new;
my $num = @{ $lines };
@ -179,10 +179,10 @@ sub write_file {
}
NGCP::CDR::Exporter::DEBUG("$num data lines written to $tfn, checksum is $md5\n");
close($fd) or die ("failed to close tmp-file $tfn ($!), stop");
close($fd) or die ("failed to close tmp-file $tfn ($!), stop\n");
undef($ctx);
rename($tfn, $fn) or die("failed to move tmp-file $tfn to $fn ($!), stop");
rename($tfn, $fn) or die("failed to move tmp-file $tfn to $fn ($!), stop\n");
NGCP::CDR::Exporter::DEBUG("successfully moved $tfn to $fn\n");
}

@ -15,6 +15,7 @@ use NGCP::CDR::Transfer;
use Data::Dumper;
use Sys::Syslog;
use Proc::ProcessTable qw();
use MIME::Base64 qw(decode_base64);
BEGIN {
require Exporter;
@ -45,8 +46,11 @@ my $exporter_type = "exporter";
my $last_admin_field;
our @admin_fields;
our @admin_field_transformations;
our @reseller_fields;
our @reseller_field_transformations;
our @data_fields;
our @data_field_transformations;
my @joins;
my @conditions;
my $dbh;
@ -128,16 +132,102 @@ sub ERR {
}
my @config_paths = (qw#
/home/rkrenn/temp/cdrexporttransformations/
/etc/ngcp-cdr-exporter/
.
#);
#/home/rkrenn/temp/cdrexportstreams/
sub config2array {
my $config_key = shift;
my ($config_key,$serialized) = @_;
my $val = confval($config_key);
ref($val) eq 'ARRAY' and return @$val;
if ('ARRAY' eq ref($val)) {
return @$val;
} elsif ($serialized) {
my $decoded = decode_base64($val);
die("invalid config value '$val': " . $@ . "\n") if $@;
#$decoded =~ s{\A\$VAR\d+\s*=\s*}{};
## no critic (BuiltinFunctions::ProhibitStringyEval)
$val = eval($decoded);
die("invalid config value '$decoded': " . $@ . "\n") if $@;
if ('ARRAY' eq ref($val)) {
return @$val;
} elsif ('HASH' eq ref($val)) {
return %$val;
} elsif (ref($val)) {
die("'$decoded' is " . ref($val) . "\n");
}
}
return $val;
}
sub transform_value {
my ($value,$transformation,$context) = @_;
if ($transformation) {
eval {
$value = $transformation->{sub}->($value,$context);
};
if ($@) {
warn("error transforming [$transformation->{name}] value '$value': " . $@ . "\n");
}
}
return $value;
}
sub create_transformation_context {
my ($row,$out,$static_context) = @_;
my $context = {
row => $row,
out => $out,
dbh => $dbh,
static => $static_context,
};
return $context;
}
sub get_export_fields {
my ($name,$transformations) = @_;
my @ret;
foreach my $f (config2array($name,1)) {
$f or next;
if ('HASH' eq ref $f) {
my $sql = $f->{sql};
$sql =~ s/^#.+//;
next unless($sql);
$sql =~ s/^\'//;
$sql =~ s/\'$//;
push @ret, $sql;
if ($transformations) {
if ($f->{code}) {
## no critic (BuiltinFunctions::ProhibitStringyEval)
my $sub = eval($f->{code});
die("[$f->{name}] code: " . $@ . "\n") if $@;
die("[$f->{name}] code: not a function\n") unless 'CODE' eq ref $sub;
push(@$transformations,{
sub => $sub,
code => $f->{code},
name => $f->{name},
});
} else {
push(@$transformations,undef);
}
}
} else {
$f =~ s/^#.+//;
next unless($f);
$f =~ s/^\'//;
$f =~ s/\'$//;
push @ret, $f;
if ($transformations) {
push(@$transformations,undef);
}
}
}
return @ret;
}
sub get_config_fields {
@ -238,18 +328,22 @@ sub prepare_config {
}
#test overrides:
#$config{$stream . '.DBHOST'} = '192.168.0.29';
#$config{$stream . '.DBUSER'} = 'root';
#$config{$stream . '.DBPASS'} = '';
#$config{$stream . '.TRANSFER_REMOTE'} = "/home/rkrenn/temp/cdrexportstreams/cdrexport";
#$config{$stream . '.DESTDIR'} = "/home/rkrenn/temp/cdrexportstreams/cdrexport";
$config{$stream . '.DBHOST'} = '192.168.0.180';
$config{$stream . '.DBUSER'} = 'root';
$config{$stream . '.DBPASS'} = '';
$config{$stream . '.TRANSFER_REMOTE'} = "/home/rkrenn/temp/cdrexporttransformations/cdrexport";
$config{$stream . '.DESTDIR'} = "/home/rkrenn/temp/cdrexporttransformations/cdrexport";
$config{$stream . '.EXPORT_CONDITIONS'} = "{ 'base_table.export_status' => { '=' => '\"unexported\"' } }";
die "Invalid destination directory '".$config{$stream . '.DESTDIR'}."'\n"
unless(-d $config{$stream . '.DESTDIR'});
@admin_fields = get_config_fields('ADMIN_EXPORT_FIELDS');
@reseller_fields = get_config_fields('RESELLER_EXPORT_FIELDS');
@data_fields = get_config_fields('DATA_FIELDS');
@admin_field_transformations = ();
@admin_fields = get_export_fields('ADMIN_EXPORT_FIELDS',\@admin_field_transformations);
@reseller_field_transformations = ();
@reseller_fields = get_export_fields('RESELLER_EXPORT_FIELDS',\@reseller_field_transformations);
@data_field_transformations = ();
@data_fields = get_export_fields('DATA_FIELDS',\@data_field_transformations);
@joins = ();
foreach my $f (get_config_fields('EXPORT_JOINS')) {
@ -276,6 +370,7 @@ sub prepare_config {
$d =~ s/^\s*\'//g; $d =~ s/\'\s*//;
push @conditions, { $a => { $c => $d } };
}
die "export conditions required\n" unless scalar @conditions;
%reseller_names = ();
%reseller_ids = ();
@ -373,7 +468,7 @@ sub build_query {
$dbh = DBI->connect("dbi:mysql:" . confval('DBDB') .
";host=".confval('DBHOST'),
confval('DBUSER'), confval('DBPASS'))
or die "failed to connect to db: $DBI::errstr";
or die("failed to connect to db: " . $DBI::errstr . "\n");
$dbh->{AutoCommit} = 0;
}
@ -601,7 +696,7 @@ sub get_dir_ts {
}
my $sth = $dbh->prepare('select second(start.ts),minute(start.ts),hour(start.ts),dayofmonth(start.ts),'.
'month(start.ts)-1,year(start.ts)-1900,dayofweek(start.ts)-1,dayofyear(start.ts)-1 from (' . $stmt . ') as start');
$sth->execute(@params); # or die($DBI::errstr);
$sth->execute(@params); # or die($DBI::errstr . "\n");
@now = $sth->fetchrow_array;
$sth->finish;
} else {
@ -637,13 +732,41 @@ sub run {
my $rec_in = 0;
my $sth = $dbh->prepare($q);
$sth->execute() or die "Query failed: " . $sth->errstr;
$sth->execute() or die("Query failed: " . $sth->errstr . "\n");
my $static_context = {};
while(my $row = $sth->fetchrow_arrayref) {
#print $rec_in ."\n";
$rec_in++;
my @admin_row = @$row[0 .. $last_admin_field];
my @res_row = @$row[@reseller_positions];
my @data_row = @$row[@data_positions];
my @admin_row = ();
my $context = create_transformation_context($row,\@admin_row,$static_context);
foreach my $i (0 .. $last_admin_field) {
$context->{i} = $i;
push(@admin_row,transform_value(
$row->[$i],
$admin_field_transformations[$#admin_row + 1],
$context,
));
}
my @res_row = ();
$context = create_transformation_context($row,\@res_row,$static_context);
foreach my $i (@reseller_positions) {
$context->{i} = $i;
push(@res_row,transform_value(
$row->[$i],
$reseller_field_transformations[$#res_row + 1],
$context,
));
}
my @data_row = ();
$context = create_transformation_context($row,\@data_row,$static_context);
foreach my $i (@data_positions) {
$context->{i} = $i;
push(@data_row,transform_value(
$row->[$i],
$data_field_transformations[$#data_row + 1],
$context,
));
}
$cb->(\@admin_row, \@res_row, \@data_row);
$dbh->ping() if ($rec_in % 10000 == 0);
}
@ -815,7 +938,7 @@ sub upsert_export_status {
sub commit {
ilog('info', 'Committing changes to database');
$dbh->commit or die("failed to commit db changes: " . $dbh->errstr);
$dbh->commit or die("failed to commit db changes: " . $dbh->errstr . "\n");
ilog('info', 'All done');
}

@ -7,7 +7,7 @@ use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::CDR::Exporter;
die("$0 already running") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
die("$0 already running\n") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
exit if scalar find_processes(qr/ngcp-cleanup-acc/);
my $stream_limit = 300000;
@ -26,13 +26,15 @@ foreach my $stream (NGCP::CDR::Exporter::import_config('cdr-exporter.conf')) {
(confval('DBUSER') || "(undef)") .
"\@".confval('DBDB')." to ".confval('DESTDIR')."\n");
# add fields we definitely need, will be removed during processing
unshift @NGCP::CDR::Exporter::admin_fields, qw/
my @discriminators = qw/
base_table.id
base_table.source_user_id
base_table.destination_user_id
base_table.source_provider_id
base_table.destination_provider_id
/;
unshift @NGCP::CDR::Exporter::admin_fields, @discriminators;
unshift @NGCP::CDR::Exporter::admin_field_transformations, ((undef) x scalar @discriminators);
@ignored_ids = ();
@ids = ();
@ -45,13 +47,13 @@ foreach my $stream (NGCP::CDR::Exporter::import_config('cdr-exporter.conf')) {
if ('default' ne $stream) {
my $stmt = "insert into accounting.cdr_export_status (id,type) values (null,?)" .
" on duplicate key update id = last_insert_id(id)";
$dbh->do($stmt, undef, $stream) or die "Failed to register stream '$stream'";
$dbh->do($stmt, undef, $stream) or die "Failed to register stream '$stream'\n";
my $export_status_id = $dbh->{'mysql_insertid'};
$stmt = "select coalesce(max(cdr_id),0) from accounting.cdr_export_status_data" .
" where status_id = ?";
my $sth = $dbh->prepare($stmt);
$sth->execute($export_status_id) or die "Failed to obtain last processed cdr id of stream '$stream'";
$sth->execute($export_status_id) or die "Failed to obtain last processed cdr id of stream '$stream'\n";
($last_cdr_id) = $sth->fetchrow_array();
$sth->finish();

@ -7,7 +7,7 @@ use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::CDR::Exporter;
die("$0 already running") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
die("$0 already running\n") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
exit if scalar find_processes(qr/ngcp-cleanup-acc/);
$NGCP::CDR::Export::reseller_id_col = 'id';
@ -28,14 +28,17 @@ NGCP::CDR::Exporter::DEBUG("+++ Start event export with DB " .
# make sure we always select id, subscriber_id, type, old and new;
# if you change it, make sure to adapt slice in the loop too!
unshift @NGCP::CDR::Exporter::admin_fields, (qw/
my @discriminators = qw/
base_table.id
base_table.subscriber_id
base_table.reseller_id
base_table.type
base_table.old_status
base_table.new_status
/);
/;
unshift @NGCP::CDR::Exporter::admin_fields, @discriminators;
unshift @NGCP::CDR::Exporter::admin_field_transformations, ((undef) x scalar @discriminators);
my @trailer = (
{ 'order by' => 'base_table.id' },
{ 'limit' => '3000' },

@ -7,7 +7,7 @@ use Fcntl qw(LOCK_EX LOCK_NB);
use NGCP::CDR::Exporter;
die("$0 already running") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
die("$0 already running\n") unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet
exit if scalar find_processes(qr/ngcp-cleanup-acc/);
my $stream_limit = 50000;
@ -31,13 +31,15 @@ foreach my $stream (NGCP::CDR::Exporter::import_config('int-cdr-exporter.conf'))
(confval('DBUSER') || "(undef)") .
"\@".confval('DBDB')." to ".confval('DESTDIR')."\n");
# add fields we definitely need, will be removed during processing
unshift @NGCP::CDR::Exporter::admin_fields, qw/
my @discriminators = qw/
base_table.id
base_table.source_user_id
base_table.destination_user_id
base_table.source_provider_id
base_table.destination_provider_id
/;
unshift @NGCP::CDR::Exporter::admin_fields, @discriminators;
unshift @NGCP::CDR::Exporter::admin_field_transformations, ((undef) x scalar @discriminators);
@ignored_ids = ();
@ids = ();
@ -48,13 +50,13 @@ foreach my $stream (NGCP::CDR::Exporter::import_config('int-cdr-exporter.conf'))
if ('default' ne $stream) {
my $stmt = "insert into accounting.cdr_export_status (id,type) values (null,?)" .
" on duplicate key update id = last_insert_id(id)";
$dbh->do($stmt, undef, $stream) or die "Failed to register stream '$stream'";
$dbh->do($stmt, undef, $stream) or die "Failed to register stream '$stream'\n";
my $export_status_id = $dbh->{'mysql_insertid'};
$stmt = "select coalesce(max(cdr_id),0) from accounting.int_cdr_export_status_data" .
" where status_id = ?";
my $sth = $dbh->prepare($stmt);
$sth->execute($export_status_id) or die "Failed to obtain last processed cdr id of stream '$stream'";
$sth->execute($export_status_id) or die "Failed to obtain last processed cdr id of stream '$stream'\n";
($last_cdr_id) = $sth->fetchrow_array();
$sth->finish();

Loading…
Cancel
Save