@ -115,7 +115,13 @@ AST_THREADSTORAGE(escape_table_buf);
AST_THREADSTORAGE ( escape_column_buf ) ;
AST_THREADSTORAGE ( escape_value_buf ) ;
static int realtime_sqlite3_execute_handle ( struct realtime_sqlite3_db * db , const char * sql , int ( * callback ) ( void * , int , char * * , char * * ) , void * arg , int sync ) ;
typedef int ( * callback_t ) ( void * , int , char * * , char * * ) ;
static int realtime_sqlite3_exec_query_with_handle ( struct realtime_sqlite3_db * , const char * , callback_t , void * ) ;
static int realtime_sqlite3_exec_query ( const char * , const char * , callback_t , void * ) ;
static int realtime_sqlite3_exec_update_with_handle ( struct realtime_sqlite3_db * , const char * ) ;
static int realtime_sqlite3_exec_update ( const char * , const char * ) ;
void db_start_batch ( struct realtime_sqlite3_db * db ) ;
void db_stop_batch ( struct realtime_sqlite3_db * db ) ;
@ -303,20 +309,20 @@ static void *db_sync_thread(void *data)
{
struct realtime_sqlite3_db * db = data ;
ao2_lock ( db ) ;
realtime_sqlite3_exec ut e_handle( db , " BEGIN TRANSACTION " , NULL , NULL , 0 ) ;
realtime_sqlite3_exec _q uery_with _handle( db , " BEGIN TRANSACTION " , NULL , NULL ) ;
for ( ; ; ) {
if ( ! db - > wakeup ) {
ast_cond_wait ( & db - > cond , ao2_object_get_lockaddr ( db ) ) ;
}
db - > wakeup = 0 ;
if ( realtime_sqlite3_exec ut e_handle( db , " COMMIT " , NULL , NULL , 0 ) < 0 ) {
realtime_sqlite3_exec ut e_handle( db , " ROLLBACK " , NULL , NULL , 0 ) ;
if ( realtime_sqlite3_exec _q uery_with _handle( db , " COMMIT " , NULL , NULL ) < 0 ) {
realtime_sqlite3_exec _q uery_with _handle( db , " ROLLBACK " , NULL , NULL ) ;
}
if ( db - > exiting ) {
ao2_unlock ( db ) ;
break ;
}
realtime_sqlite3_exec ut e_handle( db , " BEGIN TRANSACTION " , NULL , NULL , 0 ) ;
realtime_sqlite3_exec _q uery_with _handle( db , " BEGIN TRANSACTION " , NULL , NULL ) ;
ao2_unlock ( db ) ;
usleep ( 1000 * db - > batch ) ;
ao2_lock ( db ) ;
@ -527,18 +533,125 @@ struct cfg_entry_args {
const char * who_asked ;
} ;
/*! Exeute an SQL statement given the database object
/*!
* Structure passed to row counting SQLite callback .
*/
struct row_counter_args {
callback_t wrapped_callback ;
void * wrapped_arg ;
int row_count ;
} ;
/*!
* \ internal
* \ brief SQLite3 callback that counts rows of a result set .
*
* \ details
* This is used to decorate existing callbacks so that we can count the number
* of rows returned from a SELECT statement and still process each row
* independently .
*
* \ param data user data pointer passed in via sqlite3_exec ( )
* \ param num_columns number of columns in the result
* \ param values array of pointers to column values
* \ param columns array of pointers of to column names
*
* \ return the return value of the wrapped callback , or 0 if no wrapped callback
* is provided .
*/
static int row_counter_wrapper ( void * arg , int num_columns , char * * values , char * * columns )
{
struct row_counter_args * wrapped = arg ;
wrapped - > row_count + + ;
if ( wrapped - > wrapped_callback ) {
return wrapped - > wrapped_callback ( wrapped - > wrapped_arg , num_columns , values , columns ) ;
}
return 0 ;
}
/*!
* \ internal
* \ brief Execute a SQL SELECT statement using a database handle
*
* \ param db the database handle to use for the query
* \ param sql the SQL statement to execute
* \ param callback a user defined callback that will be called for each row of
* the result set
* \ param arg data to be passed to the user defined callback
*
* \ return if successful , the number of rows returned from the provided SELECT
* statement . - 1 on failure .
*/
static int realtime_sqlite3_exec_query_with_handle ( struct realtime_sqlite3_db * db , const char * sql , callback_t callback , void * arg )
{
int res = 0 ;
char * errmsg ;
struct row_counter_args wrapper = {
. wrapped_callback = callback ,
. wrapped_arg = arg ,
. row_count = 0 ,
} ;
ao2_lock ( db ) ;
if ( sqlite3_exec ( db - > handle , sql , row_counter_wrapper , & wrapper , & errmsg ) ! = SQLITE_OK ) {
ast_log ( LOG_WARNING , " Could not execute '%s': %s \n " , sql , errmsg ) ;
sqlite3_free ( errmsg ) ;
res = - 1 ;
}
ao2_unlock ( db ) ;
return res = = 0 ? wrapper . row_count : res ;
}
/*!
* \ internal
* \ brief Execute a SQL SELECT statement on the specified database
*
* \ param database the name of the database to query
* \ param sql the SQL statement to execute
* \ param callback a user defined callback that will be called for each row of
* the result set
* \ param arg data to be passed to the user defined callback
*
* \ return if successful , the number of rows returned from the provided SELECT
* statement . - 1 on failure .
*/
static int realtime_sqlite3_exec_query ( const char * database , const char * sql , callback_t callback , void * arg )
{
struct realtime_sqlite3_db * db ;
int res ;
if ( ! ( db = find_database ( database ) ) ) {
ast_log ( LOG_WARNING , " Could not find database: %s \n " , database ) ;
return - 1 ;
}
res = realtime_sqlite3_exec_query_with_handle ( db , sql , callback , arg ) ;
ao2_ref ( db , - 1 ) ;
return res ;
}
/*!
* \ internal
* \ brief Execute a SQL INSERT / UPDATE / DELETE statement using a database handle
*
* \ retval - 1 ERROR
* \ retval > - 1 Number of rows changed
* \ note A database sync operation is always performed after a statement
* is executed .
*
* \ param db the database handle to use for the query
* \ param sql the SQL statement to execute
*
* \ return if successful , the number of rows modified by the provided SQL
* statement . - 1 on failure .
*/
static int realtime_sqlite3_execute_handle ( struct realtime_sqlite3_db * db , const char * sql , int ( * callback ) ( void * , int , char * * , char * * ) , void * arg , int sync )
static int realtime_sqlite3_exec _ upda te_with _handle( struct realtime_sqlite3_db * db , const char * sql )
{
int res = 0 ;
char * errmsg ;
ao2_lock ( db ) ;
if ( sqlite3_exec ( db - > handle , sql , callback , arg , & errmsg ) ! = SQLITE_OK ) {
if ( sqlite3_exec ( db - > handle , sql , NULL , NULL , & errmsg ) ! = SQLITE_OK ) {
ast_log ( LOG_WARNING , " Could not execute '%s': %s \n " , sql , errmsg ) ;
sqlite3_free ( errmsg ) ;
res = - 1 ;
@ -547,19 +660,25 @@ static int realtime_sqlite3_execute_handle(struct realtime_sqlite3_db *db, const
}
ao2_unlock ( db ) ;
if ( sync ) {
db_sync ( db ) ;
}
db_sync ( db ) ;
return res ;
}
/*! Exeute an SQL statement give the database name
/*!
* \ internal
* \ brief Execute a SQL INSERT / UPDATE / DELETE statement using a database handle
*
* \ note A database sync operation is always performed after a statement
* is executed .
*
* \ param database the name of the database to query
* \ param sql the SQL statement to execute
*
* \ retval - 1 ERROR
* \ retval > - 1 Number of rows changed
* \ return if successful , the number of rows modified by the provided SQL
* statement . - 1 on failure .
*/
static int realtime_sqlite3_execute ( const char * database , const char * sql , int ( * callback ) ( void * , int , char * * , char * * ) , void * arg , int sync )
static int realtime_sqlite3_exec _ upda te( const char * database , const char * sql )
{
struct realtime_sqlite3_db * db ;
int res ;
@ -569,7 +688,7 @@ static int realtime_sqlite3_execute(const char *database, const char *sql, int (
return - 1 ;
}
res = realtime_sqlite3_exec ute_handle( db , sql , callback , arg , sync ) ;
res = realtime_sqlite3_exec _ upda te_with _handle( db , sql ) ;
ao2_ref ( db , - 1 ) ;
return res ;
@ -653,7 +772,7 @@ static struct ast_config *realtime_sqlite3_load(const char *database, const char
args . flags = flags ;
args . who_asked = who_asked ;
realtime_sqlite3_exec ut e( database , sql , static_realtime_cb , & args , 0 ) ;
realtime_sqlite3_exec _q uery ( database , sql , static_realtime_cb , & args ) ;
sqlite3_free ( sql ) ;
@ -691,7 +810,7 @@ static int realtime_sqlite3_helper(const char *database, const char *table, cons
ast_str_append ( & sql , 0 , " %s " , " LIMIT 1 " ) ;
}
if ( realtime_sqlite3_exec ut e( database , ast_str_buffer ( sql ) , is_multi ? append_row_to_cfg : row_to_varlist , arg , 0 ) < 0 ) {
if ( realtime_sqlite3_exec _q uery ( database , ast_str_buffer ( sql ) , is_multi ? append_row_to_cfg : row_to_varlist , arg ) < 0 ) {
ast_free ( sql ) ;
return - 1 ;
}
@ -762,7 +881,7 @@ static int realtime_sqlite3_update(const char *database, const char *table, cons
ast_str_append ( & sql , 0 , " WHERE %s %s " , sqlite3_escape_column_op ( keyfield ) , sqlite3_escape_value ( entity ) ) ;
res = realtime_sqlite3_exec ute( database , ast_str_buffer ( sql ) , NULL , NULL , 1 ) ;
res = realtime_sqlite3_exec _ upda te( database , ast_str_buffer ( sql ) ) ;
ast_free ( sql ) ;
return res ;
@ -813,7 +932,7 @@ static int realtime_sqlite3_update2(const char *database, const char *table, con
ast_str_append ( & sql , 0 , " %s " , ast_str_buffer ( where_clause ) ) ;
res = realtime_sqlite3_exec ute( database , ast_str_buffer ( sql ) , NULL , NULL , 1 ) ;
res = realtime_sqlite3_exec _ upda te( database , ast_str_buffer ( sql ) ) ;
ast_free ( sql ) ;
ast_free ( where_clause ) ;
@ -857,7 +976,7 @@ static int realtime_sqlite3_store(const char *database, const char *table, const
ast_str_append ( & sql , 0 , " %s) " , ast_str_buffer ( values ) ) ;
res = realtime_sqlite3_exec ute( database , ast_str_buffer ( sql ) , NULL , NULL , 1 ) ;
res = realtime_sqlite3_exec _ upda te( database , ast_str_buffer ( sql ) ) ;
ast_free ( sql ) ;
ast_free ( values ) ;
@ -893,7 +1012,7 @@ static int realtime_sqlite3_destroy(const char *database, const char *table, con
}
}
res = realtime_sqlite3_exec ute( database , ast_str_buffer ( sql ) , NULL , NULL , 1 ) ;
res = realtime_sqlite3_exec _ upda te( database , ast_str_buffer ( sql ) ) ;
ast_free ( sql ) ;
@ -946,7 +1065,9 @@ static int handle_missing_table(struct realtime_sqlite3_db *db, const char *tabl
return - 1 ;
}
while ( ( column = va_arg ( ap , typeof ( column ) ) ) & & ( type = va_arg ( ap , typeof ( type ) ) ) & & ( sz = va_arg ( ap , typeof ( sz ) ) ) ) {
while ( ( column = va_arg ( ap , typeof ( column ) ) ) ) {
type = va_arg ( ap , typeof ( type ) ) ;
sz = va_arg ( ap , typeof ( sz ) ) ;
if ( first ) {
ast_str_set ( & sql , 0 , " CREATE TABLE IF NOT EXISTS %s (%s %s " , sqlite3_escape_table ( table ) ,
sqlite3_escape_column ( column ) , get_sqlite_column_type ( type ) ) ;
@ -958,7 +1079,7 @@ static int handle_missing_table(struct realtime_sqlite3_db *db, const char *tabl
ast_str_append ( & sql , 0 , " ) " ) ;
res = realtime_sqlite3_exec ute_handle( db , ast_str_buffer ( sql ) , NULL , NULL , 1 ) < 0 ? - 1 : 0 ;
res = realtime_sqlite3_exec _ upda te_with _handle( db , ast_str_buffer ( sql ) ) < 0 ? - 1 : 0 ;
ast_free ( sql ) ;
return res ;
@ -983,7 +1104,7 @@ static int handle_missing_column(struct realtime_sqlite3_db *db, const char *tab
return - 1 ;
}
if ( ! ( res = ( realtime_sqlite3_exec ute_handle( db , sql , NULL , NULL , 1 ) < 0 ? - 1 : 0 ) ) ) {
if ( ! ( res = ( realtime_sqlite3_exec _ upda te_with _handle( db , sql ) < 0 ? - 1 : 0 ) ) ) {
ast_log ( LOG_NOTICE , " Creating column '%s' type %s for table %s \n " , column , sqltype , table ) ;
}
@ -1061,7 +1182,7 @@ static int realtime_sqlite3_require(const char *database, const char *table, va_
return - 1 ;
}
if ( ( res = realtime_sqlite3_exec ut e_handle( db , sql , add_column_name , columns , 0 ) ) < 0 ) {
if ( ( res = realtime_sqlite3_exec _q uery_with _handle( db , sql , add_column_name , columns ) ) < 0 ) {
unref_db ( & db ) ;
ao2_ref ( columns , - 1 ) ;
sqlite3_free ( sql ) ;
@ -1077,8 +1198,10 @@ static int realtime_sqlite3_require(const char *database, const char *table, va_
sqlite3_free ( sql ) ;
while ( ( column = va_arg ( ap , typeof ( column ) ) ) & & ( type = va_arg ( ap , typeof ( type ) ) ) & & ( sz = va_arg ( ap , typeof ( sz ) ) ) ) {
while ( ( column = va_arg ( ap , typeof ( column ) ) ) ) {
char * found ;
type = va_arg ( ap , typeof ( type ) ) ;
sz = va_arg ( ap , typeof ( sz ) ) ;
if ( ! ( found = ao2_find ( columns , column , OBJ_POINTER | OBJ_UNLINK ) ) ) {
if ( handle_missing_column ( db , table , column , type , sz ) ) {
unref_db ( & db ) ;