@ -28,6 +28,7 @@
# include "asterisk/res_pjsip.h"
# include "include/res_pjsip_private.h"
# include "asterisk/res_pjsip_cli.h"
# include "asterisk/taskprocessor.h"
# define TASK_BUCKETS 53
@ -36,30 +37,30 @@ static struct ao2_container *tasks;
static int task_count ;
struct ast_sip_sched_task {
/*! ast_sip_sched task id */
uint32_t task_id ;
/*! ast_sched scheudler id */
int current_scheduler_id ;
/*! task is currently running */
int is_running ;
/*! task */
ast_sip_task task ;
/*! The serializer to be used (if any) (Holds a ref) */
struct ast_taskprocessor * serializer ;
/*! task data */
void * task_data ;
/*! reschedule interval in milliseconds */
int interval ;
/*! the time the task was queued */
/*! task function */
ast_sip_task task ;
/*! the time the task was originally scheduled/queued */
struct timeval when_queued ;
/*! the last time the task was started */
struct timeval last_start ;
/*! the last time the task was ended */
struct timeval last_end ;
/*! When the periodic task is next expected to run */
struct timeval next_periodic ;
/*! reschedule interval in milliseconds */
int interval ;
/*! ast_sched scheudler id */
int current_scheduler_id ;
/*! task is currently running */
int is_running ;
/*! times run */
int run_count ;
/*! the task reschedule, cleanup and policy flags */
enum ast_sip_scheduler_task_flags flags ;
/*! the serializer to be used (if any) */
struct ast_taskprocessor * serializer ;
/*! A name to be associated with the task */
char name [ 0 ] ;
} ;
@ -76,14 +77,19 @@ static int push_to_serializer(const void *data);
*/
static int run_task ( void * data )
{
RAII_VAR ( struct ast_sip_sched_task * , schtd , ao2_bump ( data ) , ao2_cleanup ) ;
RAII_VAR ( struct ast_sip_sched_task * , schtd , data , ao2_cleanup ) ;
int res ;
int delay ;
if ( ! schtd - > interval ) {
/* Task was cancelled while waiting to be executed by the serializer */
return - 1 ;
}
ao2_lock ( schtd ) ;
schtd - > last_start = ast_tvnow ( ) ;
schtd - > is_running = 1 ;
schtd - > run_count + + ;
+ + schtd - > run_count ;
ao2_unlock ( schtd ) ;
res = schtd - > task ( schtd - > task_data ) ;
@ -93,10 +99,10 @@ static int run_task(void *data)
schtd - > last_end = ast_tvnow ( ) ;
/*
* Don ' t restart if the task returned 0 or if the interval
* Don ' t restart if the task returned < = 0 or if the interval
* was set to 0 while the task was running
*/
if ( ! res | | ! schtd - > interval ) {
if ( res < = 0 | | ! schtd - > interval ) {
schtd - > interval = 0 ;
ao2_unlock ( schtd ) ;
ao2_unlink ( tasks , schtd ) ;
@ -110,13 +116,22 @@ static int run_task(void *data)
if ( schtd - > flags & AST_SIP_SCHED_TASK_DELAY ) {
delay = schtd - > interval ;
} else {
delay = schtd - > interval - ( ast_tvdiff_ms ( schtd - > last_end , schtd - > last_start ) % schtd - > interval ) ;
int64_t diff ;
/* Determine next periodic interval we need to expire. */
do {
schtd - > next_periodic = ast_tvadd ( schtd - > next_periodic ,
ast_samp2tv ( schtd - > interval , 1000 ) ) ;
diff = ast_tvdiff_ms ( schtd - > next_periodic , schtd - > last_end ) ;
} while ( diff < = 0 ) ;
delay = diff ;
}
schtd - > current_scheduler_id = ast_sched_add ( scheduler_context , delay , push_to_serializer , schtd ) ;
if ( schtd - > current_scheduler_id < 0 ) {
schtd - > interval = 0 ;
ao2_unlock ( schtd ) ;
ast_log ( LOG_ERROR , " Sched %p: Failed to reschedule task %s \n " , schtd , schtd - > name ) ;
ao2_unlink ( tasks , schtd ) ;
return - 1 ;
}
@ -133,9 +148,29 @@ static int run_task(void *data)
static int push_to_serializer ( const void * data )
{
struct ast_sip_sched_task * schtd = ( struct ast_sip_sched_task * ) data ;
int sched_id ;
ao2_lock ( schtd ) ;
sched_id = schtd - > current_scheduler_id ;
schtd - > current_scheduler_id = - 1 ;
ao2_unlock ( schtd ) ;
if ( sched_id < 0 ) {
/* Task was cancelled while waiting on the lock */
return 0 ;
}
ao2_t_ref ( schtd , + 1 , " Give ref to run_task() " ) ;
if ( ast_sip_push_task ( schtd - > serializer , run_task , schtd ) ) {
ao2_ref ( schtd , - 1 ) ;
/*
* Oh my . Have to cancel the scheduled item because we
* unexpectedly cannot run it anymore .
*/
ao2_unlink ( tasks , schtd ) ;
ao2_lock ( schtd ) ;
schtd - > interval = 0 ;
ao2_unlock ( schtd ) ;
ao2_t_ref ( schtd , - 1 , " Failed so release run_task() ref " ) ;
}
return 0 ;
@ -144,20 +179,22 @@ static int push_to_serializer(const void *data)
int ast_sip_sched_task_cancel ( struct ast_sip_sched_task * schtd )
{
int res ;
int sched_id ;
if ( ! ao2_ref_and_lock ( schtd ) ) {
return - 1 ;
}
/*
* Prevent any tasks in the serializer queue from
* running and restarting the scheduled item on us
* first .
*/
ao2_lock ( schtd ) ;
schtd - > interval = 0 ;
if ( schtd - > current_scheduler_id < 0 | | schtd - > interval < = 0 ) {
ao2_unlock_and_unref ( schtd ) ;
return 0 ;
}
sched_id = schtd - > current_scheduler_id ;
schtd - > current_scheduler_id = - 1 ;
ao2_unlock ( schtd ) ;
res = ast_sched_del ( scheduler_context , sched_id ) ;
schtd - > interval = 0 ;
ao2_unlock_and_unref ( schtd ) ;
ao2_unlink ( tasks , schtd ) ;
res = ast_sched_del ( scheduler_context , schtd - > current_scheduler_id ) ;
return res ;
}
@ -306,7 +343,7 @@ int ast_sip_sched_is_task_running_by_name(const char *name)
return is_running ;
}
static void schtd_d estruc tor( void * data )
static void schtd_d tor( void * data )
{
struct ast_sip_sched_task * schtd = data ;
@ -316,6 +353,7 @@ static void schtd_destructor(void *data)
} else if ( schtd - > task_data & & ( schtd - > flags & AST_SIP_SCHED_TASK_DATA_FREE ) ) {
ast_free ( schtd - > task_data ) ;
}
ast_taskprocessor_unreference ( schtd - > serializer ) ;
}
struct ast_sip_sched_task * ast_sip_schedule_task ( struct ast_taskprocessor * serializer ,
@ -326,38 +364,60 @@ struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *seria
struct ast_sip_sched_task * schtd ;
int res ;
if ( interval < 0 ) {
if ( interval < = 0 ) {
return NULL ;
}
schtd = ao2_alloc ( ( sizeof ( * schtd ) + ( ! ast_strlen_zero ( name ) ? strlen ( name ) : ID_LEN ) + 1 ) , schtd_destructor ) ;
schtd = ao2_alloc ( ( sizeof ( * schtd ) + ( ! ast_strlen_zero ( name ) ? strlen ( name ) : ID_LEN ) + 1 ) ,
schtd_dtor ) ;
if ( ! schtd ) {
return NULL ;
}
schtd - > task_id = ast_atomic_fetchadd_int ( & task_count , 1 ) ;
schtd - > serializer = serializer ;
schtd - > serializer = ao2_bump ( serializer ) ;
schtd - > task_data = task_data ;
schtd - > task = sip_task ;
schtd - > interval = interval ;
schtd - > flags = flags ;
if ( ! ast_strlen_zero ( name ) ) {
strcpy ( schtd - > name , name ) ; /* Safe */
} else {
sprintf ( schtd - > name , " task_%08x " , schtd - > task_id ) ;
uint32_t task_id ;
task_id = ast_atomic_fetchadd_int ( & task_count , 1 ) ;
sprintf ( schtd - > name , " task_%08x " , task_id ) ;
}
schtd - > task_data = task_data ;
schtd - > flags = flags ;
schtd - > interval = interval ;
schtd - > when_queued = ast_tvnow ( ) ;
if ( ! ( schtd - > flags & AST_SIP_SCHED_TASK_DELAY ) ) {
schtd - > next_periodic = ast_tvadd ( schtd - > when_queued ,
ast_samp2tv ( schtd - > interval , 1000 ) ) ;
}
if ( flags & AST_SIP_SCHED_TASK_DATA_AO2 ) {
ao2_ref ( task_data , + 1 ) ;
}
/*
* We must put it in the ' tasks ' container before scheduling
* the task because we don ' t want the push_to_serializer ( )
* sched task to " remove " it on failure before we even put
* it in . If this happens then nothing would remove it from
* the ' tasks ' container .
*/
ao2_link ( tasks , schtd ) ;
/*
* Lock so we are guaranteed to get the sched id set before
* the push_to_serializer ( ) sched task can clear it .
*/
ao2_lock ( schtd ) ;
res = ast_sched_add ( scheduler_context , interval , push_to_serializer , schtd ) ;
schtd - > current_scheduler_id = res ;
ao2_unlock ( schtd ) ;
if ( res < 0 ) {
ao2_unlink ( tasks , schtd ) ;
ao2_ref ( schtd , - 1 ) ;
return NULL ;
} else {
schtd - > current_scheduler_id = res ;
ao2_link ( tasks , schtd ) ;
}
return schtd ;
@ -457,7 +517,8 @@ static struct ast_cli_entry cli_commands[] = {
int ast_sip_initialize_scheduler ( void )
{
if ( ! ( scheduler_context = ast_sched_context_create ( ) ) ) {
scheduler_context = ast_sched_context_create ( ) ;
if ( ! scheduler_context ) {
ast_log ( LOG_ERROR , " Failed to create scheduler. Aborting load \n " ) ;
return - 1 ;
}
@ -487,7 +548,21 @@ int ast_sip_destroy_scheduler(void)
ast_cli_unregister_multiple ( cli_commands , ARRAY_LEN ( cli_commands ) ) ;
if ( scheduler_context ) {
if ( tasks ) {
struct ao2_iterator iter ;
struct ast_sip_sched_task * schtd ;
/* Cancel all scheduled tasks */
iter = ao2_iterator_init ( tasks , 0 ) ;
while ( ( schtd = ao2_iterator_next ( & iter ) ) ) {
ast_sip_sched_task_cancel ( schtd ) ;
ao2_ref ( schtd , - 1 ) ;
}
ao2_iterator_destroy ( & iter ) ;
}
ast_sched_context_destroy ( scheduler_context ) ;
scheduler_context = NULL ;
}
ao2_cleanup ( tasks ) ;