diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm index 3936010..56391e3 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr.pm @@ -245,7 +245,7 @@ sub findby_callidprefix { my @conditions = @{$conditions // []}; push(@conditions,{ $table . '.call_id' => { 'LIKE' => '?' } }); my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' . - _get_export_stmt($db,undef,$joins,\@conditions) . + _get_export_stmt($db,$joins,\@conditions) . ' ORDER BY LENGTH(' . $table . '.call_id' . ') ASC, ' . $table . '.start_time ASC'; my @params = ($call_id . '%'); my $rows = $xa_db->db_get_all_arrayref($stmt,@params); @@ -266,7 +266,7 @@ sub findby_callid { my @conditions = @{$conditions // []}; push(@conditions,{ $table . '.call_id' => { '=' => '?' } }); my $stmt = 'SELECT ' . join(',', map { $table . '.' . $db->columnidentifier($_); } @$expected_fieldnames) . ' ' . - _get_export_stmt($db,undef,$joins,\@conditions) . + _get_export_stmt($db,$joins,\@conditions) . ' ORDER BY ' . $table . '.start_time ASC'; my @params = ($call_id); my $rows = $xa_db->db_get_all_arrayref($stmt,@params); @@ -329,9 +329,31 @@ sub process_unexported { NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::check_table(); my $db = &$get_db(); my $table = $db->tableidentifier($tablename); - - my $stmt = _get_export_stmt($db,$static_context,$joins,$conditions); - + + my $select_stmt; + my $count_stmt; + my $select_format = 'SELECT ' . $table . '.' . $db->columnidentifier('id') . ', ' . $table . '.' . $db->columnidentifier('call_id') . ' %s ORDER BY ' . $table . '.' . $db->columnidentifier('id'); + my $count_format = 'SELECT COUNT(1) FROM (%s) AS __cnt'; + + if ($static_context) { + $static_context->{part} = 'A'; + $select_stmt = sprintf('(' . $select_format . ')',_get_export_stmt_part($db,$static_context,$joins,$conditions)); + $count_stmt = sprintf('(' . $count_format . ')',$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,$static_context,$joins,$conditions),0,$limit,undef)); + $select_stmt .= ' UNION ALL '; + $count_stmt .= ' + '; + $static_context->{part} = 'B'; + $select_stmt .= sprintf('(' . $select_format . ')',_get_export_stmt_part($db,$static_context,$joins,$conditions)); + $count_stmt .= sprintf('(' . $count_format . ')',$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,$static_context,$joins,$conditions),0,$limit,undef)); + if (defined $limit) { + $count_stmt = 'SELECT LEAST(' . $count_stmt . ', ' . $limit . ')'; + } else { + $count_stmt = 'SELECT ' . $count_stmt; + } + } else { + $select_stmt = sprintf($select_format,_get_export_stmt_part($db,undef,$joins,$conditions)); + $count_stmt = sprintf($count_format,$db->paginate_sort_query('SELECT 1 ' . _get_export_stmt_part($db,undef,$joins,$conditions),0,$limit,undef)); + } + return process_table( get_db => $get_db, class => __PACKAGE__, @@ -354,14 +376,11 @@ sub process_unexported { multithreading => $multithreading, tableprocessing_threads => $numofthreads, blocksize => $blocksize, - #select => $db->paginate_sort_query('SELECT ' . $table . '.* ' . $stmt,undef,undef,$sort), - select => 'SELECT ' . $table . '.' . $db->columnidentifier('id') . ', ' . $table . '.' . $db->columnidentifier('call_id') . - ' ' . $stmt . ' ORDER BY ' . $table . '.' . $db->columnidentifier('id'), - selectcount => 'SELECT COUNT(1) FROM (' . $db->paginate_sort_query('SELECT 1 ' . $stmt,0,$limit,undef) . ') AS __cnt', + select => $select_stmt, + selectcount => $count_stmt, ); } - sub process_fromto { my %params = @_; @@ -423,9 +442,15 @@ sub process_fromto { ); } - sub _get_export_stmt { + my ($db,$joins,$conditions) = @_; + return _get_export_stmt_part($db,undef,$joins,$conditions); + +} + +sub _get_export_stmt_part { + my ($db,$static_context,$joins,$conditions) = @_; my $table = $db->tableidentifier($tablename); @@ -439,11 +464,21 @@ sub _get_export_stmt { push @intjoins, "LEFT JOIN $table ON $foreign_key = $own_key"; } } + my @conds = (); - if (defined $static_context and $static_context->{export_status_id}) { - push @intjoins, 'LEFT JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::gettablename()) . ' AS __cesd ON __cesd.cdr_id = ' . $table . '.id AND __cesd.status_id = ' . $static_context->{export_status_id}; - push @conds, '(__cesd.export_status = "' . $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED . '" OR __cesd.export_status IS NULL)'; + if (defined $static_context and $static_context->{export_status_id} and $static_context->{part}) { + unless (defined $static_context->{last_processed_cdr_id}) { + $static_context->{last_processed_cdr_id} = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::find_last_processed_cdrid($static_context->{export_status_id}); + } + if ('b' eq lc($static_context->{part})) { + push @conds, $table . '.id > ' . $static_context->{last_processed_cdr_id}; + } elsif ('a' eq lc($static_context->{part})) { + push @intjoins, 'LEFT JOIN ' . $db->tableidentifier(NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::gettablename()) . ' AS __cesd ON __cesd.cdr_id = ' . $table . '.id AND __cesd.status_id = ' . $static_context->{export_status_id}; + push @conds, $table . '.id <= ' . $static_context->{last_processed_cdr_id}; + push @conds, '__cesd.export_status = "' . $NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data::UNEXPORTED . '"'; + } } + $stmt .= " " . join(" ", @intjoins) if (scalar @intjoins) > 0; if (defined $conditions and (scalar @$conditions) > 0) { foreach my $f (@$conditions) { diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm index 312df98..235a040 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/accounting/cdr_export_status_data.pm @@ -34,6 +34,8 @@ our @EXPORT_OK = qw( update_row insert_row upsert_row + + find_last_processed_cdrid update_export_status @@ -75,6 +77,30 @@ sub new { } + +sub find_last_processed_cdrid { + + my ($status_id) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COALESCE(MAX(cdr_id),0) FROM ' . $table; + my @params = (); + my @terms = (); + if (defined $status_id) { + push(@terms,$db->columnidentifier('status_id') . ' = ?'); + push(@params,$status_id); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + sub update_row { my ($xa_db,$data) = @_;