TT#44404 datatables refactoring for large tables

while limiting the count queries is sufficient for basic
queries, a plethora of perfromance cosniderations arise
when running compound queries (queries with set operations)
against large tables. this is the particular case with the
call history UI.

+ multi-column search must not hit any un-indexed column.
  when limit is providied, it seems ok for the optimizer
  even when no appropriate multi-col index is present.
+ 'col LIKE "%xyz%"' must be turned to 'col LIKE "xyz%"'
  - otherwise and idnex is useless. if fulltextsearch is
  desired, this needs to be done separately, including all
  the pain of maintaining the keyword/substring index etc.

+ each partial query of a compound query must have a LIMIT
  clause. if not, the db tends to try building an intermediate
  table (which gets large -> slow). this is the only option when sorting,
  but it must be prevent if only displaying unsorted or filtered rows.
  BIx::Class::Helper::ResultSet::SetOperations generates malformend
  SQL when limiting the partial queries. Therefore, the generated
  SQL will therefore be parsed and patched.

+ considering the LIMIT for the partial queries, it is also
  mandatory to inject the filter/joins to each. this is done
  properly by ReseultSet::SetOperations; but needs to be done
  explicitly when building compound queries.

+ aggregation footers must be prevented when clipped - it wont
  show useful data (sum, count, ..) anyway.

+ prevent page query when count gives 0

+ prevent 'int_column = "12345678901"' search terms: literal must be
  numeric and not longer than the digits supported by the column (10)

+ implement * wildcard and strict_search

Change-Id: Ie256877d368747ad6bfe74f5b6ac5dae88be9e58
changes/41/23541/18
Rene Krenn 8 years ago
parent 9db48b4d3f
commit 42730fb359

@ -2619,22 +2619,25 @@ sub calllist_master :Chained('base') :PathPart('calls') :CaptureArgs(0) {
$c->stash->{callid} = decode_base64url($c->stash->{callid_enc});
}
my $call_cols = [
{ name => "id", title => $c->loc('#') },
{ name => "direction", search => 1, literal_sql => 'if(source_user_id = "'.$c->stash->{subscriber}->uuid.'", "outgoing", "incoming")' },
{ name => "source_user", search => 1, title => $c->loc('Caller') },
{ name => "destination_user", search => 1, title => $c->loc('Callee') },
# NO SEARCH FOR UNINDEXED COLUMNS !!!
{ name => "id", int_search => 1, title => $c->loc('#'), },
{ name => "direction", search => 0, literal_sql => 'if(source_user_id = "'.$c->stash->{subscriber}->uuid.'", "outgoing", "incoming")', },
#{ name => "source_user", strict_search => 1, 'no_column' => 1, },
{ name => "source_cli", strict_search => 1, title => $c->loc('Caller'), },
#{ name => "destination_user", strict_search => 1, 'no_column' => 1, },
{ name => "destination_user_in", strict_search => 1, title => $c->loc('Callee'), },
{ name => "clir", search => 0, title => $c->loc('CLIR') },
{ name => "source_customer_billing_zones_history.detail", search => 1, title => $c->loc('Billing zone') },
{ name => "call_status", search => 1, title => $c->loc('Status') },
{ name => "source_customer_billing_zones_history.detail", search => 0, title => $c->loc('Billing zone'), }, #index required...
{ name => "call_status", search => 0, title => $c->loc('Status') },
{ name => "start_time", search_from_epoch => 1, search_to_epoch => 1, title => $c->loc('Start Time') },
{ name => "duration", search => 1, title => $c->loc('Duration'), show_total => 'sum' },
{ name => "duration", search => 0, title => $c->loc('Duration'), show_total => 'sum' },
{ name => "cdr_mos_data.mos_average", search => 0, title => $c->loc('MOS avg') },
{ name => "cdr_mos_data.mos_average_packetloss", search => 0, title => $c->loc('MOS packetloss') },
{ name => "cdr_mos_data.mos_average_jitter", search => 0, title => $c->loc('MOS jitter') },
{ name => "cdr_mos_data.mos_average_roundtrip", search => 0, title => $c->loc('MOS roundtrip') },
];
push @{ $call_cols }, (
{ name => "call_id", search => 1, title => $c->loc('Call-ID') },
{ name => "call_id", strict_search => 1, title => $c->loc('Call-ID'), },
) if($c->user->roles eq "admin" || $c->user->roles eq "reseller");
my $vat_factor = $c->config->{appearance}{cdr_apply_vat} && $c->stash->{subscriber}->contract->add_vat
@ -2643,7 +2646,7 @@ sub calllist_master :Chained('base') :PathPart('calls') :CaptureArgs(0) {
$c->log->debug("using vat_factor '$vat_factor'");
push @{ $call_cols }, (
{ name => "total_customer_cost", search => 1, title => $c->loc('Cost'), show_total => 'sum',
{ name => "total_customer_cost", search => 0, title => $c->loc('Cost'), show_total => 'sum',
literal_sql => 'if(source_user_id = "'.$c->stash->{subscriber}->uuid.'", source_customer_cost, destination_customer_cost)'.$vat_factor },
) ;
$c->stash->{calls_dt_columns} = NGCP::Panel::Utils::Datatables::set_columns($c, $call_cols);
@ -3852,7 +3855,10 @@ sub _process_calls_rows {
$data{clir} = $resource->{clir};
$data{duration} = (defined $result->duration ? sprintf("%.2f s", $result->duration) : "");
$data{duration} = (defined $result->duration ? NGCP::Panel::Utils::DateTime::sec_to_hms($c,$result->duration,3) : "");
$data{total_customer_cost} = (defined $result->get_column('total_customer_cost') ? sprintf("%.2f", $result->get_column('total_customer_cost') / 100.0) : "");
$data{total_customer_cost} = "";
eval {
$data{total_customer_cost} = sprintf("%.2f", $result->get_column('total_customer_cost') / 100.0) if defined $result->get_column('total_customer_cost');
};
$data{call_id_url} = encode_base64url($resource->{call_id});
return %data;
},
@ -3880,6 +3886,9 @@ sub ajax_calls :Chained('calllist_master') :PathPart('list/ajax') :Args(0) {
source_user_id => { '!=' => $c->stash->{subscriber}->uuid },
($callid ? (call_id => $callid) : ()),
}),NGCP::Panel::Utils::CallList::SUPPRESS_IN);
$out_rs = NGCP::Panel::Utils::Datatables::apply_dt_joins_filters($c, $out_rs, $c->stash->{calls_dt_columns});
$in_rs = NGCP::Panel::Utils::Datatables::apply_dt_joins_filters($c, $in_rs, $c->stash->{calls_dt_columns});
my $rs = $out_rs->union_all($in_rs);
_process_calls_rows($c,$rs);

@ -7,10 +7,10 @@ extends 'HTML::FormHandler::Field::Compound';
has_field 'cc' => (
type => 'Text',
element_attr => {
class => ['ngcp_e164_cc'],
rel => ['tooltip'],
title => ['Country Code, e.g. 1 for US or 43 for Austria']
element_attr => {
class => ['ngcp_e164_cc'],
rel => ['tooltip'],
title => ['Country Code, e.g. 1 for US or 43 for Austria']
},
do_label => 0,
do_wrapper => 0,
@ -19,10 +19,10 @@ has_field 'cc' => (
has_field 'ac' => (
type => 'Text',
element_attr => {
class => ['ngcp_e164_ac'],
rel => ['tooltip'],
title => ['Area Code, e.g. 212 for NYC or 1 for Vienna']
element_attr => {
class => ['ngcp_e164_ac'],
rel => ['tooltip'],
title => ['Area Code, e.g. 212 for NYC or 1 for Vienna']
},
do_label => 0,
do_wrapper => 0,
@ -31,10 +31,10 @@ has_field 'ac' => (
has_field 'sn' => (
type => 'Text',
element_attr => {
class => ['ngcp_e164_sn'],
rel => ['tooltip'],
title => ['Subscriber Number, e.g. 12345678']
element_attr => {
class => ['ngcp_e164_sn'],
rel => ['tooltip'],
title => ['Subscriber Number, e.g. 12345678']
},
do_label => 0,
do_wrapper => 0,
@ -50,9 +50,9 @@ sub validate {
my $sn = $self->field('sn')->value;
my @sub_fields = (qw/cc ac sn/);
my %sub_errors =
my %sub_errors =
map { $_ => 1 }
map { ($self->field($_) && $self->field($_)->result ) ? @{$self->field($_)->errors} : () }
map { ($self->field($_) && $self->field($_)->result ) ? @{$self->field($_)->errors} : () }
@sub_fields;
for my $sub_error( keys %sub_errors ) {
$self->add_error($sub_error);

@ -28,165 +28,60 @@ sub process {
# check if we need to join more tables
# TODO: can we nest it deeper than once level?
set_columns($c, $cols);
unless ($use_rs_cb) {
for my $col(@{ $cols }) {
if ($col->{show_total}) {
push @$aggregate_cols, $col;
}
my @parts = split /\./, $col->{name};
if($col->{literal_sql}) {
$rs = $rs->search_rs(undef, {
$col->{join} ? ( join => $col->{join} ) : (),
$col->{no_column} ? () : (
'+select' => { '' => \[$col->{literal_sql}], -as => $col->{accessor} },
'+as' => [ $col->{accessor} ],
)
});
} elsif( @parts > 1 ) {
my $join = $parts[$#parts-1];
foreach my $table(reverse @parts[0..($#parts-2)]){
$join = { $table => $join };
}
$rs = $rs->search_rs(undef, {
join => $join,
'+select' => [ $parts[($#parts-1)].'.'.$parts[$#parts] ],
'+as' => [ $col->{accessor} ],
});
}
}
}
#all joins already implemented, and filters aren't applied. But count we will take only if there are search and no other aggregations
my $totalRecords_rs = $rs;
$rs = _resolve_joins($rs,$cols,$aggregate_cols) if (!$use_rs_cb);
#all joins already implemented, and filters aren't applied. But count we will take only if there are search and no other aggregations
#= $use_rs_cb ? 0 : $rs->count;
### Search processing section
# generic searching
my @searchColumns = ();
my %conjunctSearchColumns = ();
#processing single search input - group1 from groups to be joined by 'AND'
my $searchString = $c->request->params->{sSearch} // "";
if($searchString && ! $use_rs_cb) {
#for search string from one search input we need to check all columns which contain the 'search' spec (now: qw/search search_lower_column search_upper_column/). so, for example user entered into search input ip address - we don't know that it is ip address, so we check that name like search OR id like search OR search is between network_lower_value and network upper value
foreach my $col(@{ $cols }) {
my ($name,$search_value,$op,$convert);
# avoid amigious column names if we have the same column in different joined tables
if($col->{search}){
$op = (defined $col->{comparison_op} ? $col->{comparison_op} : 'like');
$name = _get_joined_column_name_($col->{name});
$search_value = (ref $col->{convert_code} eq 'CODE') ? $col->{convert_code}->($searchString) : '%'.$searchString.'%';
my $stmt;
if (defined $search_value) {
if($col->{literal_sql}){
if(!ref $col->{literal_sql}){
#we can't use just accessor because of the count query
$stmt = \[$col->{literal_sql} . " $op ?", [ {} => $search_value] ];
}else{
if($col->{literal_sql}->{format}){
$stmt = \[sprintf($col->{literal_sql}->{format}, " $op ?"), [ {} => $search_value] ];
}
}
}else{
$stmt = { $name => { $op => $search_value } };
}
}
if($stmt){
push @{$searchColumns[0]}, $stmt;
}
} elsif( $col->{search_lower_column} || $col->{search_upper_column} ) {
# searching lower and upper limit columns
foreach my $search_spec (qw/search_lower_column search_upper_column/){
if ($col->{$search_spec}) {
$op = (defined $col->{comparison_op} ? $col->{comparison_op} : ( $search_spec eq 'search_lower_column' ? '<=' : '>=') );
$name = _get_joined_column_name_($col->{name});
$search_value = (ref $col->{convert_code} eq 'CODE') ? $col->{convert_code}->($searchString) : $searchString ;
if (defined $search_value) {
$conjunctSearchColumns{$col->{$search_spec}} = [] unless exists $conjunctSearchColumns{$col->{$search_spec}};
push(@{$conjunctSearchColumns{$col->{$search_spec}}},{$name => { $op => $search_value }});
}
}
}
}
}
foreach my $conjunct_column (keys %conjunctSearchColumns) {
#...things in arrays are OR'ed, and things in hashes are AND'ed
push @{$searchColumns[0]}, { map { %{$_} } @{$conjunctSearchColumns{$conjunct_column}} };
}
}
#/processing single search input
#processing dates search input - group2 from groups to be joined by 'AND'
{
# date-range searching
my $from_date_in = $c->request->params->{sSearch_0} // "";
my $to_date_in = $c->request->params->{sSearch_1} // "";
my($from_date,$to_date);
if($from_date_in) {
$from_date = NGCP::Panel::Utils::DateTime::from_forminput_string($from_date_in, $c->session->{user_tz});
}
if($to_date_in) {
$to_date = NGCP::Panel::Utils::DateTime::from_forminput_string($to_date_in, $c->session->{user_tz});
}
foreach my $col(@{ $cols }) {
# avoid amigious column names if we have the same column in different joined tables
my $name = _get_joined_column_name_($col->{name});
if($col->{search_from_epoch} && $from_date) {
push @searchColumns, { $name => { '>=' => $col->{search_use_datetime} ? $from_date_in : $from_date->epoch } };
}
if($col->{search_to_epoch} && $to_date) {
push @searchColumns, { $name => { '<=' => $col->{search_use_datetime} ? $to_date_in : $to_date->epoch } };
}
}
}
#/processing dates search input
if(@searchColumns){
$rs = $rs->search({
"-and" => [@searchColumns],
});
}
### /Search processing section
($rs,my @searchColumns) = _apply_search_filters($c,$rs,$cols,$use_rs_cb);
my $is_set_operations = 0;
($displayRecords, $displayRecordCountClipped, $is_set_operations) = _get_count_safe($c,$rs,$params) if(!$use_rs_cb);
#my $footer = ((scalar @$aggregate_cols) > 0 and $displayRecordCountClipped);
#$aggregate_cols = [] if $displayRecordCountClipped;
if(@$aggregate_cols){
my(@select, @as);
if(!$use_rs_cb){
push @select, { 'count' => '*', '-as' => 'count' };
push @as, 'count';
}
foreach my $col (@$aggregate_cols){
my $col_accessor = $col->{literal_sql} ? \[ $col->{literal_sql} ] : $col->{accessor} ;#_get_joined_column_name_($col->{name});
push @select, { $col->{show_total} => $col_accessor, '-as' => $col->{accessor} };
push @as, $col->{accessor};
}
my $aggregate_rs = $rs->search_rs(undef,{
'select' => \@select,
'as' => \@as,
});
if(my $row = $aggregate_rs->first){
unless ($displayRecordCountClipped or $displayRecords == 0) {
my(@select, @as);
if(!$use_rs_cb){
$displayRecords = $row->get_column('count');
push @select, { 'count' => '*', '-as' => 'count' };
push @as, 'count';
}
foreach my $col (@$aggregate_cols){
my $col_accessor = $col->{literal_sql} ? \[ $col->{literal_sql} ] : $col->{accessor} ;#_get_joined_column_name_($col->{name});
push @select, { $col->{show_total} => $col_accessor, '-as' => $col->{accessor} };
push @as, $col->{accessor};
}
my $aggregate_rs = $rs->search_rs(undef,{
'select' => \@select,
'as' => \@as,
});
if(my $row = $aggregate_rs->first){
if(!$use_rs_cb){
$displayRecords = $row->get_column('count');
}
foreach my $col (@$aggregate_cols){
$aggregations->{$col->{accessor}} = $row->get_column($col->{accessor});
}
}
if (defined $total_row_func) {
$aggregations = {%{$aggregations}, $total_row_func->($aggregations) };
}
} else {
foreach my $col (@$aggregate_cols){
$aggregations->{$col->{accessor}} = $row->get_column($col->{accessor});
$aggregations->{$col->{accessor}} = '~';
}
}
}
if (defined $total_row_func && (scalar @$aggregate_cols ) > 0) {
$aggregations = {%{$aggregations}, $total_row_func->($aggregations) };
}
if (!$use_rs_cb) {
if (@searchColumns) {
($totalRecords, $totalRecordCountClipped) = get_count_safe($c,$totalRecords_rs,$params);
if (!@$aggregate_cols) {
($displayRecords, $displayRecordCountClipped) = get_count_safe($c,$rs,$params);
}
($totalRecords, $totalRecordCountClipped) = _get_count_safe($c,$totalRecords_rs,$params);
} else {
if (@$aggregate_cols) {
$totalRecords = $displayRecords;
} elsif (!@$aggregate_cols) {
($totalRecords, $totalRecordCountClipped) = get_count_safe($c,$totalRecords_rs,$params);
$displayRecords = $totalRecords;
$displayRecordCountClipped = $totalRecordCountClipped;
}
($totalRecords,$totalRecordCountClipped) = ($displayRecords,$displayRecordCountClipped);
}
}
@ -233,22 +128,48 @@ sub process {
# pagination
my $pageStart = $c->request->params->{iDisplayStart};
my $pageSize = $c->request->params->{iDisplayLength};
my $searchString = $c->request->params->{sSearch} // "";
my @rows = ();
if ($use_rs_cb) {
($rs, $totalRecords, $displayRecords) = $rs->(
offset => $pageStart || 0,
rows => $pageSize || 5,
searchstring => $searchString,
);
@rows = $rs->all;
} else {
if(defined $pageStart && defined $pageSize && $pageSize > 0) {
if($displayRecords > 0 and defined $pageStart and defined $pageSize and $pageSize > 0) {
if ($is_set_operations and $displayRecordCountClipped) { # and defined $params->{count_limit} and $params->{count_limit} > 0) { #and $displayRecordCountClipped) {
my ($stmt, @bind_vals) = @{${$totalRecords_rs->as_query}};
($is_set_operations,$stmt) = _limit_set_queries($stmt,sub {
my $part_stmt = shift;
return $part_stmt . ' limit ' . $params->{count_limit};
});
@bind_vals = map { $_->[1]; } @bind_vals;
$c->log->debug("page stmt: " . $stmt);
$c->log->debug("page stmt bind: " . join(",",@bind_vals));
my $attrs = $totalRecords_rs->_resolved_attrs;
$rs = $totalRecords_rs->result_source->resultset->search(undef, {
alias => $totalRecords_rs->current_source_alias,
from => [{
$totalRecords_rs->current_source_alias => \[ $stmt, @bind_vals ],
-alias => $totalRecords_rs->current_source_alias,
-source_handle => $totalRecords_rs->result_source->handle,
}],
columns => $attrs->{as},
result_class => $rs->result_class,
});
$rs = _resolve_joins($rs,$cols);
}
$rs = $rs->search(undef, {
offset => $pageStart,
rows => $pageSize,
});
@rows = $rs->all;
}
}
for my $row ($rs->all) {
for my $row (@rows) {
push @{ $aaData }, _prune_row($user_tz, $cols, $row->get_inflated_columns);
if (defined $row_func) {
$aaData->[-1] = {%{$aaData->[-1]}, $row_func->($row)} ;
@ -265,30 +186,202 @@ sub process {
}
sub get_count_safe {
sub apply_dt_joins_filters {
my ($c,$rs, $cols) = @_;
$rs = _resolve_joins($rs, $cols, undef, 1, 1);
($rs,my @searchColumns) = _apply_search_filters($c,$rs,$cols);
return $rs;
}
sub _resolve_joins {
my ($rs, $cols, $aggregate_cols, $skip_aggregates,$join_only) = @_;
for my $col(@{ $cols }) {
if ($col->{show_total}) {
push(@$aggregate_cols, $col) if defined $aggregate_cols;
next if $skip_aggregates;
}
my @parts = split /\./, $col->{name};
if($col->{literal_sql}) {
$rs = $rs->search_rs(undef, {
$col->{join} ? ( join => $col->{join} ) : (),
($col->{no_column} or $join_only) ? () : (
'+select' => { '' => \[$col->{literal_sql}], -as => $col->{accessor} },
'+as' => [ $col->{accessor} ],
)
});
} elsif( @parts > 1 ) {
my $join = $parts[$#parts-1];
foreach my $table(reverse @parts[0..($#parts-2)]){
$join = { $table => $join };
}
$rs = $rs->search_rs(undef, {
join => $join,
($join_only ? () : ('+select' => [ $parts[($#parts-1)].'.'.$parts[$#parts] ],
'+as' => [ $col->{accessor} ],)),
});
}
}
return $rs;
}
sub _apply_search_filters {
my ($c,$rs,$cols,$use_rs_cb) = @_;
# generic searching
my @searchColumns = ();
my %conjunctSearchColumns = ();
#processing single search input - group1 from groups to be joined by 'AND'
my $searchString = $c->request->params->{sSearch} // "";
if ($searchString && !$use_rs_cb) {
#for search string from one search input we need to check all columns which contain the 'search' spec (now: qw/search search_lower_column search_upper_column/). so, for example user entered into search input ip address - we don't know that it is ip address, so we check that name like search OR id like search OR search is between network_lower_value and network upper value
foreach my $col(@{ $cols }) {
my ($name,$search_value,$op,$convert);
# avoid amigious column names if we have the same column in different joined tables
if($col->{search} or $col->{strict_search} or $col->{int_search}){
my $is_pattern = 0;
my $searchString_escaped = join('',map {
my $token = $_;
if ($token ne '\\\\') {
$token =~ s/%/\\%/g;
$token =~ s/_/\\_/g;
if ($token =~ s/(?<!\\)\*/%/g) {
$is_pattern = 1;
}
$token =~ s/\\\*/*/g;
}
$token;
} split(/(\\\\)/,$searchString,-1));
if ($is_pattern) {
$op = 'like';
$search_value = $searchString_escaped;
} elsif ($col->{strict_search}) {
$op = '=';
$searchString_escaped = $searchString;
$searchString_escaped =~ s/\\\*/*/g;
$searchString_escaped =~ s/\\\\/\\/g;
$search_value = $searchString_escaped;
} elsif ($col->{int_search}) {
$op = '=';
$search_value = $searchString;
} else {
$op = 'like';
$search_value = '%' . $searchString_escaped . '%';
}
$name = _get_joined_column_name_($col->{name});
$op = $col->{comparison_op} if (defined $col->{comparison_op});
$search_value = $col->{convert_code}->($searchString) if (ref $col->{convert_code} eq 'CODE');
my $stmt;
if (defined $search_value) {
if ($col->{literal_sql}) {
if (!ref $col->{literal_sql}) {
#we can't use just accessor because of the count query
$stmt = \[$col->{literal_sql} . " $op ?", [ {} => $search_value] ];
} else {
if ($col->{literal_sql}->{format}) {
$stmt = \[sprintf($col->{literal_sql}->{format}, " $op ?"), [ {} => $search_value] ];
}
}
} elsif (not $col->{int_search} or $searchString =~ /^\d{1,10}$/) {
$stmt = { $name => { $op => $search_value } };
}
}
if ($stmt) {
push @{$searchColumns[0]}, $stmt;
}
} elsif ( $col->{search_lower_column} || $col->{search_upper_column} ) {
# searching lower and upper limit columns
foreach my $search_spec (qw/search_lower_column search_upper_column/){
if ($col->{$search_spec}) {
$op = (defined $col->{comparison_op} ? $col->{comparison_op} : ( $search_spec eq 'search_lower_column' ? '<=' : '>=') );
$name = _get_joined_column_name_($col->{name});
$search_value = (ref $col->{convert_code} eq 'CODE') ? $col->{convert_code}->($searchString) : $searchString ;
if (defined $search_value) {
$conjunctSearchColumns{$col->{$search_spec}} = [] unless exists $conjunctSearchColumns{$col->{$search_spec}};
push(@{$conjunctSearchColumns{$col->{$search_spec}}},{$name => { $op => $search_value }});
}
}
}
}
}
foreach my $conjunct_column (keys %conjunctSearchColumns) {
#...things in arrays are OR'ed, and things in hashes are AND'ed
push @{$searchColumns[0]}, { map { %{$_} } @{$conjunctSearchColumns{$conjunct_column}} };
}
}
#/processing single search input
#processing dates search input - group2 from groups to be joined by 'AND'
{
# date-range searching
my $from_date_in = $c->request->params->{sSearch_0} // "";
my $to_date_in = $c->request->params->{sSearch_1} // "";
my($from_date,$to_date);
if($from_date_in) {
$from_date = NGCP::Panel::Utils::DateTime::from_forminput_string($from_date_in, $c->session->{user_tz});
}
if($to_date_in) {
$to_date = NGCP::Panel::Utils::DateTime::from_forminput_string($to_date_in, $c->session->{user_tz});
}
foreach my $col(@{ $cols }) {
# avoid amigious column names if we have the same column in different joined tables
my $name = _get_joined_column_name_($col->{name});
if($col->{search_from_epoch} && $from_date) {
push @searchColumns, { $name => { '>=' => $col->{search_use_datetime} ? $from_date_in : $from_date->epoch } };
}
if($col->{search_to_epoch} && $to_date) {
push @searchColumns, { $name => { '<=' => $col->{search_use_datetime} ? $to_date_in : $to_date->epoch } };
}
}
}
#/processing dates search input
if(@searchColumns){
$rs = $rs->search_rs({
"-and" => [@searchColumns],
});
}
### /Search processing section
return ($rs,@searchColumns);
}
sub _get_count_safe {
my ($c,$rs,$params) = @_;
my $count_limit = $params->{count_limit};
#$count_limit = 12;
my $is_set_operations;
if ($c and defined $count_limit and $count_limit > 0) {
#use Data::Dumper;
#$c->log->debug("count_limit: $count_limit " . Dumper($params));
my ($count_clipped) = $c->model('DB')->storage->dbh_do(sub {
my ($storage, $dbh, $stmt, @bind_vals) = @_;
$c->log->debug("entered dbdo");
($is_set_operations,$stmt) = _limit_set_queries($stmt,sub {
my $part_stmt = shift;
return $part_stmt . ' limit ' . ($count_limit + 1);
});
@bind_vals = map { $_->[1]; } @bind_vals;
$c->log->debug("bind: " . join(",",@bind_vals));
$c->log->debug("count stmt: " . "select count(1) from ($stmt) as query_clipped");
$c->log->debug("count stmt bind: " . join(",",@bind_vals));
return $dbh->selectrow_array("select count(1) from ($stmt) as query_clipped",undef,@bind_vals);
},@{${$rs->search_rs(undef,{
page => 1,
rows => $count_limit + 1,
rows => ($count_limit + 1),
#below is required if fields with identical name are selected by $rs:
'select' => (defined $params->{count_projection_column} ? $params->{count_projection_column} : "id"),
'select' => (defined $params->{count_projection_column} ? $params->{count_projection_column} : \"1"),
#select => $rs->_resolved_attrs->{select},
#as => $rs->_resolved_attrs->{as},
})->as_query}});
if ($count_clipped > $count_limit) {
$c->log->debug("result count clipped");
return ($count_limit,1);
return ($count_limit,1,$is_set_operations);
} else {
return ($count_clipped,0);
$c->log->debug("result count not clipped");
return ($count_clipped,0,$is_set_operations);
}
} else {
return ($rs->count,0);
return ($rs->count,0,$is_set_operations);
}
}
@ -427,6 +520,102 @@ sub _get_joined_column_name_{
return $name;
}
sub _limit_set_queries {
my ($stmt,$sub) = @_;
return (undef,$stmt) unless $sub;
#simple lexer for parsing sql stmts with a single level of set operations.
#caveat: set operator names must not appear in colnames, table names, literals etc.
my $set_operation_re = "union\\s+distinct|union\\s+all|union|intersect|except";
my @frags = split(/\s($set_operation_re)\s/i,$stmt, -1);
return (0,$stmt) unless (scalar @frags) > 1;
my @frags_rebuilt = ();
my ($preamble,$postamble) = (undef,undef);
foreach my $frag (@frags) {
if ($frag =~ /($set_operation_re)/i) {
push(@frags_rebuilt,$1);
} else {
my $set_stmt = $frag;
$set_stmt =~ s/\s+$//g;
$set_stmt =~ s/^\s+//g;
my $last = (((scalar @frags) - (scalar @frags_rebuilt)) == 1 ? 1 : 0);
my $first = ((scalar @frags_rebuilt) == 0 ? 1 : 0);
my $quoted = 0;
my $depth = 0;
my ($left_parenthesis_count,$right_parenthesis_count) = (0,0);
my $rebuilt = '';
my $balanced;
if ($last) {
for (my $i = 0; $i < length($set_stmt); $i++) {
my $char = substr($set_stmt, $i, 1);
my $escape = substr($set_stmt, $i, 2);
if ($escape eq '\\\\' or $escape eq '\\"' or $escape eq "\\'") {
$rebuilt .= $escape;
$i++;
} else {
if ($char eq "'" or $char eq '"') {
$quoted = ($quoted ? 0 : 1);
} elsif (not $quoted and $char eq ')') {
last if ($depth == 0);
$depth--;
$right_parenthesis_count++;
} elsif (not $quoted and $char eq '(') {
$depth++;
$left_parenthesis_count++;
}
$rebuilt .= $char;
}
if ($left_parenthesis_count == $right_parenthesis_count and $depth == 0) {
$balanced = $rebuilt;
}
}
$postamble = substr($set_stmt,length($balanced));
} else {
for (my $i = length($set_stmt) - 1; $i >= 0; $i--) {
my $char = substr($set_stmt, $i, 1);
my $escape = substr($set_stmt, $i - 1, 2);
if ($escape eq '\\\\' or $escape eq '\\"' or $escape eq "\\'") {
$rebuilt = $escape . $rebuilt;
$i--;
} else {
if ($char eq "'" or $char eq '"') {
$quoted = ($quoted ? 0 : 1);
} elsif (not $quoted and $char eq ')') {
$depth--;
$right_parenthesis_count++;
} elsif (not $quoted and $char eq '(') {
$depth++;
$left_parenthesis_count++;
}
$rebuilt = $char . $rebuilt;
}
if ($left_parenthesis_count == $right_parenthesis_count and $depth == 0) {
$balanced = $rebuilt;
last if ($first and not $quoted and 'select' eq lc(substr($rebuilt,0,6)));
}
}
if ($first) {
$preamble = substr($set_stmt,0, length($set_stmt) - length($balanced));
}
}
#normalize outer parentheses for easier handling in $sub:
while ($balanced =~ /^\s*\(\s*/g and $balanced =~ /\s*\)\s*$/g) {
$balanced =~ s/^\s*\(\s*//g;
$balanced =~ s/\s*\)\s*$//g;
}
$balanced = &$sub($balanced);
push(@frags_rebuilt,'(' . $balanced . ')');
}
}
unshift(@frags_rebuilt,$preamble) if $preamble;
push(@frags_rebuilt,$postamble) if $postamble;
#my $i=0;
#print(join("\n",map { $i++;$i.'. '.$_; } @frags_rebuilt));
return (1,join(' ',@frags_rebuilt));
}
1;

Loading…
Cancel
Save