@ -151,7 +151,7 @@ static void *tps_processing_function(void *data)
{
struct ast_taskprocessor_listener * listener = data ;
struct ast_taskprocessor * tps = listener - > tps ;
struct default_taskprocessor_listener_pvt * pvt = listener - > private _data;
struct default_taskprocessor_listener_pvt * pvt = listener - > user _data;
int dead = 0 ;
while ( ! dead ) {
@ -162,23 +162,9 @@ static void *tps_processing_function(void *data)
return NULL ;
}
static void * default_listener_alloc ( struct ast_taskprocessor_listener * listener )
{
struct default_taskprocessor_listener_pvt * pvt ;
pvt = ast_calloc ( 1 , sizeof ( * pvt ) ) ;
if ( ! pvt ) {
return NULL ;
}
ast_cond_init ( & pvt - > cond , NULL ) ;
ast_mutex_init ( & pvt - > lock ) ;
pvt - > poll_thread = AST_PTHREADT_NULL ;
return pvt ;
}
static int default_listener_start ( struct ast_taskprocessor_listener * listener )
{
struct default_taskprocessor_listener_pvt * pvt = listener - > private _data;
struct default_taskprocessor_listener_pvt * pvt = listener - > user_data ;
if ( ast_pthread_create ( & pvt - > poll_thread , NULL , tps_processing_function , listener ) ) {
return - 1 ;
@ -189,41 +175,33 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
static void default_task_pushed ( struct ast_taskprocessor_listener * listener , int was_empty )
{
struct default_taskprocessor_listener_pvt * pvt = listener - > private _data;
struct default_taskprocessor_listener_pvt * pvt = listener - > user _data;
if ( was_empty ) {
default_tps_wake_up ( pvt , 0 ) ;
}
}
static void default_ emptied( struct ast_taskprocessor_listener * listener )
static void default_ listener_pvt_destroy( struct default_taskprocessor_listener_pvt * pvt )
{
/* No-op */
ast_mutex_destroy ( & pvt - > lock ) ;
ast_cond_destroy ( & pvt - > cond ) ;
ast_free ( pvt ) ;
}
static void default_listener_shutdown ( struct ast_taskprocessor_listener * listener )
{
struct default_taskprocessor_listener_pvt * pvt = listener - > private _data;
struct default_taskprocessor_listener_pvt * pvt = listener - > user _data;
default_tps_wake_up ( pvt , 1 ) ;
pthread_join ( pvt - > poll_thread , NULL ) ;
pvt - > poll_thread = AST_PTHREADT_NULL ;
}
static void default_listener_destroy ( void * obj )
{
struct default_taskprocessor_listener_pvt * pvt = obj ;
ast_mutex_destroy ( & pvt - > lock ) ;
ast_cond_destroy ( & pvt - > cond ) ;
ast_free ( pvt ) ;
default_listener_pvt_destroy ( pvt ) ;
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
. alloc = default_listener_alloc ,
. start = default_listener_start ,
. task_pushed = default_task_pushed ,
. emptied = default_emptied ,
. shutdown = default_listener_shutdown ,
. destroy = default_listener_destroy ,
} ;
/*!
@ -474,33 +452,41 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
return tps - > name ;
}
static void listener_destroy ( void * obj )
{
struct ast_taskprocessor_listener * listener = obj ;
listener - > callbacks - > destroy ( listener - > private_data ) ;
}
static void listener_shutdown ( struct ast_taskprocessor_listener * listener )
{
listener - > callbacks - > shutdown ( listener ) ;
ao2_ref ( listener - > tps , - 1 ) ;
}
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc ( const struct ast_taskprocessor_listener_callbacks * callbacks )
struct ast_taskprocessor_listener * ast_taskprocessor_listener_alloc ( const struct ast_taskprocessor_listener_callbacks * callbacks , void * user_data )
{
RAII_VAR ( struct ast_taskprocessor_listener * , listener ,
ao2_alloc ( sizeof ( * listener ) , listener_destroy ) , ao2_cleanup ) ;
ao2_alloc ( sizeof ( * listener ) , NULL ) , ao2_cleanup ) ;
if ( ! listener ) {
return NULL ;
}
listener - > callbacks = callbacks ;
listener - > user_data = user_data ;
ao2_ref ( listener , + 1 ) ;
return listener ;
}
static void * default_listener_pvt_alloc ( void )
{
struct default_taskprocessor_listener_pvt * pvt ;
pvt = ast_calloc ( 1 , sizeof ( * pvt ) ) ;
if ( ! pvt ) {
return NULL ;
}
ast_cond_init ( & pvt - > cond , NULL ) ;
ast_mutex_init ( & pvt - > lock ) ;
pvt - > poll_thread = AST_PTHREADT_NULL ;
return pvt ;
}
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
* create the taskprocessor if we were told via ast_tps_options to return a reference only
* if it already exists */
@ -508,6 +494,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
{
struct ast_taskprocessor * p ;
struct ast_taskprocessor_listener * listener ;
struct default_taskprocessor_listener_pvt * pvt ;
if ( ast_strlen_zero ( name ) ) {
ast_log ( LOG_ERROR , " requesting a nameless taskprocessor!!! \n " ) ;
@ -522,13 +509,19 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
return NULL ;
}
/* Create a new taskprocessor. Start by creating a default listener */
listener = ast_taskprocessor_listener_alloc ( & default_listener_callbacks ) ;
pvt = default_listener_pvt_alloc ( ) ;
if ( ! pvt ) {
return NULL ;
}
listener = ast_taskprocessor_listener_alloc ( & default_listener_callbacks , pvt ) ;
if ( ! listener ) {
default_listener_pvt_destroy ( pvt ) ;
return NULL ;
}
p = ast_taskprocessor_create_with_listener ( name , listener ) ;
if ( ! p ) {
default_listener_pvt_destroy ( pvt ) ;
ao2_ref ( listener , - 1 ) ;
return NULL ;
}
@ -565,14 +558,6 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
ao2_ref ( p , + 1 ) ;
listener - > tps = p ;
/* Allocation of private data must come after setting taskprocessor parameters
* so that listeners who rely on taskprocessor data will have access to it .
*/
listener - > private_data = listener - > callbacks - > alloc ( listener ) ;
if ( ! listener - > private_data ) {
return NULL ;
}
if ( ! ( ao2_link ( tps_singletons , p ) ) ) {
ast_log ( LOG_ERROR , " Failed to add taskprocessor '%s' to container \n " , p - > name ) ;
return NULL ;
@ -656,7 +641,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
}
ao2_unlock ( tps ) ;
if ( size = = 0 ) {
if ( size = = 0 & & tps - > listener - > callbacks - > emptied ) {
tps - > listener - > callbacks - > emptied ( tps - > listener ) ;
return 0 ;
}