@ -50,23 +50,23 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! \brief Number of buckets our multiplexed thread container can have */
# define MULTIPLEXED_BUCKETS 53
/*! \brief Number of channel s we handle in a single thread */
# define MULTIPLEXED_MAX_ CHANNELS 8
/*! \brief Number of bridge s we handle in a single thread */
# define MULTIPLEXED_MAX_ BRIDGES 4
/*! \brief Structure which represents a single thread handling multiple 2 channel bridges */
struct multiplexed_thread {
/*! Thread itself */
pthread_t thread ;
/*! Channels serviced by this thread */
struct ast_channel * chans [ 2 * MULTIPLEXED_MAX_BRIDGES ] ;
/*! Pipe used to wake up the multiplexed thread */
int pipe [ 2 ] ;
/*! Channels in this thread */
struct ast_channel * chans [ MULTIPLEXED_MAX_CHANNELS ] ;
/*! Number of channels in this thread */
unsigned int count ;
/*! Bit used to indicate that the thread is waiting on channels */
unsigned int waiting : 1 ;
/*! Number of channels actually being serviced by this thread */
unsigned int service_count ;
/*! Number of bridges in this thread */
unsigned int bridges ;
/*! TRUE if the thread is waiting on channels */
unsigned int waiting : 1 ;
} ;
/*! \brief Container of all operating multiplexed threads */
@ -76,7 +76,8 @@ static struct ao2_container *muxed_threads;
static int find_multiplexed_thread ( void * obj , void * arg , int flags )
{
struct multiplexed_thread * muxed_thread = obj ;
return ( muxed_thread - > count < = ( MULTIPLEXED_MAX_CHANNELS - 2 ) ) ? CMP_MATCH | CMP_STOP : 0 ;
return ( muxed_thread - > bridges < MULTIPLEXED_MAX_BRIDGES ) ? CMP_MATCH | CMP_STOP : 0 ;
}
/*! \brief Destroy callback for a multiplexed thread structure */
@ -147,8 +148,8 @@ static int multiplexed_bridge_create(struct ast_bridge *bridge)
ast_debug ( 1 , " Found multiplexed thread '%p' for bridge '%p' \n " , muxed_thread , bridge ) ;
}
/* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */
muxed_thread - > count + = 2 ;
/* Increase the number of bridges using this multiplexed bridge */
+ + muxed_thread - > bridges ;
ao2_unlock ( muxed_threads ) ;
@ -157,7 +158,17 @@ static int multiplexed_bridge_create(struct ast_bridge *bridge)
return 0 ;
}
/*! \brief Internal function which nudges the thread */
/*!
* \ internal
* \ brief Nudges the multiplex thread .
* \ since 12.0 .0
*
* \ param muxed_thread Controller to poke the thread .
*
* \ note This function assumes the muxed_thread is locked .
*
* \ return Nothing
*/
static void multiplexed_nudge ( struct multiplexed_thread * muxed_thread )
{
int nudge = 0 ;
@ -179,28 +190,39 @@ static void multiplexed_nudge(struct multiplexed_thread *muxed_thread)
static int multiplexed_bridge_destroy ( struct ast_bridge * bridge )
{
struct multiplexed_thread * muxed_thread ;
pthread_t thread ;
muxed_thread = bridge - > bridge_pvt ;
if ( ! muxed_thread ) {
return - 1 ;
}
bridge - > bridge_pvt = NULL ;
ao2_lock ( muxed_threads ) ;
muxed_thread - > count - = 2 ;
if ( ! muxed_thread - > count ) {
ast_debug ( 1 , " Unlinking multiplexed thread '%p' since nobody is using it anymore \n " , muxed_thread ) ;
if ( - - muxed_thread - > bridges ) {
/* Other bridges are still using the multiplexed thread. */
ao2_unlock ( muxed_threads ) ;
} else {
ast_debug ( 1 , " Unlinking multiplexed thread '%p' since nobody is using it anymore \n " ,
muxed_thread ) ;
ao2_unlink ( muxed_threads , muxed_thread ) ;
}
ao2_unlock ( muxed_threads ) ;
multiplexed_nudge ( muxed_thread ) ;
/* Stop the multiplexed bridge thread. */
ao2_lock ( muxed_thread ) ;
multiplexed_nudge ( muxed_thread ) ;
thread = muxed_thread - > thread ;
muxed_thread - > thread = AST_PTHREADT_STOP ;
ao2_unlock ( muxed_thread ) ;
ao2_unlock ( muxed_threads ) ;
if ( thread ! = AST_PTHREADT_NULL ) {
/* Wait for multiplexed bridge thread to die. */
pthread_join ( thread , NULL ) ;
}
}
ao2_ref ( muxed_thread , - 1 ) ;
bridge - > bridge_pvt = NULL ;
return 0 ;
}
@ -210,20 +232,24 @@ static void *multiplexed_thread_function(void *data)
struct multiplexed_thread * muxed_thread = data ;
int fds = muxed_thread - > pipe [ 0 ] ;
ao2_lock ( muxed_thread ) ;
ast_debug ( 1 , " Starting actual thread for multiplexed thread '%p' \n " , muxed_thread ) ;
ao2_lock ( muxed_thread ) ;
while ( muxed_thread - > thread ! = AST_PTHREADT_STOP ) {
struct ast_channel * winner ;
struct ast_channel * first = muxed_thread - > chans [ 0 ] ;
int to = - 1 ;
int outfd = - 1 ;
/* Move channels around so not just the first one gets priority */
memmove ( muxed_thread - > chans , muxed_thread - > chans + 1 ,
sizeof ( struct ast_channel * ) * ( muxed_thread - > service_count - 1 ) ) ;
muxed_thread - > chans [ muxed_thread - > service_count - 1 ] = first ;
if ( 1 < muxed_thread - > service_count ) {
struct ast_channel * first ;
/* Move channels around so not just the first one gets priority */
first = muxed_thread - > chans [ 0 ] ;
memmove ( muxed_thread - > chans , muxed_thread - > chans + 1 ,
sizeof ( struct ast_channel * ) * ( muxed_thread - > service_count - 1 ) ) ;
muxed_thread - > chans [ muxed_thread - > service_count - 1 ] = first ;
}
muxed_thread - > waiting = 1 ;
ao2_unlock ( muxed_thread ) ;
@ -263,70 +289,108 @@ static void *multiplexed_thread_function(void *data)
}
}
muxed_thread - > thread = AST_PTHREADT_NULL ;
ao2_unlock ( muxed_thread ) ;
ast_debug ( 1 , " Stopping actual thread for multiplexed thread '%p' \n " , muxed_thread ) ;
ao2_unlock ( muxed_thread ) ;
ao2_ref ( muxed_thread , - 1 ) ;
return NULL ;
}
/*! \brief Helper function which adds or removes a channel and nudges the thread */
static void multiplexed_add_or_remove ( struct multiplexed_thread * muxed_thread , struct ast_channel * chan , int add )
/*!
* \ internal
* \ brief Check to see if the multiplexed bridge thread needs to be started .
* \ since 12.0 .0
*
* \ param muxed_thread Controller to check if need to start thread .
*
* \ note This function assumes the muxed_thread is locked .
*
* \ return Nothing
*/
static void multiplexed_thread_start ( struct multiplexed_thread * muxed_thread )
{
if ( muxed_thread - > service_count & & muxed_thread - > thread = = AST_PTHREADT_NULL ) {
ao2_ref ( muxed_thread , + 1 ) ;
if ( ast_pthread_create ( & muxed_thread - > thread , NULL , multiplexed_thread_function , muxed_thread ) ) {
muxed_thread - > thread = AST_PTHREADT_NULL ; /* For paranoia's sake. */
ao2_ref ( muxed_thread , - 1 ) ;
ast_log ( LOG_WARNING , " Failed to create the common thread for multiplexed thread '%p', trying next time \n " ,
muxed_thread ) ;
}
}
}
/*!
* \ internal
* \ brief Add a channel to the multiplexed bridge .
* \ since 12.0 .0
*
* \ param muxed_thread Controller to add a channel .
* \ param chan Channel to add to the channel service array .
*
* \ return Nothing
*/
static void multiplexed_chan_add ( struct multiplexed_thread * muxed_thread , struct ast_channel * chan )
{
int idx ;
pthread_t thread = AST_PTHREADT_NULL ;
ao2_lock ( muxed_thread ) ;
multiplexed_nudge ( muxed_thread ) ;
for ( idx = 0 ; idx < ARRAY_LEN ( muxed_thread - > chans ) ; + + idx ) {
/* Check if already in the channel service array for safety. */
for ( idx = 0 ; idx < muxed_thread - > service_count ; + + idx ) {
if ( muxed_thread - > chans [ idx ] = = chan ) {
if ( ! add ) {
memmove ( muxed_thread - > chans + idx ,
muxed_thread - > chans + idx + 1 ,
sizeof ( struct ast_channel * ) * ( ARRAY_LEN ( muxed_thread - > chans ) - ( idx + 1 ) ) ) ;
muxed_thread - > chans [ ARRAY_LEN ( muxed_thread - > chans ) - 1 ] = NULL ;
- - muxed_thread - > service_count ;
}
break ;
}
if ( ! muxed_thread - > chans [ idx ] ) {
if ( add ) {
muxed_thread - > chans [ idx ] = chan ;
+ + muxed_thread - > service_count ;
}
break ;
}
}
if ( ARRAY_LEN ( muxed_thread - > chans ) = = idx & & add ) {
ast_log ( LOG_ERROR , " Could not add channel %s to multiplexed thread %p. Array not large enough. \n " ,
ast_channel_name ( chan ) , muxed_thread ) ;
ast_assert ( 0 ) ;
}
if ( muxed_thread - > service_count & & muxed_thread - > thread = = AST_PTHREADT_NULL ) {
ao2_ref ( muxed_thread , + 1 ) ;
if ( ast_pthread_create ( & muxed_thread - > thread , NULL , multiplexed_thread_function , muxed_thread ) ) {
ao2_ref ( muxed_thread , - 1 ) ;
ast_log ( LOG_WARNING , " Failed to create the bridge thread for multiplexed thread '%p', trying next time \n " ,
muxed_thread ) ;
if ( idx = = muxed_thread - > service_count ) {
/* Channel to add was not already in the array. */
if ( muxed_thread - > service_count < ARRAY_LEN ( muxed_thread - > chans ) ) {
muxed_thread - > chans [ muxed_thread - > service_count + + ] = chan ;
} else {
ast_log ( LOG_ERROR , " Could not add channel %s to multiplexed thread %p. Array not large enough. \n " ,
ast_channel_name ( chan ) , muxed_thread ) ;
ast_assert ( 0 ) ;
}
} else if ( ! muxed_thread - > service_count
& & muxed_thread - > thread ! = AST_PTHREADT_NULL
& & muxed_thread - > thread ! = AST_PTHREADT_STOP ) {
thread = muxed_thread - > thread ;
muxed_thread - > thread = AST_PTHREADT_STOP ;
}
multiplexed_thread_start ( muxed_thread ) ;
ao2_unlock ( muxed_thread ) ;
}
if ( thread ! = AST_PTHREADT_NULL ) {
pthread_join ( thread , NULL ) ;
/*!
* \ internal
* \ brief Remove a channel from the multiplexed bridge .
* \ since 12.0 .0
*
* \ param muxed_thread Controller to remove a channel .
* \ param chan Channel to remove from the channel service array .
*
* \ return Nothing
*/
static void multiplexed_chan_remove ( struct multiplexed_thread * muxed_thread , struct ast_channel * chan )
{
int idx ;
ao2_lock ( muxed_thread ) ;
multiplexed_nudge ( muxed_thread ) ;
/* Remove channel from service array. */
for ( idx = 0 ; idx < muxed_thread - > service_count ; + + idx ) {
if ( muxed_thread - > chans [ idx ] ! = chan ) {
continue ;
}
muxed_thread - > chans [ idx ] = muxed_thread - > chans [ - - muxed_thread - > service_count ] ;
break ;
}
multiplexed_thread_start ( muxed_thread ) ;
ao2_unlock ( muxed_thread ) ;
}
/*! \brief Join function which actually adds the channel into the array to be monitored */
@ -338,7 +402,7 @@ static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_
ast_debug ( 1 , " Adding channel '%s' to multiplexed thread '%p' for monitoring \n " , ast_channel_name ( bridge_channel - > chan ) , muxed_thread ) ;
multiplexed_ add_or_remove ( muxed_thread , bridge_channel - > chan , 1 ) ;
multiplexed_ chan_ add( muxed_thread , bridge_channel - > chan ) ;
/* If the second channel has not yet joined do not make things compatible */
if ( c0 = = c1 ) {
@ -361,7 +425,7 @@ static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge
ast_debug ( 1 , " Removing channel '%s' from multiplexed thread '%p' \n " , ast_channel_name ( bridge_channel - > chan ) , muxed_thread ) ;
multiplexed_ add_or _remove( muxed_thread , bridge_channel - > chan , 0 ) ;
multiplexed_ chan _remove( muxed_thread , bridge_channel - > chan ) ;
return 0 ;
}
@ -373,7 +437,7 @@ static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bri
ast_debug ( 1 , " Suspending channel '%s' from multiplexed thread '%p' \n " , ast_channel_name ( bridge_channel - > chan ) , muxed_thread ) ;
multiplexed_ add_or _remove( muxed_thread , bridge_channel - > chan , 0 ) ;
multiplexed_ chan _remove( muxed_thread , bridge_channel - > chan ) ;
}
/*! \brief Unsuspend function which means control of the channel is coming back to us */
@ -383,7 +447,7 @@ static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_b
ast_debug ( 1 , " Unsuspending channel '%s' from multiplexed thread '%p' \n " , ast_channel_name ( bridge_channel - > chan ) , muxed_thread ) ;
multiplexed_ add_or_remove ( muxed_thread , bridge_channel - > chan , 1 ) ;
multiplexed_ chan_ add( muxed_thread , bridge_channel - > chan ) ;
}
/*! \brief Write function for writing frames into the bridge */