TT#89352 support Janus control protocol

Change-Id: I0a6d5fc93c5fa505390408043feeca3f5cf61181
pull/1346/head
Richard Fuchs 5 years ago
parent a52c0fecf4
commit 2130e2f62b

@ -83,7 +83,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c
crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \
codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c t38.c websocket.c \
mqtt.c
mqtt.c janus.strhash.c
LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c
ifeq ($(with_transcoding),yes)
LIBSRCS+= codeclib.c resample.c

@ -48,6 +48,7 @@
#include "jitter_buffer.h"
#include "t38.h"
#include "mqtt.h"
#include "janus.h"
struct iterator_helper {
@ -77,7 +78,6 @@ unsigned int call_socket_cpu_affinity = 0;
/* ********** */
static void __monologue_destroy(struct call_monologue *monologue, int recurse);
static int monologue_destroy(struct call_monologue *ml);
static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start,
struct timeval *interval_duration);
static void __call_free(void *p);
@ -138,7 +138,7 @@ void call_make_own_foreign(struct call *c, bool foreign) {
static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
GList *it;
unsigned int check;
int good = 0;
bool good = false;
struct packet_stream *ps;
struct stream_fd *sfd;
int tmp_t_reason = UNKNOWN;
@ -178,8 +178,9 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
goto delete;
}
// conference: call can be created without participants added
if (!c->streams.head)
goto drop;
goto out;
// ignore media timeout if call was recently taken over
if (c->foreign_media && rtpe_now.tv_sec - c->last_signal <= rtpe_config.timeout)
@ -223,7 +224,7 @@ no_sfd:
}
if (rtpe_now.tv_sec - atomic64_get(timestamp) < check)
good = 1;
good = true;
next:
;
@ -250,7 +251,6 @@ next:
ilog(LOG_INFO, "Closing call due to timeout");
drop:
hlp->del_timeout = g_slist_prepend(hlp->del_timeout, obj_get(c));
goto out;
@ -3048,6 +3048,10 @@ static void __call_cleanup(struct call *c) {
}
recording_finish(c);
if (c->janus_session)
__obj_put((void *) c->janus_session);
c->janus_session = NULL;
}
/* called lock-free, but must hold a reference to the call */
@ -3642,7 +3646,7 @@ static void __monologue_destroy(struct call_monologue *monologue, int recurse) {
}
/* must be called with call->master_lock held in W */
static int monologue_destroy(struct call_monologue *ml) {
int monologue_destroy(struct call_monologue *ml) {
struct call *c = ml->call;
__monologue_destroy(ml, 1);

@ -306,7 +306,7 @@ static void streams_parse(const char *s, GQueue *q) {
i = 0;
pcre_multi_match(streams_re, streams_ree, s, 3, streams_parse_func, &i, q);
}
static void call_unlock_release(struct call **c) {
void call_unlock_release(struct call **c) {
if (!*c)
return;
rwlock_unlock_w(&(*c)->master_lock);
@ -948,17 +948,22 @@ static void call_ng_flags_flags(struct sdp_ng_flags *out, str *s, void *dummy) {
STR_FMT(s));
}
}
static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *input, enum call_opmode opmode) {
bencode_item_t *list, *it, *dict;
int diridx;
str s;
void call_ng_flags_init(struct sdp_ng_flags *out, enum call_opmode opmode) {
ZERO(*out);
out->opmode = opmode;
out->trust_address = trust_address_def;
out->dtls_passive = dtls_passive_def;
out->dtls_reverse_passive = dtls_passive_def;
}
static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *input, enum call_opmode opmode) {
bencode_item_t *list, *it, *dict;
int diridx;
str s;
call_ng_flags_init(out, opmode);
call_ng_flags_list(out, input, "flags", call_ng_flags_flags, NULL);
call_ng_flags_list(out, input, "replace", call_ng_flags_replace, NULL);
@ -1202,7 +1207,7 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu
}
}
}
static void call_ng_free_flags(struct sdp_ng_flags *flags) {
void call_ng_free_flags(struct sdp_ng_flags *flags) {
if (flags->codec_except)
g_hash_table_destroy(flags->codec_except);
if (flags->codec_set)
@ -1362,7 +1367,7 @@ static void fragments_cleanup(int all) {
}
static void save_last_sdp(struct call_monologue *ml, str *sdp, GQueue *parsed, GQueue *streams) {
void save_last_sdp(struct call_monologue *ml, str *sdp, GQueue *parsed, GQueue *streams) {
str_free_dup(&ml->last_in_sdp);
ml->last_in_sdp = *sdp;
*sdp = STR_NULL;

File diff suppressed because it is too large Load Diff

@ -54,6 +54,7 @@
#include "websocket.h"
#include "codec.h"
#include "mqtt.h"
#include "janus.h"
@ -560,6 +561,7 @@ static void options(int *argc, char ***argv) {
#ifdef SO_INCOMING_CPU
{ "socket-cpu-affinity",0,0,G_OPTION_ARG_INT, &rtpe_config.cpu_affinity,"CPU affinity for media sockets","INT"},
#endif
{ "janus-secret", 0,0, G_OPTION_ARG_STRING, &rtpe_config.janus_secret,"Admin secret for Janus protocol","STRING"},
{ NULL, }
};
@ -976,6 +978,7 @@ static void options_free(void) {
g_free(rtpe_config.mqtt_certfile);
g_free(rtpe_config.mqtt_keyfile);
g_free(rtpe_config.mqtt_publish_topic);
g_free(rtpe_config.janus_secret);
// free common config options
config_load_free(&rtpe_config.common);
@ -1024,6 +1027,7 @@ static void init_everything(void) {
if (rtpe_config.mqtt_host && mqtt_init())
abort();
codecs_init();
janus_init();
}
@ -1298,10 +1302,9 @@ int main(int argc, char **argv) {
redis_close(rtpe_redis_notify);
free_prefix();
options_free();
log_free();
janus_free();
obj_release(rtpe_cli);
obj_release(rtpe_udp);

@ -29,6 +29,7 @@
#include "jitter_buffer.h"
#include "dtmf.h"
#include "mqtt.h"
#include "janus.h"
#ifndef PORT_RANDOM_MIN
@ -2281,7 +2282,10 @@ static int stream_packet(struct packet_handler_ctx *phc) {
phc->mp.raw = phc->s;
// XXX separate stats for received/sent
atomic64_inc(&phc->mp.stream->stats.packets);
if (atomic64_inc(&phc->mp.stream->stats.packets) == 0) {
if (phc->mp.stream->component == 1 && phc->mp.media->index == 1)
janus_media_up(phc->mp.media->monologue);
}
atomic64_add(&phc->mp.stream->stats.bytes, phc->s.len);
atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec);
RTPE_STATS_INC(packets, 1);

@ -2023,9 +2023,19 @@ static int synth_session_connection(struct sdp_chopper *chop, struct sdp_media *
}
void sdp_chopper_destroy(struct sdp_chopper *chop) {
g_string_free(chop->output, TRUE);
if (chop->output)
g_string_free(chop->output, TRUE);
g_slice_free1(sizeof(*chop), chop);
}
void sdp_chopper_destroy_ret(struct sdp_chopper *chop, str *ret) {
*ret = STR_NULL;
if (chop->output) {
str_init_len(ret, chop->output->str, chop->output->len);
g_string_free(chop->output, FALSE);
chop->output = NULL;
}
sdp_chopper_destroy(chop);
}
static int process_session_attributes(struct sdp_chopper *chop, struct sdp_attributes *attrs,
struct sdp_ng_flags *flags)

@ -8,6 +8,7 @@
#include "cli.h"
#include "control_ng.h"
#include "statistics.h"
#include "janus.h"
struct websocket_message;
@ -32,6 +33,7 @@ struct websocket_conn {
unsigned int jobs;
GQueue messages;
cond_t cond;
GHashTable *janus_sessions;
// output buffer - also protected by lock
GQueue output_q;
@ -546,6 +548,8 @@ static int websocket_http_body(struct websocket_conn *wc, const char *body, size
if (!strcmp(uri, "/ng") && wm->method == M_POST && wm->content_type == CT_NG)
handler = websocket_http_ng;
else if (!strcmp(uri, "/admin") && wm->method == M_POST && wm->content_type == CT_JSON)
handler = websocket_janus_process;
if (!handler) {
ilogs(http, LOG_WARN, "Unhandled HTTP POST URI: '%s'", wm->uri);
@ -568,6 +572,20 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) {
mutex_lock(&wc->lock);
while (wc->jobs)
cond_wait(&wc->cond, &wc->lock);
// detach all Janus sessions
if (wc->janus_sessions) {
GHashTableIter iter;
g_hash_table_iter_init(&iter, wc->janus_sessions);
gpointer key;
while (g_hash_table_iter_next(&iter, &key, NULL)) {
janus_detach_websocket(key, wc);
__obj_put(key);
}
g_hash_table_destroy(wc->janus_sessions);
wc->janus_sessions = NULL;
}
mutex_unlock(&wc->lock);
assert(wc->messages.length == 0);
@ -593,6 +611,14 @@ static int websocket_conn_init(struct lws *wsi, void *p) {
if (!wc)
return -1;
memset(wc, 0, sizeof(*wc));
wc->wsi = wsi;
mutex_init(&wc->lock);
cond_init(&wc->cond);
g_queue_init(&wc->messages);
g_queue_push_tail(&wc->output_q, websocket_output_new());
wc->janus_sessions = g_hash_table_new(g_direct_hash, g_direct_equal);
struct sockaddr_storage sa = {0,};
socklen_t sl = sizeof(sa);
#if LWS_LIBRARY_VERSION_MAJOR >= 3
@ -633,6 +659,16 @@ static int websocket_conn_init(struct lws *wsi, void *p) {
}
void websocket_conn_add_session(struct websocket_conn *wc, struct janus_session *s) {
mutex_lock(&wc->lock);
if (wc->janus_sessions) {
assert(g_hash_table_lookup(wc->janus_sessions, s) == NULL);
g_hash_table_insert(wc->janus_sessions, s, s);
}
mutex_unlock(&wc->lock);
}
static int websocket_do_http(struct lws *wsi, struct websocket_conn *wc, const char *uri) {
ilogs(http, LOG_DEBUG, "HTTP request start: %s", uri);
@ -787,6 +823,11 @@ static int websocket_protocol(struct lws *wsi, enum lws_callback_reasons reason,
}
static int websocket_janus(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in,
size_t len)
{
return websocket_protocol(wsi, reason, user, in, len, websocket_janus_process, "janus-protocol");
}
static int websocket_rtpengine_echo(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in,
size_t len)
{
@ -810,6 +851,11 @@ static const struct lws_protocols websocket_protocols[] = {
.callback = websocket_http,
.per_session_data_size = sizeof(struct websocket_conn),
},
{
.name = "janus-protocol",
.callback = websocket_janus,
.per_session_data_size = sizeof(struct websocket_conn),
},
{
.name = "echo.rtpengine.com",
.callback = websocket_rtpengine_echo,

2
debian/control vendored

@ -40,6 +40,8 @@ Build-Depends:
libxmlrpc-core-c3-dev (>= 1.16.07),
libxtables-dev (>= 1.4) | iptables-dev (>= 1.4),
markdown,
python3,
python3-websockets,
zlib1g-dev,
Package: ngcp-rtpengine-daemon

@ -218,6 +218,7 @@ struct jitter_buffer;
struct codec_tracker;
struct rtcp_timer;
struct mqtt_timer;
struct janus_session;
typedef bencode_buffer_t call_buffer_t;
@ -491,6 +492,7 @@ struct call {
GQueue endpoint_maps;
struct dtls_cert *dtls_cert; /* for outgoing */
struct mqtt_timer *mqtt_timer;
struct janus_session *janus_session;
str callid;
struct timeval created;
@ -575,6 +577,7 @@ int monologue_subscribe_request(struct call_monologue *src, struct call_monologu
int monologue_subscribe_answer(struct call_monologue *src, struct call_monologue *dst, struct sdp_ng_flags *,
GQueue *);
int monologue_unsubscribe(struct call_monologue *src, struct call_monologue *dst, struct sdp_ng_flags *);
int monologue_destroy(struct call_monologue *ml);
int call_delete_branch(const str *callid, const str *branch,
const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay);
void call_destroy(struct call *);

@ -187,6 +187,11 @@ const char *call_subscribe_request_ng(bencode_item_t *, bencode_item_t *);
const char *call_subscribe_answer_ng(bencode_item_t *, bencode_item_t *);
const char *call_unsubscribe_ng(bencode_item_t *, bencode_item_t *);
void save_last_sdp(struct call_monologue *ml, str *sdp, GQueue *parsed, GQueue *streams);
void call_ng_flags_init(struct sdp_ng_flags *out, enum call_opmode opmode);
void call_ng_free_flags(struct sdp_ng_flags *flags);
void call_unlock_release(struct call **c);
int call_interfaces_init(void);
void call_interfaces_free(void);
void call_interfaces_timer(void);

@ -0,0 +1,18 @@
#ifndef __JANUS_H__
#define __JANUS_H__
struct websocket_conn;
struct websocket_message;
struct janus_session;
struct call_monologue;
void janus_init(void);
void janus_free(void);
const char *websocket_janus_process(struct websocket_message *wm);
void janus_detach_websocket(struct janus_session *session, struct websocket_conn *wc);
void janus_media_up(struct call_monologue *);
#endif

@ -148,6 +148,7 @@ struct rtpengine_config {
MOS_LQ,
} mos;
int cpu_affinity;
char *janus_secret;
};

@ -32,6 +32,7 @@ int sdp_parse_candidate(struct ice_candidate *cand, const str *s); // returns -1
struct sdp_chopper *sdp_chopper_new(str *input);
void sdp_chopper_destroy(struct sdp_chopper *chop);
void sdp_chopper_destroy_ret(struct sdp_chopper *chop, str *ret);
INLINE int is_trickle_ice_address(const struct endpoint *ep) {
if (is_addr_unspecified(&ep->address) && ep->port == 9)

@ -8,6 +8,7 @@
struct websocket_conn;
struct websocket_message;
enum lws_write_protocol;
struct janus_session;
typedef const char *(*websocket_message_func_t)(struct websocket_message *);
@ -52,4 +53,7 @@ size_t websocket_queue_len(struct websocket_conn *wc);
int websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length);
// mark a janus session as owned by this transport
void websocket_conn_add_session(struct websocket_conn *, struct janus_session *);
#endif

3
t/.gitignore vendored

@ -68,3 +68,6 @@ tcp_listener.c
test-kernel-module
test-resample
mqtt.c
cli.c
janus.c
websocket.c

@ -22,6 +22,7 @@ CFLAGS+= $(shell pkg-config --cflags spandsp)
CFLAGS+= -DWITH_TRANSCODING
CFLAGS+= $(shell pkg-config --cflags zlib)
CFLAGS+= $(shell pkg-config --cflags json-glib-1.0)
CFLAGS+= $(shell pkg-config --cflags libwebsockets)
CFLAGS+= $(shell pkg-config --cflags libevent_pthreads)
CFLAGS+= $(shell pkg-config xmlrpc_client --cflags 2> /dev/null || xmlrpc-c-config client --cflags)
CFLAGS+= $(shell pkg-config xmlrpc --cflags 2> /dev/null)
@ -49,6 +50,7 @@ LDLIBS+= $(shell pkg-config --libs libavfilter)
LDLIBS+= $(shell pkg-config --libs spandsp)
LDLIBS+= $(shell pkg-config --libs zlib)
LDLIBS+= $(shell pkg-config --libs json-glib-1.0)
LDLIBS+= $(shell pkg-config --libs libwebsockets)
LDLIBS+= -lpcap
LDLIBS+= $(shell pkg-config --libs libevent_pthreads)
LDLIBS+= $(shell pkg-config xmlrpc_client --libs 2> /dev/null || xmlrpc-c-config client --libs)
@ -74,8 +76,8 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c
DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
media_player.c jitter_buffer.c t38.c tcp_listener.c mqtt.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c
media_player.c jitter_buffer.c t38.c tcp_listener.c mqtt.c websocket.c cli.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c janus.c
endif
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) $(DAEMONSRCS:.c=.o) $(HASHSRCS:.c=.strhash.o)
@ -111,7 +113,7 @@ daemon-tests: tests-preload.so
$(MAKE) -C ../daemon
$(MAKE) all-daemon-tests
all-daemon-tests: daemon-tests-main daemon-tests-jb daemon-tests-pubsub
all-daemon-tests: daemon-tests-main daemon-tests-jb daemon-tests-pubsub daemon-tests-websocket
daemon-tests-main:
rm -rf fake-$@-sockets
@ -153,6 +155,14 @@ daemon-tests-pubsub:
test "$$(ls fake-$@-sockets)" = ""
rmdir fake-$@-sockets
daemon-tests-websocket:
rm -rf fake-$@-sockets
mkdir fake-$@-sockets
LD_PRELOAD=../t/tests-preload.so RTPE_BIN=../daemon/rtpengine TEST_SOCKET_PATH=./fake-$@-sockets \
python3 auto-daemon-tests-websocket.py
test "$$(ls fake-$@-sockets)" = ""
rmdir fake-$@-sockets
test-bitstr: test-bitstr.o
spandsp_send_fax_pcm: spandsp_send_fax_pcm.o
@ -180,7 +190,8 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr
rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \
control_ng.strhash.o \
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o websocket.o \
cli.o
test-resample: test-resample.o $(COMMONOBJS) codeclib.o resample.o dtmflib.o

File diff suppressed because it is too large Load Diff

@ -9,6 +9,7 @@ int _log_facility_rtcp;
int _log_facility_cdr;
int _log_facility_dtmf;
struct rtpengine_config rtpe_config;
struct rtpengine_config initial_rtpe_config;
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
GString *dtmf_logs;

Loading…
Cancel
Save