@ -185,29 +185,12 @@ static uint64_t mediator_calc_runtime(struct timeval *tv_start, struct timeval *
( tv_start - > tv_sec * 1000000 + tv_start - > tv_usec ) ) ) ;
}
/**********************************************************************/
// appends the entire `two` list to the end of `one`, resetting `two` to empty
static void mediator_splice_gqueue ( GQueue * one , GQueue * two ) {
if ( ! two - > head )
return ;
if ( ! one - > tail ) {
* one = * two ;
g_queue_init ( two ) ;
return ;
}
one - > tail - > next = two - > head ;
two - > head - > prev = one - > tail ;
one - > length + = two - > length ;
g_queue_init ( two ) ;
}
/**********************************************************************/
int main ( int argc , char * * argv )
{
GQueue mysql_callids = G_QUEUE_INIT ;
GQueue redis_callids = G_QUEUE_INIT ;
GQueue mysql_records = G_QUEUE_INIT ;
GQueue redis_records = G_QUEUE_INIT ;
GQueue acc_records = G_QUEUE_INIT ;
uint64_t cdr_count , last_count ;
int maprefresh ;
struct medmysql_batches * batches ;
@ -332,8 +315,7 @@ int main(int argc, char **argv)
g_queue_clear_full ( & mysql_callids , g_free ) ;
g_queue_clear_full ( & redis_callids , g_free ) ;
g_queue_clear_full ( & mysql_records , med_entry_free ) ;
g_queue_clear_full ( & redis_records , med_entry_free ) ;
g_queue_clear_full ( & acc_records , med_entry_free ) ;
cdr_count = 0 ;
last_count = mediator_count ;
@ -370,30 +352,34 @@ int main(int argc, char **argv)
gettimeofday ( & tv_start , NULL ) ;
# endif
if ( medmysql_fetch_records ( mysql_callid , & mysql_records , 1 ) ! = 0 )
int ret = medmysql_fetch_records ( mysql_callid , & acc_records , 1 ) ;
if ( ret < 0 )
goto out ;
int must_sort = ( ret > 0 ) ;
if ( medredis_fetch_records ( mysql_callid , & redis_records ) = = 0 )
{
mediator_splice_gqueue ( & mysql_records , & redis_records ) ;
ret = medredis_fetch_records ( mysql_callid , & acc_records ) ;
if ( ret > 0 )
must_sort = 1 ;
// only re-sort if records from Redis were added, as MySQL already does the sorting
records_sort ( & mysql_records ) ;
if ( must_sort )
{
// only re-sort if records from Redis were added, as MySQL already does the sorting
records_sort ( & acc_records ) ;
}
int are_records_complete = records_complete ( & mysql _records) ;
int are_records_complete = records_complete ( & acc _records) ;
if ( ! are_records_complete & & ! do_intermediate )
{
L_DEBUG ( " Found incomplete call with cid '%s', skipping... \n " , mysql_callid ) ;
g_queue_clear_full ( & mysql _records, med_entry_free ) ;
g_queue_clear_full ( & acc _records, med_entry_free ) ;
continue ;
}
if ( cdr_process_records ( & mysql _records, & cdr_count , batches , do_intermediate ) ! = 0 )
if ( cdr_process_records ( & acc _records, & cdr_count , batches , do_intermediate ) ! = 0 )
goto out ;
g_queue_clear_full ( & mysql _records, med_entry_free ) ;
g_queue_clear_full ( & acc _records, med_entry_free ) ;
mediator_count + = cdr_count ;
@ -416,34 +402,31 @@ int main(int argc, char **argv)
gettimeofday ( & tv_start , NULL ) ;
# endif
if ( medredis_fetch_records ( redis_callid , & redis_records) ! = 0 )
if ( medredis_fetch_records ( redis_callid , & acc_records) < 0 )
goto out ;
if ( medmysql_fetch_records ( redis_callid , & mysql_records , 0 ) = = 0 )
{
mediator_splice_gqueue ( & redis_records , & mysql_records ) ;
}
medmysql_fetch_records ( redis_callid , & acc_records , 0 ) ;
// always sort records from Redis, regardless of whether records from MySQL were merged
records_sort ( & redis _records) ;
// always sort records from Redis, regardless of whether records from MySQL were merged
records_sort ( & acc_records ) ;
int are_records_complete = records_complete ( & redis _records) ;
int are_records_complete = records_complete ( & acc _records) ;
if ( ! are_records_complete & & ! do_intermediate )
{
L_DEBUG ( " Found incomplete call with cid '%s', skipping... \n " , redis_callid ) ;
g_queue_clear_full ( & redis _records, med_entry_free ) ;
g_queue_clear_full ( & acc _records, med_entry_free ) ;
continue ;
}
L_DEBUG ( " process cdr with cid '%s' and %u records \n " , redis_callid , redis _records. length ) ;
L_DEBUG ( " process cdr with cid '%s' and %u records \n " , redis_callid , acc _records. length ) ;
if ( redis _records. length ) {
if ( cdr_process_records ( & redis _records, & cdr_count , batches , do_intermediate ) ! = 0 ) {
g_queue_clear_full ( & redis _records, med_entry_free ) ;
if ( acc _records. length ) {
if ( cdr_process_records ( & acc _records, & cdr_count , batches , do_intermediate ) ! = 0 ) {
g_queue_clear_full ( & acc _records, med_entry_free ) ;
goto out ;
}
g_queue_clear_full ( & redis _records, med_entry_free ) ;
g_queue_clear_full ( & acc _records, med_entry_free ) ;
mediator_count + = cdr_count ;
}
@ -484,6 +467,7 @@ out:
L_INFO ( " Shutting down. " ) ;
sd_notify ( 0 , " STOPPING=1 \n " ) ;
g_queue_clear_full ( & acc_records , med_entry_free ) ;
g_queue_clear_full ( & mysql_callids , g_free ) ;
g_queue_clear_full ( & redis_callids , g_free ) ;
mediator_destroy_maps ( ) ;