TT#104251 refactor paging queries for cdr tables

the implementation used queries based on "OR",
causing a perfromance problem as the query time
depends on the cardinality of the accounting.cdr
table.

Change-Id: I1dbb23cf8802179b89c44718915ca090a2b17b5e
(cherry picked from commit 04e8fb04e1)
mr8.1.1
Rene Krenn 5 years ago
parent 1d4eed775a
commit 49686ff810

@ -245,7 +245,7 @@ sub findby_callidprefix {
my @conditions = @{$conditions // []};
push(@conditions,{ $table . '.call_id' => { 'LIKE' => '?' } });
my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' .
_get_export_stmt($db,undef,$joins,\@conditions) .
_get_export_stmt($db,$joins,\@conditions) .
' ORDER BY LENGTH(' . $table . '.call_id' . ') ASC, ' . $table . '.start_time ASC';
my @params = ($call_id . '%');
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
@ -266,7 +266,7 @@ sub findby_callid {
my @conditions = @{$conditions // []};
push(@conditions,{ $table . '.call_id' => { '=' => '?' } });
my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' .
_get_export_stmt($db,undef,$joins,\@conditions) .
_get_export_stmt($db,$joins,\@conditions) .
' ORDER BY ' . $table . '.start_time ASC';
my @params = ($call_id);
my $rows = $xa_db->db_get_all_arrayref($stmt,@params);
@ -329,9 +329,31 @@ sub process_unexported {
NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = _get_export_stmt($db,$static_context,$joins,$conditions);
my $select_stmt;
my $count_stmt;
my $select_format = 'SELECT ' . $table . '.' . $db->columnidentifier('id') . ', ' . $table . '.' . $db->columnidentifier('call_id') . ' %s ORDER BY ' . $table . '.' . $db->columnidentifier('id');
my $count_format = 'SELECT COUNT(1) FROM (%s) AS __cnt';
if ($static_context) {
$static_context->{part} = 'A';
$select_stmt = sprintf('(' . $select_format . ')',_get_export_stmt_part($db,$static_context,$joins,$conditions));
$count_stmt = sprintf('(' . $count_format . ')',$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,$static_context,$joins,$conditions),0,$limit,undef));
$select_stmt .= ' UNION ALL ';
$count_stmt .= ' + ';
$static_context->{part} = 'B';
$select_stmt .= sprintf('(' . $select_format . ')',_get_export_stmt_part($db,$static_context,$joins,$conditions));
$count_stmt .= sprintf('(' . $count_format . ')',$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,$static_context,$joins,$conditions),0,$limit,undef));
if (defined $limit) {
$count_stmt = 'SELECT LEAST(' . $count_stmt . ', ' . $limit . ')';
} else {
$count_stmt = 'SELECT ' . $count_stmt;
}
} else {
$select_stmt = sprintf($select_format,_get_export_stmt_part($db,undef,$joins,$conditions));
$count_stmt = sprintf($count_format,$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,undef,$joins,$conditions),0,$limit,undef));
}
return process_table(
get_db => $get_db,
class => __PACKAGE__,
@ -354,14 +376,11 @@ sub process_unexported {
multithreading => $multithreading,
tableprocessing_threads => $numofthreads,
blocksize => $blocksize,
#select => $db->paginate_sort_query('SELECT ' . $table . '.* ' . $stmt,undef,undef,$sort),
select => 'SELECT ' . $table . '.' . $db->columnidentifier('id') . ', ' . $table . '.' . $db->columnidentifier('call_id') .
' ' . $stmt . ' ORDER BY ' . $table . '.' . $db->columnidentifier('id'),
selectcount => 'SELECT COUNT(1) FROM (' . $db->paginate_sort_query('SELECT 1 ' . $stmt,0,$limit,undef) . ') AS __cnt',
select => $select_stmt,
selectcount => $count_stmt,
);
}
sub process_fromto {
my %params = @_;
@ -423,9 +442,15 @@ sub process_fromto {
);
}
sub _get_export_stmt {
my ($db,$joins,$conditions) = @_;
return _get_export_stmt_part($db,undef,$joins,$conditions);
}
sub _get_export_stmt_part {
my ($db,$static_context,$joins,$conditions) = @_;
my $table = $db->tableidentifier($tablename);
@ -439,11 +464,21 @@ sub _get_export_stmt {
push @intjoins, "LEFT JOIN $table ON $foreign_key = $own_key";
}
}
my @conds = ();
if (defined $static_context and $static_context->{export_status_id}) {
push @intjoins, 'LEFT JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::gettablename()) . ' AS __cesd ON __cesd.cdr_id = ' . $table . '.id AND __cesd.status_id = ' . $static_context->{export_status_id};
push @conds, '(__cesd.export_status = "' . $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED . '" OR __cesd.export_status IS NULL)';
if (defined $static_context and $static_context->{export_status_id} and $static_context->{part}) {
unless (defined $static_context->{last_processed_cdr_id}) {
$static_context->{last_processed_cdr_id} = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::find_last_processed_cdrid($static_context->{export_status_id});
}
if ('b' eq lc($static_context->{part})) {
push @conds, $table . '.id > ' . $static_context->{last_processed_cdr_id};
} elsif ('a' eq lc($static_context->{part})) {
push @intjoins, 'LEFT JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::gettablename()) . ' AS __cesd ON __cesd.cdr_id = ' . $table . '.id AND __cesd.status_id = ' . $static_context->{export_status_id};
push @conds, $table . '.id <= ' . $static_context->{last_processed_cdr_id};
push @conds, '__cesd.export_status = "' . $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED . '"';
}
}
$stmt .= " " . join(" ", @intjoins) if (scalar @intjoins) > 0;
if (defined $conditions and (scalar @$conditions) > 0) {
foreach my $f (@$conditions) {

@ -34,6 +34,8 @@ our @EXPORT_OK = qw(
update_row
insert_row
upsert_row
find_last_processed_cdrid
update_export_status
@ -75,6 +77,30 @@ sub new {
}
sub find_last_processed_cdrid {
my ($status_id) = @_;
check_table();
my $db = &$get_db();
my $table = $db->tableidentifier($tablename);
my $stmt = 'SELECT COALESCE(MAX(cdr_id),0) FROM ' . $table;
my @params = ();
my @terms = ();
if (defined $status_id) {
push(@terms,$db->columnidentifier('status_id') . ' = ?');
push(@params,$status_id);
}
if ((scalar @terms) > 0) {
$stmt .= ' WHERE ' . join(' AND ',@terms);
}
return $db->db_get_value($stmt,@params);
}
sub update_row {
my ($xa_db,$data) = @_;

Loading…
Cancel
Save