From 0da00356edaf49415dca452f2b31bb8ae31456e4 Mon Sep 17 00:00:00 2001 From: Andreas Granig Date: Thu, 3 Jul 2014 22:18:50 +0200 Subject: [PATCH] MT#7793 Add flapping filter. Works, but needs improvement to filter also over file splits. --- NGCP/CDR/Export.pm | 6 ++-- event-exporter.conf | 2 +- event-exporter.pl | 78 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/NGCP/CDR/Export.pm b/NGCP/CDR/Export.pm index ffe1f72..93630cb 100644 --- a/NGCP/CDR/Export.pm +++ b/NGCP/CDR/Export.pm @@ -31,11 +31,11 @@ sub set_mark { } sub update_export_status{ - my ($dbh, $tbl, $ids) = @_; + my ($dbh, $tbl, $ids, $status) = @_; return unless(@{ $ids }); - my $u = $dbh->prepare("update $tbl set export_status='ok', exported_at=now()" . + my $u = $dbh->prepare("update $tbl set export_status=?, exported_at=now()" . " where id in (" . join (',', map { '?' }(1 .. @{ $ids }) ) . ")"); - $u->execute(@{ $ids }) or die($dbh->errstr); + $u->execute($status, @{ $ids }) or die($dbh->errstr); } diff --git a/event-exporter.conf b/event-exporter.conf index e14042a..4012569 100644 --- a/event-exporter.conf +++ b/event-exporter.conf @@ -7,7 +7,7 @@ DBDB=accounting EDRDIR=/tmp # comment out to export all into one file per run -MAX_ROWS_PER_FILE=5000 +#MAX_ROWS_PER_FILE=5000 # if 1, don't export events which are switched on and off again # during an export interval diff --git a/event-exporter.pl b/event-exporter.pl index bdac221..78fef2f 100755 --- a/event-exporter.pl +++ b/event-exporter.pl @@ -121,9 +121,12 @@ my @trailer = ( { 'order by' => 'accounting.events.id' }, ); -unless($fields[0] eq "accounting.events.id") { - die "First field must always be 'accounting.events.id'\n"; -} +# make sure we always select id, subscriber_id, type, old and new; +# if you change it, make sure to adapt slice in the loop too! +unshift @fields, (qw/ + accounting.events.id accounting.events.subscriber_id accounting.events.type + accounting.events.old_status accounting.events.new_status +/); my @intjoins = (); foreach my $f(@joins) { @@ -161,45 +164,82 @@ my $sth = $dbh->prepare($q); $sth->execute(); my ($rec_idx, $file_idx) = (0, $mark->{lastseq}); -my @lines = (); -my @ids = (); +my $written = 0; +my %lines = (); my $rows = $sth->fetchall_arrayref(); my %filter = (); +my @filter_ids = (); while(my $row = shift @{ $rows }) { - my @fields = map { defined $_ ? "\"$_\"" : '""' } @{ $row }; + my @head = @{ $row }[0 .. 4]; + my ($id, $sub_id, $type, $old, $new) = @head; + my @fields = map { defined $_ ? "\"$_\"" : '""' } @{ $row }[5,-1]; + # this only works if events are not spread across export files; + # we might gather infos first, then export in chunks later to make it work if($FILTER_FLAPPING) { - # TODO: how to figure out with our dynamic query which subscriber - # did which action and whether the old/new status is ok? + if($type =~ /^start_(.+)$/) { + my $t = $1; + my $k = "$sub_id;$t;$new"; + unless(exists $filter{$k}) { + $filter{$k} = { id => $id, c => 1 }; + } else { + $filter{$k}->{c}++; + } + my $line = join ",", @fields; + $lines{$id} = $line; + $rec_idx++; + } elsif($type =~ /^end_(.+)$/) { + my $t = $1; + my $k = "$sub_id;$t;$old"; + my $entry = $filter{$k}; + if(defined $filter{$k} && $filter{$k}->{c} > 0) { + say "... id $id is an end event of id " . $filter{$k}->{id} . ", filter"; + push @filter_ids, ($id, $filter{$k}->{id}); + delete $lines{$filter{$k}->{id}}; + $rec_idx--; + $filter{$k}->{c}--; + } else { + my $line = join ",", @fields; + $lines{$id} = $line; + $rec_idx++; + } + } else { + my $line = join ",", @fields; + $lines{$id} = $line; + $rec_idx++; + } + } else { + my $line = join ",", @fields; + $lines{$id} = $line; + $rec_idx++; } - - my $line = join ",", @fields; - push @lines, $line; - - $rec_idx++; - if(($MAX_ROWS_PER_FILE && $rec_idx >= $MAX_ROWS_PER_FILE) || @{ $rows } == 0) { + say "--- rec_idx=$rec_idx, row count=".@{ $rows }; $rec_idx = 0; $file_idx++; + my @vals = map { $lines{$_} } sort { int($a) <=> int($b) } keys %lines; + my @ids = keys %lines; NGCP::CDR::Export::write_file( - \@lines, $tempdir, $PREFIX, $VERSION, $file_ts, $file_idx, $SUFFIX, + \@vals, $tempdir, $PREFIX, $VERSION, $file_ts, $file_idx, $SUFFIX, ); - @lines = (); + NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@filter_ids, "filtered"); + NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@ids, "ok"); + %lines = (); + $written = 1; # make sure to not write another empty file } - push @ids, $row->[0]; } # write empty file in case of no records -unless(@ids) { +unless($written) { $file_idx++; + my @lines = (); NGCP::CDR::Export::write_file( \@lines, $tempdir, $PREFIX, $VERSION, $file_ts, $file_idx, $SUFFIX, ); } -NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@ids); NGCP::CDR::Export::set_mark($dbh, $collid, { lastseq => $file_idx }); $dbh->commit or die("failed to commit db changes: " . $dbh->errstr);