MT#7793 Add flapping filter.

Works, but needs improvement to filter also over file splits.
mr3.4.1
Andreas Granig 12 years ago
parent d524b74582
commit 0da00356ed

@ -31,11 +31,11 @@ sub set_mark {
}
sub update_export_status{
my ($dbh, $tbl, $ids) = @_;
my ($dbh, $tbl, $ids, $status) = @_;
return unless(@{ $ids });
my $u = $dbh->prepare("update $tbl set export_status='ok', exported_at=now()" .
my $u = $dbh->prepare("update $tbl set export_status=?, exported_at=now()" .
" where id in (" . join (',', map { '?' }(1 .. @{ $ids }) ) . ")");
$u->execute(@{ $ids }) or die($dbh->errstr);
$u->execute($status, @{ $ids }) or die($dbh->errstr);
}

@ -7,7 +7,7 @@ DBDB=accounting
EDRDIR=/tmp
# comment out to export all into one file per run
MAX_ROWS_PER_FILE=5000
#MAX_ROWS_PER_FILE=5000
# if 1, don't export events which are switched on and off again
# during an export interval

@ -121,9 +121,12 @@ my @trailer = (
{ 'order by' => 'accounting.events.id' },
);
unless($fields[0] eq "accounting.events.id") {
die "First field must always be 'accounting.events.id'\n";
}
# make sure we always select id, subscriber_id, type, old and new;
# if you change it, make sure to adapt slice in the loop too!
unshift @fields, (qw/
accounting.events.id accounting.events.subscriber_id accounting.events.type
accounting.events.old_status accounting.events.new_status
/);
my @intjoins = ();
foreach my $f(@joins) {
@ -161,45 +164,82 @@ my $sth = $dbh->prepare($q);
$sth->execute();
my ($rec_idx, $file_idx) = (0, $mark->{lastseq});
my @lines = ();
my @ids = ();
my $written = 0;
my %lines = ();
my $rows = $sth->fetchall_arrayref();
my %filter = ();
my @filter_ids = ();
while(my $row = shift @{ $rows }) {
my @fields = map { defined $_ ? "\"$_\"" : '""' } @{ $row };
my @head = @{ $row }[0 .. 4];
my ($id, $sub_id, $type, $old, $new) = @head;
my @fields = map { defined $_ ? "\"$_\"" : '""' } @{ $row }[5,-1];
# this only works if events are not spread across export files;
# we might gather infos first, then export in chunks later to make it work
if($FILTER_FLAPPING) {
# TODO: how to figure out with our dynamic query which subscriber
# did which action and whether the old/new status is ok?
if($type =~ /^start_(.+)$/) {
my $t = $1;
my $k = "$sub_id;$t;$new";
unless(exists $filter{$k}) {
$filter{$k} = { id => $id, c => 1 };
} else {
$filter{$k}->{c}++;
}
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
} elsif($type =~ /^end_(.+)$/) {
my $t = $1;
my $k = "$sub_id;$t;$old";
my $entry = $filter{$k};
if(defined $filter{$k} && $filter{$k}->{c} > 0) {
say "... id $id is an end event of id " . $filter{$k}->{id} . ", filter";
push @filter_ids, ($id, $filter{$k}->{id});
delete $lines{$filter{$k}->{id}};
$rec_idx--;
$filter{$k}->{c}--;
} else {
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
}
} else {
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
}
} else {
my $line = join ",", @fields;
$lines{$id} = $line;
$rec_idx++;
}
my $line = join ",", @fields;
push @lines, $line;
$rec_idx++;
if(($MAX_ROWS_PER_FILE && $rec_idx >= $MAX_ROWS_PER_FILE) || @{ $rows } == 0) {
say "--- rec_idx=$rec_idx, row count=".@{ $rows };
$rec_idx = 0;
$file_idx++;
my @vals = map { $lines{$_} } sort { int($a) <=> int($b) } keys %lines;
my @ids = keys %lines;
NGCP::CDR::Export::write_file(
\@lines, $tempdir, $PREFIX, $VERSION, $file_ts, $file_idx, $SUFFIX,
\@vals, $tempdir, $PREFIX, $VERSION, $file_ts, $file_idx, $SUFFIX,
);
@lines = ();
NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@filter_ids, "filtered");
NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@ids, "ok");
%lines = ();
$written = 1; # make sure to not write another empty file
}
push @ids, $row->[0];
}
# write empty file in case of no records
unless(@ids) {
unless($written) {
$file_idx++;
my @lines = ();
NGCP::CDR::Export::write_file(
\@lines, $tempdir, $PREFIX, $VERSION, $file_ts, $file_idx, $SUFFIX,
);
}
NGCP::CDR::Export::update_export_status($dbh, "accounting.events", \@ids);
NGCP::CDR::Export::set_mark($dbh, $collid, { lastseq => $file_idx });
$dbh->commit or die("failed to commit db changes: " . $dbh->errstr);

Loading…
Cancel
Save