@ -90,6 +90,7 @@ struct websocket_pvt {
pthread_t outbound_read_thread ;
size_t bytes_read ;
size_t leftover_len ;
char * remote_addr ;
char * uri_params ;
char * leftover_data ;
enum webchan_control_msg_format control_msg_format ;
@ -104,6 +105,13 @@ struct websocket_pvt {
char connection_id [ 0 ] ;
} ;
/*
* These are the indexes in the channel ' s file descriptor array
* not the file descriptors themselves .
*/
# define WS_TIMER_FDNO (AST_EXTENDED_FDS + 1)
# define WS_WEBSOCKET_FDNO (AST_EXTENDED_FDS + 2)
# define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE"
# define MEDIA_WEBSOCKET_CONNECTION_ID "MEDIA_WEBSOCKET_CONNECTION_ID"
# define INCOMING_CONNECTION_ID "INCOMING"
@ -125,6 +133,9 @@ struct websocket_pvt {
# define MAX_TEXT_MESSAGE_LEN MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1))
/* Forward declarations */
static int read_from_ws_and_queue ( struct websocket_pvt * instance ) ;
static void _websocket_request_hangup ( struct websocket_pvt * instance , int ast_cause ,
enum ast_websocket_status_code tech_cause , int line , const char * function ) ;
static struct ast_channel * webchan_request ( const char * type , struct ast_format_cap * cap , const struct ast_assigned_ids * assignedids , const struct ast_channel * requestor , const char * data , int * cause ) ;
static int webchan_call ( struct ast_channel * ast , const char * dest , int timeout ) ;
static struct ast_frame * webchan_read ( struct ast_channel * ast ) ;
@ -132,6 +143,9 @@ static int webchan_write(struct ast_channel *ast, struct ast_frame *f);
static int webchan_hangup ( struct ast_channel * ast ) ;
static int webchan_send_dtmf_text ( struct ast_channel * ast , char digit , unsigned int duration ) ;
# define websocket_request_hangup(_instance, _cause, _tech) \
_websocket_request_hangup ( _instance , _cause , _tech , __LINE__ , __FUNCTION__ )
static struct ast_channel_tech websocket_tech = {
. type = " WebSocket " ,
. description = " Media over WebSocket Channel Driver " ,
@ -416,19 +430,17 @@ static __attribute__ ((format (gnu_printf, 2, 3))) char *_create_event_ERROR(
( { \
int _res = - 1 ; \
char * _payload = _create_event_ # # _event ( _instance , # # __VA_ARGS__ ) ; \
ao2_lock ( instance ) ; \
if ( _payload & & _instance - > websocket ) { \
_res = ast_websocket_write_string ( _instance - > websocket , _payload ) ; \
if ( _res ! = 0 ) { \
ast_log ( LOG_ERROR , " %s: Unable to send event %s \n " , \
ast_channel_name ( instance - > channel ) , _payload ) ; \
} else { \
ast_debug ( 4 , " %s: Sent %s \n " , \
ast_debug ( 3 , " %s: Sent %s \n " , \
ast_channel_name ( instance - > channel ) , _payload ) ; \
} \
ast_free ( _payload ) ; \
} \
ao2_unlock ( instance ) ; \
( _res ) ; \
} )
@ -551,7 +563,16 @@ static struct ast_frame *dequeue_frame(struct websocket_pvt *instance)
/*!
* \ internal
*
* Called by the core channel thread each time the instance timer fires .
* There are two file descriptors on this channel that can trigger
* this function . . .
*
* The timer fd ( WS_TIMER_FDNO ) which gets triggered at a constant
* rate determined by the format . In this case , we need to pull a
* frame OFF the queue and return it to the core .
*
* The websocket fd ( WS_WEBSOCKET_FDNO ) which gets triggered when
* there ' s incoming data to read from the websocket . In this case ,
* we read the data and put it ON the queue . We ' ll return a null frame .
*
*/
static struct ast_frame * webchan_read ( struct ast_channel * ast )
@ -559,12 +580,21 @@ static struct ast_frame *webchan_read(struct ast_channel *ast)
struct websocket_pvt * instance = NULL ;
struct ast_frame * native_frame = NULL ;
struct ast_frame * slin_frame = NULL ;
int fdno = ast_channel_fdno ( ast ) ;
instance = ast_channel_tech_pvt ( ast ) ;
if ( ! instance ) {
return NULL ;
}
if ( fdno = = WS_WEBSOCKET_FDNO ) {
read_from_ws_and_queue ( instance ) ;
return & ast_null_frame ;
}
if ( fdno ! = WS_TIMER_FDNO ) {
return & ast_null_frame ;
}
if ( ast_timer_get_event ( instance - > timer ) = = AST_TIMING_EVENT_EXPIRED ) {
ast_timer_ack ( instance - > timer , 1 ) ;
}
@ -754,7 +784,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer)
ast_queue_control ( instance - > channel , AST_CONTROL_ANSWER ) ;
} else if ( ast_strings_equal ( command , HANGUP_CHANNEL ) ) {
ast_queue_control( instance - > channel , AST_CONTROL_HANGUP ) ;
websocket_request_hangup( instance , AST_CAUSE_NORMAL , AST_WEBSOCKET_STATUS_NORMAL ) ;
} else if ( ast_strings_equal ( command , START_MEDIA_BUFFERING ) ) {
if ( instance - > passthrough ) {
@ -1064,36 +1094,19 @@ static int read_from_ws_and_queue(struct websocket_pvt *instance)
int fragmented = 0 ;
int res = 0 ;
if ( ! instance | | ! instance - > websocket ) {
ast_log ( LOG_WARNING , " %s: WebSocket instance not found \n " ,
ast_channel_name ( instance - > channel ) ) ;
return - 1 ;
}
ast_debug ( 9 , " %s: Waiting for websocket to have data \n " , ast_channel_name ( instance - > channel ) ) ;
res = ast_wait_for_input (
ast_websocket_fd ( instance - > websocket ) , - 1 ) ;
if ( res < = 0 ) {
ast_log ( LOG_WARNING , " %s: WebSocket read failed: %s \n " ,
ast_channel_name ( instance - > channel ) , strerror ( errno ) ) ;
return - 1 ;
}
/*
* We need to lock here to prevent the websocket handle from
* being pulled out from under us if the core sends us a
* hangup request .
*/
ao2_lock ( instance ) ;
if ( ! instance - > websocket ) {
ao2_unlock ( instance ) ;
ast_log ( LOG_WARNING , " %s: WebSocket session not found \n " ,
ast_channel_name ( instance - > channel ) ) ;
return - 1 ;
}
res = ast_websocket_read ( instance - > websocket , & payload , & payload_len ,
& opcode , & fragmented ) ;
ao2_unlock ( instance ) ;
if ( res ) {
ast_debug ( 3 , " %s: WebSocket read error \n " ,
ast_channel_name ( instance - > channel ) ) ;
websocket_request_hangup ( instance , AST_CAUSE_NETWORK_OUT_OF_ORDER , AST_WEBSOCKET_STATUS_GOING_AWAY ) ;
return - 1 ;
}
ast_debug ( 5 , " %s: WebSocket read %d bytes \n " , ast_channel_name ( instance - > channel ) ,
@ -1104,41 +1117,52 @@ static int read_from_ws_and_queue(struct websocket_pvt *instance)
}
if ( opcode = = AST_WEBSOCKET_OPCODE_CLOSE ) {
ast_debug ( 5 , " %s: WebSocket closed by remote \n " ,
ast_debug ( 3 , " %s: WebSocket closed by remote \n " ,
ast_channel_name ( instance - > channel ) ) ;
websocket_request_hangup ( instance , AST_CAUSE_NORMAL , AST_WEBSOCKET_STATUS_GOING_AWAY ) ;
return - 1 ;
}
if ( opcode ! = AST_WEBSOCKET_OPCODE_BINARY ) {
ast_ debug( 5 , " %s: WebSocket frame type %d not supported . Ignoring. \n " ,
ast_ log( LOG_WARNING , " %s: WebSocket frame type %d not supported \n " ,
ast_channel_name ( instance - > channel ) , ( int ) opcode ) ;
websocket_request_hangup ( instance , AST_CAUSE_FAILURE , AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA ) ;
return 0 ;
}
return process_binary_message ( instance , payload , payload_len ) ;
}
/*!
* \ internal
*
* For incoming websocket connections , this function gets called by
* incoming_ws_established_cb ( ) and is run in the http server thread
* handling the websocket connection .
*
* For outgoing websocket connections , this function gets started as
* a background thread by webchan_call ( ) .
*/
static void * read_thread_handler ( void * obj )
static int websocket_handoff_to_channel ( struct websocket_pvt * instance )
{
RAII_VAR ( struct websocket_pvt * , instance , obj , ao2_cleanup ) ;
int res = 0 ;
int nodelay = 1 ;
struct ast_sockaddr * remote_addr = ast_websocket_remote_address ( instance - > websocket ) ;
ast_debug ( 3 , " %s: Read thread started \n " , ast_channel_name ( instance - > channel ) ) ;
instance - > remote_addr = ast_strdup ( ast_sockaddr_stringify ( remote_addr ) ) ;
ast_debug ( 3 , " %s: WebSocket connection with %s established \n " ,
ast_channel_name ( instance - > channel ) , instance - > remote_addr ) ;
if ( setsockopt ( ast_websocket_fd ( instance - > websocket ) ,
IPPROTO_TCP , TCP_NODELAY , ( char * ) & nodelay , sizeof ( nodelay ) ) < 0 ) {
ast_log ( LOG_WARNING , " Failed to set TCP_NODELAY on websocket connection: %s \n " , strerror ( errno ) ) ;
}
ast_channel_set_fd ( instance - > channel , WS_WEBSOCKET_FDNO , ast_websocket_fd ( instance - > websocket ) ) ;
res = send_event ( instance , MEDIA_START ) ;
if ( res ! = 0 ) {
ast_queue_control ( instance - > channel , AST_CONTROL_HANGUP ) ;
return NULL ;
if ( instance - > type = = AST_WS_TYPE_SERVER ) {
websocket_request_hangup ( instance , AST_CAUSE_NETWORK_OUT_OF_ORDER , AST_WEBSOCKET_STATUS_GOING_AWAY ) ;
} else {
/*
* We were called by webchan_call so just need to set causes .
* The core will hangup the channel .
*/
ast_channel_tech_hangupcause_set ( instance - > channel , AST_WEBSOCKET_STATUS_GOING_AWAY ) ;
ast_channel_hangupcause_set ( instance - > channel , AST_CAUSE_NETWORK_OUT_OF_ORDER ) ;
}
return - 1 ;
}
if ( ! instance - > no_auto_answer ) {
@ -1146,17 +1170,24 @@ static void *read_thread_handler(void *obj)
ast_queue_control ( instance - > channel , AST_CONTROL_ANSWER ) ;
}
while ( read_from_ws_and_queue ( instance ) = = 0 )
{
}
return 0 ;
}
/*
* websocket_hangup will take care of closing the websocket if needed .
*/
ast_debug ( 3 , " %s: HANGUP by websocket close/error \n " , ast_channel_name ( instance - > channel ) ) ;
ast_queue_control ( instance - > channel , AST_CONTROL_HANGUP ) ;
static void _websocket_request_hangup ( struct websocket_pvt * instance , int ast_cause ,
enum ast_websocket_status_code tech_cause , int line , const char * function )
{
if ( ! instance | | ! instance - > channel ) {
return ;
}
ast_debug ( 3 , " %s:%s: Hangup requested from %s line %d. cause: %s(%d) tech_cause: %s(%d) " ,
ast_channel_name ( instance - > channel ) , instance - > remote_addr ,
function , line ,
ast_cause2str ( ast_cause ) , ast_cause , ast_websocket_status_to_str ( tech_cause ) , tech_cause ) ;
return NULL ;
if ( tech_cause ) {
ast_channel_tech_hangupcause_set ( instance - > channel , tech_cause ) ;
}
ast_queue_hangup_with_cause ( instance - > channel , ast_cause ) ;
}
/*! \brief Function called when we should write a frame to the channel */
@ -1195,17 +1226,19 @@ static int webchan_write(struct ast_channel *ast, struct ast_frame *f)
* \ internal
*
* Called by the core to actually call the remote .
* The core will hang up the channel if a non - zero is returned .
* We just need to set hangup causes if appropriate .
*/
static int webchan_call ( struct ast_channel * ast , const char * dest ,
int timeout )
{
struct websocket_pvt * instance = ast_channel_tech_pvt ( ast ) ;
int nodelay = 1 ;
enum ast_websocket_result result ;
if ( ! instance ) {
ast_log ( LOG_WARNING , " %s: WebSocket instance not found \n " ,
ast_channel_name ( ast ) ) ;
ast_channel_hangupcause_set ( ast , AST_CAUSE_FAILURE ) ;
return - 1 ;
}
@ -1218,6 +1251,7 @@ static int webchan_call(struct ast_channel *ast, const char *dest,
if ( ! instance - > client ) {
ast_log ( LOG_WARNING , " %s: WebSocket client not found \n " ,
ast_channel_name ( ast ) ) ;
ast_channel_hangupcause_set ( ast , AST_CAUSE_FAILURE ) ;
return - 1 ;
}
@ -1233,26 +1267,11 @@ static int webchan_call(struct ast_channel *ast, const char *dest,
if ( ! instance - > websocket | | result ! = WS_OK ) {
ast_log ( LOG_WARNING , " %s: WebSocket connection failed to %s: %s \n " ,
ast_channel_name ( ast ) , dest , ast_websocket_result_to_str ( result ) ) ;
ast_channel_hangupcause_set ( ast , AST_CAUSE_NO_ROUTE_DESTINATION ) ;
return - 1 ;
}
if ( setsockopt ( ast_websocket_fd ( instance - > websocket ) ,
IPPROTO_TCP , TCP_NODELAY , ( char * ) & nodelay , sizeof ( nodelay ) ) < 0 ) {
ast_log ( LOG_WARNING , " Failed to set TCP_NODELAY on websocket connection: %s \n " , strerror ( errno ) ) ;
}
ast_debug ( 3 , " %s: WebSocket connection to %s established \n " ,
ast_channel_name ( ast ) , dest ) ;
/* read_thread_handler() will clean up the bump */
if ( ast_pthread_create_detached_background ( & instance - > outbound_read_thread , NULL ,
read_thread_handler , ao2_bump ( instance ) ) ) {
ast_log ( LOG_WARNING , " %s: Failed to create thread. \n " , ast_channel_name ( ast ) ) ;
ao2_cleanup ( instance ) ;
return - 1 ;
}
return 0 ;
return websocket_handoff_to_channel ( instance ) ;
}
static void websocket_destructor ( void * data )
@ -1312,6 +1331,7 @@ static void websocket_destructor(void *data)
}
ast_free ( instance - > uri_params ) ;
ast_free ( instance - > remote_addr ) ;
}
struct instance_proxy {
@ -1501,7 +1521,7 @@ static int set_channel_timer(struct websocket_pvt *instance)
* Calling ast_channel_set_fd will cause the channel thread to call
* webchan_read at ' rate ' times per second .
*/
ast_channel_set_fd ( instance - > channel , 0 , ast_timer_fd ( instance - > timer ) ) ;
ast_channel_set_fd ( instance - > channel , WS_TIMER_FDNO , ast_timer_fd ( instance - > timer ) ) ;
return 0 ;
}
@ -1779,20 +1799,12 @@ static int webchan_hangup(struct ast_channel *ast)
ast_debug ( 3 , " %s: WebSocket call hangup. cid: %s \n " ,
ast_channel_name ( ast ) , instance - > connection_id ) ;
/*
* We need to lock because read_from_ws_and_queue ( ) is probably waiting
* on the websocket file descriptor and will unblock and immediately try to
* check the websocket and read from it . We don ' t want to pull the
* websocket out from under it between the check and read .
*/
ao2_lock ( instance ) ;
if ( instance - > websocket ) {
ast_websocket_close ( instance - > websocket , 1000 ) ;
ast_websocket_close ( instance - > websocket , ast_channel_tech_hangupcause ( ast ) ? : 1000 ) ;
ast_websocket_unref ( instance - > websocket ) ;
instance - > websocket = NULL ;
}
ast_channel_tech_pvt_set ( ast , NULL ) ;
ao2_unlock ( instance ) ;
/* Clean up the reference from adding the instance to the channel */
ao2_cleanup ( instance ) ;
@ -1827,7 +1839,6 @@ static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session,
struct ast_variable * v ;
const char * connection_id = NULL ;
struct websocket_pvt * instance = NULL ;
int nodelay = 1 ;
ast_debug ( 3 , " WebSocket established \n " ) ;
@ -1847,8 +1858,8 @@ static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session,
* Just in case though . . .
*/
ast_log ( LOG_WARNING , " WebSocket connection id not found \n " ) ;
ast_queue_control( instance - > channel , AST_CONTROL_HANGUP ) ;
ast_websocket_close ( ast_ws_session , 1000 ) ;
websocket_request_hangup( instance , AST_CAUSE_FAILURE , AST_WEBSOCKET_STATUS_INTERNAL_ERROR ) ;
ast_websocket_close ( ast_ws_session , AST_WEBSOCKET_STATUS_INTERNAL_ERROR ) ;
return ;
}
@ -1860,22 +1871,18 @@ static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session,
* Just in case though . . .
*/
ast_log ( LOG_WARNING , " %s: WebSocket instance not found \n " , connection_id ) ;
ast_queue_control( instance - > channel , AST_CONTROL_HANGUP ) ;
ast_websocket_close ( ast_ws_session , 1000 ) ;
websocket_request_hangup( instance , AST_CAUSE_FAILURE , AST_WEBSOCKET_STATUS_INTERNAL_ERROR ) ;
ast_websocket_close ( ast_ws_session , AST_WEBSOCKET_STATUS_INTERNAL_ERROR ) ;
return ;
}
instance - > websocket = ao2_bump ( ast_ws_session ) ;
if ( setsockopt ( ast_websocket_fd ( instance - > websocket ) ,
IPPROTO_TCP , TCP_NODELAY , ( char * ) & nodelay , sizeof ( nodelay ) ) < 0 ) {
ast_log ( LOG_WARNING , " Failed to set TCP_NODELAY on manager connection: %s \n " , strerror ( errno ) ) ;
}
/* read_thread_handler cleans up the bump */
read_thread_handler ( ao2_bump ( instance ) ) ;
websocket_handoff_to_channel ( instance ) ;
ao2_cleanup ( instance ) ;
ast_debug ( 3 , " WebSocket closed \n " ) ;
/*
* The instance is the channel ' s responsibility now .
* We just return here .
*/
}
/*!