@ -705,6 +705,12 @@ enum iax2_thread_type {
IAX_THREAD_TYPE_DYNAMIC ,
} ;
struct iax2_pkt_buf {
AST_LIST_ENTRY ( iax2_pkt_buf ) entry ;
size_t len ;
unsigned char buf [ 1 ] ;
} ;
struct iax2_thread {
AST_LIST_ENTRY ( iax2_thread ) list ;
enum iax2_thread_type type ;
@ -720,8 +726,10 @@ struct iax2_thread {
pthread_t threadid ;
int threadnum ;
struct sockaddr_in iosin ;
unsigned char buf [ 4096 ] ;
int iores ;
unsigned char readbuf [ 4096 ] ;
unsigned char * buf ;
size_t buf_len ;
size_t buf_size ;
int iofd ;
time_t checktime ;
ast_mutex_t lock ;
@ -736,6 +744,10 @@ struct iax2_thread {
unsigned char type ;
unsigned char csub ;
} ffinfo ;
/*! Queued up full frames for processing. If more full frames arrive for
* a call which this thread is already processing a full frame for , they
* are queued up here . */
AST_LIST_HEAD_NOLOCK ( , iax2_pkt_buf ) full_frames ;
} ;
/* Thread lists */
@ -6509,6 +6521,69 @@ static void save_osptoken(struct iax_frame *fr, struct iax_ies *ies)
ast_string_field_set ( iaxs [ fr - > callno ] , osptoken , full_osptoken ) ;
}
static int socket_process ( struct iax2_thread * thread ) ;
/*!
* \ brief Handle any deferred full frames for this thread
*/
static void handle_deferred_full_frames ( struct iax2_thread * thread )
{
struct iax2_pkt_buf * pkt_buf ;
ast_mutex_lock ( & thread - > lock ) ;
while ( ( pkt_buf = AST_LIST_REMOVE_HEAD ( & thread - > full_frames , entry ) ) ) {
ast_mutex_unlock ( & thread - > lock ) ;
thread - > buf = pkt_buf - > buf ;
thread - > buf_len = pkt_buf - > len ;
thread - > buf_size = pkt_buf - > len + 1 ;
socket_process ( thread ) ;
thread - > buf = NULL ;
ast_free ( pkt_buf ) ;
ast_mutex_lock ( & thread - > lock ) ;
}
ast_mutex_unlock ( & thread - > lock ) ;
}
/*!
* \ brief Queue the last read full frame for processing by a certain thread
*
* If there are already any full frames queued , they are sorted
* by sequence number .
*/
static void defer_full_frame ( struct iax2_thread * thread )
{
struct iax2_pkt_buf * pkt_buf , * cur_pkt_buf ;
struct ast_iax2_full_hdr * fh , * cur_fh ;
if ( ! ( pkt_buf = ast_calloc ( 1 , sizeof ( * pkt_buf ) + thread - > buf_len ) ) )
return ;
pkt_buf - > len = thread - > buf_len ;
memcpy ( pkt_buf - > buf , thread - > buf , pkt_buf - > len ) ;
fh = ( struct ast_iax2_full_hdr * ) pkt_buf - > buf ;
ast_mutex_lock ( & thread - > lock ) ;
AST_LIST_TRAVERSE_SAFE_BEGIN ( & thread - > full_frames , cur_pkt_buf , entry ) {
cur_fh = ( struct ast_iax2_full_hdr * ) cur_pkt_buf - > buf ;
if ( fh - > oseqno < cur_fh - > oseqno ) {
AST_LIST_INSERT_BEFORE_CURRENT ( & thread - > full_frames , pkt_buf , entry ) ;
break ;
}
}
AST_LIST_TRAVERSE_SAFE_END
if ( ! cur_pkt_buf )
AST_LIST_INSERT_TAIL ( & thread - > full_frames , pkt_buf , entry ) ;
ast_mutex_unlock ( & thread - > lock ) ;
}
static int socket_read ( int * id , int fd , short events , void * cbdata )
{
struct iax2_thread * thread ;
@ -6528,8 +6603,10 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
len = sizeof ( thread - > iosin ) ;
thread - > iofd = fd ;
thread - > iores = recvfrom ( fd , thread - > buf , sizeof ( thread - > buf ) , 0 , ( struct sockaddr * ) & thread - > iosin , & len ) ;
if ( thread - > iores < 0 ) {
thread - > buf_len = recvfrom ( fd , thread - > readbuf , sizeof ( thread - > buf ) , 0 , ( struct sockaddr * ) & thread - > iosin , & len ) ;
thread - > buf_size = sizeof ( thread - > readbuf ) ;
thread - > buf = thread - > readbuf ;
if ( thread - > buf_len < 0 ) {
if ( errno ! = ECONNREFUSED & & errno ! = EAGAIN )
ast_log ( LOG_WARNING , " Error: %s \n " , strerror ( errno ) ) ;
handle_error ( ) ;
@ -6554,15 +6631,11 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
! inaddrcmp ( & cur - > ffinfo . sin , & thread - > iosin ) )
break ;
}
AST_LIST_UNLOCK ( & active_list ) ;
if ( cur ) {
/* we found another thread processing a full frame for this call,
so we can ' t accept this frame */
ast_debug ( 1 , " Dropping frame from %s (callno %d) of type %d (subclass %d) due to frame of type %d (subclass %d) already in process \n " ,
ast_inet_ntoa ( thread - > iosin . sin_addr ) , cur - > ffinfo . callno ,
fh - > type , uncompress_subclass ( fh - > csub ) ,
cur - > ffinfo . type , uncompress_subclass ( cur - > ffinfo . csub ) ) ;
insert_idle_thread ( thread ) ;
so queue it up for processing later . */
defer_full_frame ( thread ) ;
AST_LIST_UNLOCK ( & active_list ) ;
return 1 ;
} else {
/* this thread is going to process this frame, so mark it */
@ -6571,6 +6644,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
thread - > ffinfo . type = fh - > type ;
thread - > ffinfo . csub = fh - > csub ;
}
AST_LIST_UNLOCK ( & active_list ) ;
}
/* Mark as ready and send on its way */
@ -6773,7 +6847,7 @@ static int socket_process(struct iax2_thread *thread)
fr - > callno = 0 ;
/* Copy frequently used parameters to the stack */
res = thread - > iores ;
res = thread - > buf_len ;
fd = thread - > iofd ;
memcpy ( & sin , & thread - > iosin , sizeof ( sin ) ) ;
@ -6943,7 +7017,7 @@ static int socket_process(struct iax2_thread *thread)
}
/* Ensure text frames are NULL-terminated */
if ( f . frametype = = AST_FRAME_TEXT & & thread - > buf [ res - 1 ] ! = ' \0 ' ) {
if ( res < sizeof ( thread - > buf ) )
if ( res < thread - > buf_size )
thread - > buf [ res + + ] = ' \0 ' ;
else /* Trims one character from the text message, but that's better than overwriting the end of the buffer. */
thread - > buf [ res - 1 ] = ' \0 ' ;
@ -8098,6 +8172,7 @@ static void *iax2_process_thread(void *data)
thread - > actions + + ;
thread - > iostate = IAX_IOSTATE_PROCESSING ;
socket_process ( thread ) ;
handle_deferred_full_frames ( thread ) ;
break ;
case IAX_IOSTATE_SCHEDREADY :
thread - > actions + + ;
@ -8119,6 +8194,10 @@ static void *iax2_process_thread(void *data)
AST_LIST_REMOVE ( & active_list , thread , list ) ;
AST_LIST_UNLOCK ( & active_list ) ;
/* Make sure another frame didn't sneak in there after we thought we were done. */
handle_deferred_full_frames ( thread ) ;
/* Go back into our respective list */
put_into_idle = 1 ;
}