@ -8,8 +8,8 @@
# define PBXSUFFIX "_pbx-1"
# define XFERSUFFIX "_xfer-1"
# define medredis_check_reply(c on, c md, reply, err) do { \
if ( ! ( reply ) & & ! ( con ) - > ctx ) { \
# define medredis_check_reply(c md, reply, err) do { \
if ( ! ( reply ) & & ( ! ( con ) | | ! ( con ) - > ctx ) ) { \
L_ERROR ( " Failed to perform redis query '%s': no connection to server \n " , \
( cmd ) ) ; \
goto err ; \
@ -122,7 +122,7 @@ static void medredis_free_command(gpointer data) {
}
/**********************************************************************/
static int medredis_push_command ( medredis_con_t * con , size_t argc , char * * argv ) {
static int medredis_push_command ( size_t argc , char * * argv ) {
medredis_command_t * cmd = NULL ;
@ -156,7 +156,7 @@ err:
}
/**********************************************************************/
static redisReply * medredis_command ( medredis_con_t * con , const char * cmd ) {
static redisReply * medredis_command ( const char * cmd ) {
L_DEBUG ( " Performing redis query '%s' \n " , cmd ) ;
redisReply * reply = redisCommand ( con - > ctx , cmd ) ;
if ( con - > ctx - > err = = REDIS_ERR_EOF ) {
@ -171,10 +171,10 @@ static redisReply *medredis_command(medredis_con_t *con, const char* cmd) {
}
/**********************************************************************/
static int medredis_append_command_argv ( medredis_con_t * con , size_t argc , char * * argv , int queue ) {
static int medredis_append_command_argv ( size_t argc , char * * argv , int queue ) {
int ret ;
if ( queue > 0 & & medredis_push_command ( con, argc, argv ) ! = 0 ) {
if ( queue > 0 & & medredis_push_command ( argc, argv ) ! = 0 ) {
L_ERROR ( " Failed to queue redis command \n " ) ;
return - 1 ;
}
@ -190,7 +190,7 @@ static int medredis_append_command_argv(medredis_con_t *con, size_t argc, char *
if ( medredis_init ( ) ! = 0 ) {
L_ERROR ( " Failed to reconnect to redis db \n " ) ;
medredis_cleanup ( ) ;
return ret ;
return - 1 ;
}
ret = redisAppendCommandArgv ( con - > ctx , argc , ( const char * * ) argv , NULL ) ;
}
@ -201,7 +201,7 @@ static int medredis_append_command_argv(medredis_con_t *con, size_t argc, char *
}
/**********************************************************************/
static int medredis_get_reply ( medredis_con_t * con , redisReply * * reply ) {
static int medredis_get_reply ( redisReply * * reply ) {
int ret ;
medredis_command_t * cmd ;
@ -218,12 +218,13 @@ static int medredis_get_reply(medredis_con_t *con, redisReply **reply) {
if ( medredis_init ( ) ! = 0 ) {
L_ERROR ( " Failed to reconnect to redis db \n " ) ;
medredis_cleanup ( ) ;
return - 1 ;
}
// take commands from oldest to newest and re-do again,
// but don't queue them once again in retry-mode
while ( ( cmd = g_queue_pop_head ( con - > command_queue ) ) ) {
L_DEBUG ( " re-queueing appended command \n " ) ;
if ( medredis_append_command_argv ( c on, c md- > argc , cmd - > argv , 0 ) ! = 0 ) {
if ( medredis_append_command_argv ( c md- > argc , cmd - > argv , 0 ) ! = 0 ) {
L_ERROR ( " Failed to re-queue redis command " ) ;
return - 1 ;
}
@ -244,7 +245,7 @@ static int medredis_get_reply(medredis_con_t *con, redisReply **reply) {
/**********************************************************************/
static void medredis_consume_replies ( medredis_con_t * con ) {
static void medredis_consume_replies ( void ) {
medredis_command_t * cmd ;
redisReply * reply ;
while ( con - > append_counter > 0 ) {
@ -299,16 +300,16 @@ int medredis_init() {
if ( config_redis_pass ) {
reply = redisCommand ( con - > ctx , " AUTH %s " , config_redis_pass ) ;
medredis_check_reply ( con , " AUTH " , reply , err ) ;
medredis_check_reply ( " AUTH " , reply , err ) ;
medredis_free_reply ( & reply ) ;
}
reply = redisCommand ( con - > ctx , " PING " ) ;
medredis_check_reply ( con , " PING " , reply , err ) ;
medredis_check_reply ( " PING " , reply , err ) ;
medredis_free_reply ( & reply ) ;
reply = redisCommand ( con - > ctx , " SELECT %i " , config_redis_db ) ;
medredis_check_reply ( con , " SELECT " , reply , err ) ;
medredis_check_reply ( " SELECT " , reply , err ) ;
medredis_free_reply ( & reply ) ;
L_DEBUG ( " Redis connection opened to %s:%d/%d \n " ,
@ -370,8 +371,8 @@ med_callid_t *medredis_fetch_callids(uint64_t *count) {
do {
snprintf ( buffer , sizeof ( buffer ) , cmd , cursor ) ;
reply = medredis_command ( con, buffer) ;
medredis_check_reply ( con, buffer, reply , err ) ;
reply = medredis_command ( buffer) ;
medredis_check_reply ( buffer, reply , err ) ;
if ( reply - > type ! = REDIS_REPLY_ARRAY ) {
L_ERROR ( " Invalid reply type for scan, expected array \n " ) ;
@ -486,7 +487,7 @@ static void medredis_append_key(gpointer data, gpointer user_data) {
entry_argv [ 9 ] = " dst_leg " ;
if ( medredis_append_command_argv ( con, entry_argc, entry_argv , 1 ) ! = 0 ) {
if ( medredis_append_command_argv ( entry_argc, entry_argv , 1 ) ! = 0 ) {
L_ERROR ( " Failed to append command to fetch key \n " ) ;
}
free ( key ) ;
@ -568,7 +569,7 @@ int medredis_fetch_records(med_callid_t *callid,
char * cid = cids [ i ] ;
snprintf ( buffer , sizeof ( buffer ) , " acc:cid::%s " , cid ) ;
cid_set_argv [ 1 ] = buffer ;
if ( medredis_append_command_argv ( c on, c id_set_argc, cid_set_argv , 1 ) ! = 0 ) {
if ( medredis_append_command_argv ( c id_set_argc, cid_set_argv , 1 ) ! = 0 ) {
L_ERROR ( " Failed to append redis command to fetch entries for cid '%s' \n " , callid - > value ) ;
goto err ;
}
@ -576,11 +577,11 @@ int medredis_fetch_records(med_callid_t *callid,
}
for ( i = 0 ; i < 3 ; + + i ) {
if ( medredis_get_reply ( con , & reply ) ! = 0 ) {
if ( medredis_get_reply ( & reply ) ! = 0 ) {
L_ERROR ( " Failed to get redis reply for command to fetch entries for cid '%s' \n " , callid - > value ) ;
goto err ;
}
medredis_check_reply ( con , " smembers for cid " , reply , err ) ;
medredis_check_reply ( " smembers for cid " , reply , err ) ;
medredis_dump_reply ( reply ) ;
if ( reply - > type ! = REDIS_REPLY_ARRAY ) {
@ -612,13 +613,13 @@ int medredis_fetch_records(med_callid_t *callid,
do {
med_entry_t * e ;
L_DEBUG ( " Fetching next reply record \n " ) ;
if ( medredis_get_reply ( con , & reply ) ! = 0 ) {
if ( medredis_get_reply ( & reply ) ! = 0 ) {
L_ERROR ( " Failed to get reply from redis \n " ) ;
goto err ;
}
if ( ! reply )
break ;
medredis_check_reply ( con , " get reply " , reply , err ) ;
medredis_check_reply ( " get reply " , reply , err ) ;
medredis_dump_reply ( reply ) ;
e = medredis_reply_to_entry ( reply ) ;
@ -669,7 +670,7 @@ int medredis_fetch_records(med_callid_t *callid,
g_list_free ( records ) ;
g_list_free ( keys ) ;
medredis_consume_replies ( con ) ;
medredis_consume_replies ( ) ;
return 0 ;
@ -684,7 +685,7 @@ err:
}
if ( * entries )
free ( * entries ) ;
medredis_consume_replies ( con ) ;
medredis_consume_replies ( ) ;
return - 1 ;
@ -709,7 +710,7 @@ static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const
snprintf ( buffer , sizeof ( buffer ) , " acc:cid::%s " , e - > callid ) ;
argv [ 0 ] = " DEL " ;
argv [ 1 ] = buffer ;
if ( medredis_append_command_argv ( con , 2 , argv , 1 ) ! = 0 ) {
if ( medredis_append_command_argv ( 2 , argv , 1 ) ! = 0 ) {
L_ERROR ( " Failed to append redis command to remove key '%s' \n " , buffer ) ;
goto err ;
}
@ -719,12 +720,12 @@ static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const
argv [ 1 ] = " acc:meth::INVITE " ;
snprintf ( buffer , sizeof ( buffer ) , " acc:entry::%s:%f " , e - > callid , e - > unix_timestamp ) ;
argv [ 2 ] = buffer ;
if ( medredis_append_command_argv ( con , 3 , argv , 1 ) ! = 0 ) {
if ( medredis_append_command_argv ( 3 , argv , 1 ) ! = 0 ) {
L_ERROR ( " Failed to append redis command to remove key '%s' from '%s' \n " , buffer , argv [ 1 ] ) ;
goto err ;
}
argv [ 1 ] = " acc:meth::BYE " ;
if ( medredis_append_command_argv ( con , 3 , argv , 1 ) ! = 0 ) {
if ( medredis_append_command_argv ( 3 , argv , 1 ) ! = 0 ) {
L_ERROR ( " Failed to append redis command to remove key '%s' from '%s' \n " , buffer , argv [ 1 ] ) ;
goto err ;
}
@ -732,17 +733,17 @@ static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const
// delete acc:entry::$cid:$timestamp
argv [ 0 ] = " DEL " ;
argv [ 1 ] = buffer ;
if ( medredis_append_command_argv ( con , 2 , argv , 1 ) ! = 0 ) {
if ( medredis_append_command_argv ( 2 , argv , 1 ) ! = 0 ) {
L_ERROR ( " Failed to append redis command to remove key '%s' \n " , buffer ) ;
goto err ;
}
}
medredis_consume_replies ( con ) ;
medredis_consume_replies ( ) ;
return 0 ;
err :
medredis_consume_replies ( con ) ;
medredis_consume_replies ( ) ;
return - 1 ;
}
@ -754,4 +755,4 @@ int medredis_trash_entries(med_entry_t *records, uint64_t count) {
/**********************************************************************/
int medredis_backup_entries ( med_entry_t * records , uint64_t count ) {
return medredis_cleanup_entries ( records , count , " backup " ) ;
}
}