@ -245,7 +245,7 @@ sub findby_callidprefix {
my @ conditions = @ { $ conditions // [] } ;
push ( @ conditions , { $ table . '.call_id' = > { 'LIKE' = > '?' } } ) ;
my $ stmt = 'SELECT ' . join ( ',' , map { $ table . '.' . $ db - > columnidentifier ( $ _ ) ; } @$ expected_fieldnames ) . ' ' .
_get_export_stmt ( $ db , undef , $ joins , \ @ conditions ) .
_get_export_stmt ( $ db , $ joins , \ @ conditions ) .
' ORDER BY LENGTH(' . $ table . '.call_id' . ') ASC, ' . $ table . '.start_time ASC' ;
my @ params = ( $ call_id . '%' ) ;
my $ rows = $ xa_db - > db_get_all_arrayref ( $ stmt , @ params ) ;
@ -266,7 +266,7 @@ sub findby_callid {
my @ conditions = @ { $ conditions // [] } ;
push ( @ conditions , { $ table . '.call_id' = > { '=' = > '?' } } ) ;
my $ stmt = 'SELECT ' . join ( ',' , map { $ table . '.' . $ db - > columnidentifier ( $ _ ) ; } @$ expected_fieldnames ) . ' ' .
_get_export_stmt ( $ db , undef , $ joins , \ @ conditions ) .
_get_export_stmt ( $ db , $ joins , \ @ conditions ) .
' ORDER BY ' . $ table . '.start_time ASC' ;
my @ params = ( $ call_id ) ;
my $ rows = $ xa_db - > db_get_all_arrayref ( $ stmt , @ params ) ;
@ -329,9 +329,31 @@ sub process_unexported {
NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data:: check_table ( ) ;
my $ db = & $ get_db ( ) ;
my $ table = $ db - > tableidentifier ( $ tablename ) ;
my $ stmt = _get_export_stmt ( $ db , $ static_context , $ joins , $ conditions ) ;
my $ select_stmt ;
my $ count_stmt ;
my $ select_format = 'SELECT ' . $ table . '.' . $ db - > columnidentifier ( 'id' ) . ', ' . $ table . '.' . $ db - > columnidentifier ( 'call_id' ) . ' %s ORDER BY ' . $ table . '.' . $ db - > columnidentifier ( 'id' ) ;
my $ count_format = 'SELECT COUNT(1) FROM (%s) AS __cnt' ;
if ( $ static_context ) {
$ static_context - > { part } = 'A' ;
$ select_stmt = sprintf ( '(' . $ select_format . ')' , _get_export_stmt_part ( $ db , $ static_context , $ joins , $ conditions ) ) ;
$ count_stmt = sprintf ( '(' . $ count_format . ')' , $ db - > paginate_sort_query ( 'SELECT 1 ' . _get_export_stmt_part ( $ db , $ static_context , $ joins , $ conditions ) , 0 , $ limit , undef ) ) ;
$ select_stmt . = ' UNION ALL ' ;
$ count_stmt . = ' + ' ;
$ static_context - > { part } = 'B' ;
$ select_stmt . = sprintf ( '(' . $ select_format . ')' , _get_export_stmt_part ( $ db , $ static_context , $ joins , $ conditions ) ) ;
$ count_stmt . = sprintf ( '(' . $ count_format . ')' , $ db - > paginate_sort_query ( 'SELECT 1 ' . _get_export_stmt_part ( $ db , $ static_context , $ joins , $ conditions ) , 0 , $ limit , undef ) ) ;
if ( defined $ limit ) {
$ count_stmt = 'SELECT LEAST(' . $ count_stmt . ', ' . $ limit . ')' ;
} else {
$ count_stmt = 'SELECT ' . $ count_stmt ;
}
} else {
$ select_stmt = sprintf ( $ select_format , _get_export_stmt_part ( $ db , undef , $ joins , $ conditions ) ) ;
$ count_stmt = sprintf ( $ count_format , $ db - > paginate_sort_query ( 'SELECT 1 ' . _get_export_stmt_part ( $ db , undef , $ joins , $ conditions ) , 0 , $ limit , undef ) ) ;
}
return process_table (
get_db = > $ get_db ,
class = > __PACKAGE__ ,
@ -354,14 +376,11 @@ sub process_unexported {
multithreading = > $ multithreading ,
tableprocessing_threads = > $ numofthreads ,
blocksize = > $ blocksize ,
#select => $db->paginate_sort_query('SELECT ' . $table . '.* ' . $stmt,undef,undef,$sort),
select = > 'SELECT ' . $ table . '.' . $ db - > columnidentifier ( 'id' ) . ', ' . $ table . '.' . $ db - > columnidentifier ( 'call_id' ) .
' ' . $ stmt . ' ORDER BY ' . $ table . '.' . $ db - > columnidentifier ( 'id' ) ,
selectcount = > 'SELECT COUNT(1) FROM (' . $ db - > paginate_sort_query ( 'SELECT 1 ' . $ stmt , 0 , $ limit , undef ) . ') AS __cnt' ,
select = > $ select_stmt ,
selectcount = > $ count_stmt ,
) ;
}
sub process_fromto {
my % params = @ _ ;
@ -423,9 +442,15 @@ sub process_fromto {
) ;
}
sub _get_export_stmt {
my ( $ db , $ joins , $ conditions ) = @ _ ;
return _get_export_stmt_part ( $ db , undef , $ joins , $ conditions ) ;
}
sub _get_export_stmt_part {
my ( $ db , $ static_context , $ joins , $ conditions ) = @ _ ;
my $ table = $ db - > tableidentifier ( $ tablename ) ;
@ -439,11 +464,21 @@ sub _get_export_stmt {
push @ intjoins , "LEFT JOIN $table ON $foreign_key = $own_key" ;
}
}
my @ conds = ( ) ;
if ( defined $ static_context and $ static_context - > { export_status_id } ) {
push @ intjoins , 'LEFT JOIN ' . $ db - > tableidentifier ( NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data:: gettablename ( ) ) . ' AS __cesd ON __cesd.cdr_id = ' . $ table . '.id AND __cesd.status_id = ' . $ static_context - > { export_status_id } ;
push @ conds , '(__cesd.export_status = "' . $ NGCP:: BulkProcessor:: Dao:: Trunk:: accounting:: cdr_export_status_data:: UNEXPORTED . '" OR __cesd.export_status IS NULL)' ;
if ( defined $ static_context and $ static_context - > { export_status_id } and $ static_context - > { part } ) {
unless ( defined $ static_context - > { last_processed_cdr_id } ) {
$ static_context - > { last_processed_cdr_id } = NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data:: find_last_processed_cdrid ( $ static_context - > { export_status_id } ) ;
}
if ( 'b' eq lc ( $ static_context - > { part } ) ) {
push @ conds , $ table . '.id > ' . $ static_context - > { last_processed_cdr_id } ;
} elsif ( 'a' eq lc ( $ static_context - > { part } ) ) {
push @ intjoins , 'LEFT JOIN ' . $ db - > tableidentifier ( NGCP::BulkProcessor::Dao::Trunk::accounting::cdr_export_status_data:: gettablename ( ) ) . ' AS __cesd ON __cesd.cdr_id = ' . $ table . '.id AND __cesd.status_id = ' . $ static_context - > { export_status_id } ;
push @ conds , $ table . '.id <= ' . $ static_context - > { last_processed_cdr_id } ;
push @ conds , '__cesd.export_status = "' . $ NGCP:: BulkProcessor:: Dao:: Trunk:: accounting:: cdr_export_status_data:: UNEXPORTED . '"' ;
}
}
$ stmt . = " " . join ( " " , @ intjoins ) if ( scalar @ intjoins ) > 0 ;
if ( defined $ conditions and ( scalar @$ conditions ) > 0 ) {
foreach my $ f ( @$ conditions ) {