diff --git a/daemon/call.c b/daemon/call.c index 000afbba3..e117adfa2 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -176,6 +176,8 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) { && rtpe_now.tv_sec - c->last_signal <= atomic_get_na(&rtpe_config.timeout)) goto out; + ice_fragments_cleanup(c->sdp_fragments, false); + for (__auto_type it = c->streams.head; it; it = it->next) { ps = it->data; @@ -4040,7 +4042,6 @@ void __monologue_free(struct call_monologue *m) { free_ssrc_hash(&m->ssrc_hash); if (m->last_out_sdp) g_string_free(m->last_out_sdp, TRUE); - str_free_dup(&m->last_in_sdp); if (m->session_sdp_orig) sdp_orig_free(m->session_sdp_orig); if (m->session_last_sdp_orig) @@ -4101,6 +4102,8 @@ static void __call_free(void *p) { } call_buffer_free(&c->buffer); + ice_fragments_cleanup(c->sdp_fragments, true); + t_hash_table_destroy(c->sdp_fragments); rwlock_destroy(&c->master_lock); assert(c->stream_fds.head == NULL); @@ -4122,6 +4125,7 @@ static call_t *call_create(const str *callid) { c->dtls_cert = dtls_cert(); c->tos = rtpe_config.default_tos; c->poller = rtpe_get_poller(); + c->sdp_fragments = fragments_ht_new(); if (rtpe_config.cpu_affinity) c->cpu_affinity = call_socket_cpu_affinity++ % rtpe_config.cpu_affinity; else diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 547016e9c..6a294a0ac 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -2104,9 +2104,7 @@ static enum load_limit_reasons call_offer_session_limit(void) { void save_last_sdp(struct call_monologue *ml, str *sdp, sdp_sessions_q *parsed, sdp_streams_q *streams) { - str_free_dup(&ml->last_in_sdp); ml->last_in_sdp = *sdp; - *sdp = STR_NULL; sdp_sessions_clear(&ml->last_in_sdp_parsed); ml->last_in_sdp_parsed = *parsed; @@ -2131,11 +2129,34 @@ static enum basic_errors call_ng_basic_checks(sdp_ng_flags *flags, enum call_opm return 0; } +static const char *call_offer_get_call(call_t **callp, sdp_ng_flags *flags) { + // are we allowed to create a call? use `errstr` to determine + const char *errstr = NULL; // creation is allowed + enum load_limit_reasons limit = call_offer_session_limit(); + if (limit != LOAD_LIMIT_NONE) { + if (!flags->supports_load_limit) + errstr = "Parallel session limit reached"; // legacy protocol + else + errstr = magic_load_limit_strings[limit]; + // errstr is set, creation not allowed + } + + if (!errstr) + *callp = call_get_or_create(&flags->call_id, false); + else { + *callp = call_get(&flags->call_id); + if (!*callp) + return errstr; + } + + return NULL; +} + static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode opmode, const char* addr, const endpoint_t *sin) { const char *errstr; - g_auto(str) sdp = STR_NULL; + str sdp = STR_NULL; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; g_auto(sdp_streams_q) streams = TYPED_GQUEUE_INIT; g_autoptr(call_t) call = NULL; @@ -2151,11 +2172,23 @@ static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode if ((ret = call_ng_basic_checks(&flags, opmode)) > 0) return _ng_basic_errors[ret]; - /* for answer: swap To against From tag */ - if (opmode == OP_ANSWER) + if (opmode == OP_OFFER) { + errstr = call_offer_get_call(&call, &flags); + if (errstr) + goto out; + } + else if (opmode == OP_ANSWER) { + call = call_get(&flags.call_id); + + errstr = "Unknown call-id"; + if (!call) + goto out; + + /* for answer: swap To against From tag */ str_swap(&flags.to_tag, &flags.from_tag); + } - sdp = str_dup_str(&flags.sdp); + sdp = call_str_cpy(&flags.sdp); errstr = "Failed to parse SDP"; if (sdp_parse(&sdp, &parsed, &flags)) @@ -2168,37 +2201,17 @@ static const char *call_offer_answer_ng(ng_command_ctx_t *ctx, enum call_opmode goto out; } - /* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */ - call = call_get(&flags.call_id); + errstr = "Incomplete SDP specification"; + if (sdp_streams(&parsed, &streams, &flags)) + goto out; // SDP fragments for trickle ICE must always operate on an existing call - if (opmode == OP_OFFER && trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed)) { + if (opmode == OP_OFFER && trickle_ice_update(ctx->ngbuf, call, &flags, &streams)) { errstr = NULL; // SDP fragments for trickle ICE are consumed with no replacement returned goto out; } - if (opmode == OP_OFFER && !call) { - enum load_limit_reasons limit = call_offer_session_limit(); - if (limit != LOAD_LIMIT_NONE) { - if (!flags.supports_load_limit) - errstr = "Parallel session limit reached"; // legacy protocol - else - errstr = magic_load_limit_strings[limit]; - goto out; - } - - call = call_get_or_create(&flags.call_id, false); - } - - errstr = "Unknown call-id"; - if (!call) - goto out; - - errstr = "Incomplete SDP specification"; - if (sdp_streams(&parsed, &streams, &flags)) - goto out; - if (flags.debug) CALL_SET(call, DEBUG); @@ -3677,7 +3690,7 @@ const char *call_publish_ng(ng_command_ctx_t *ctx, g_auto(sdp_ng_flags) flags; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; g_auto(sdp_streams_q) streams = TYPED_GQUEUE_INIT; - g_auto(str) sdp_in = STR_NULL; + str sdp_in = STR_NULL; g_auto(str) sdp_out = STR_NULL; g_autoptr(call_t) call = NULL; int ret; @@ -3688,22 +3701,19 @@ const char *call_publish_ng(ng_command_ctx_t *ctx, if ((ret = call_ng_basic_checks(&flags, OP_PUBLISH)) > 0) return _ng_basic_errors[ret]; - sdp_in = str_dup_str(&flags.sdp); + call = call_get_or_create(&flags.call_id, false); + + sdp_in = call_str_cpy(&flags.sdp); if (sdp_parse(&sdp_in, &parsed, &flags)) return "Failed to parse SDP"; - call = call_get(&flags.call_id); - - if (trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed)) - return NULL; - - if (!call) - call = call_get_or_create(&flags.call_id, false); - if (sdp_streams(&parsed, &streams, &flags)) return "Incomplete SDP specification"; + if (trickle_ice_update(ctx->ngbuf, call, &flags, &streams)) + return NULL; + updated_created_from(call, addr, sin); struct call_monologue *ml = call_get_or_create_monologue(call, &flags.from_tag); @@ -3858,7 +3868,10 @@ const char *call_subscribe_answer_ng(ng_command_ctx_t *ctx) { if (sdp_parse(&flags.sdp, &parsed, &flags)) return "Failed to parse SDP"; - if (trickle_ice_update(ctx->ngbuf, call, &flags, NULL, &parsed)) + if (sdp_streams(&parsed, &streams, &flags)) + return "Incomplete SDP specification"; + + if (trickle_ice_update(ctx->ngbuf, call, &flags, &streams)) return NULL; if (!flags.to_tag.s) @@ -3871,9 +3884,6 @@ const char *call_subscribe_answer_ng(ng_command_ctx_t *ctx) { if (!dest_ml) return "To-tag not found"; - if (sdp_streams(&parsed, &streams, &flags)) - return "Incomplete SDP specification"; - int ret = monologue_subscribe_answer(dest_ml, &flags, &streams); if (ret) return "Failed to process subscription answer"; diff --git a/daemon/ice.c b/daemon/ice.c index 34739c98b..a546318a1 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -33,15 +33,10 @@ STR_FMT_M(&(p)->remote_candidate->foundation), \ (p)->remote_candidate->component_id -struct fragment_key { - str call_id; - str from_tag; -}; struct sdp_fragment { ng_buffer *ngbuf; struct timeval received; sdp_streams_q streams; - sdp_sessions_q sdp; sdp_ng_flags flags; }; @@ -88,35 +83,13 @@ const char * const ice_type_strings[] = { -static unsigned int frag_key_hash(const struct fragment_key *A); -static int frag_key_eq(const struct fragment_key *A, const struct fragment_key *B); -static void fragment_key_free(struct fragment_key *); - -TYPED_GQUEUE(fragment, struct sdp_fragment) -TYPED_GHASHTABLE(fragments_ht, struct fragment_key, fragment_q, frag_key_hash, frag_key_eq, - fragment_key_free, NULL) -TYPED_GHASHTABLE_LOOKUP_INSERT(fragments_ht, fragment_key_free, fragment_q_new) - -static mutex_t sdp_fragments_lock; -static fragments_ht sdp_fragments; +TYPED_GHASHTABLE_LOOKUP_INSERT(fragments_ht, NULL, fragment_q_new) -static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *streams, sdp_sessions_q *sdp, +static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *streams, sdp_ng_flags *flags) { - g_auto(sdp_streams_q) streams_local = TYPED_GQUEUE_INIT; - - if (!streams) - streams = &streams_local; - - if (!streams->head) { - if (sdp_streams(sdp, streams, flags)) { - ilogs(ice, LOG_WARN, "Incomplete SDP specification for tricle ICE"); - return; - } - } - for (__auto_type l = streams->head; l; l = l->next) { struct stream_params *sp = l->data; struct call_media *media = NULL; @@ -148,41 +121,19 @@ static void ice_update_media_streams(struct call_monologue *ml, sdp_streams_q *s } -static unsigned int frag_key_hash(const struct fragment_key *a) { - return str_hash(&a->call_id) ^ str_hash(&a->from_tag); -} -static int frag_key_eq(const struct fragment_key *a, const struct fragment_key *b) { - return str_equal(&a->call_id, &b->call_id) - && str_equal(&a->from_tag, &b->from_tag); -} - static void fragment_free(struct sdp_fragment *frag) { sdp_streams_clear(&frag->streams); - sdp_sessions_clear(&frag->sdp); call_ng_free_flags(&frag->flags); obj_put(frag->ngbuf); g_slice_free1(sizeof(*frag), frag); } -static void fragment_key_free(struct fragment_key *k) { - g_free(k->call_id.s); - g_free(k->from_tag.s); - g_slice_free1(sizeof(*k), k); -} -static void queue_sdp_fragment(ng_buffer *ngbuf, sdp_streams_q *streams, sdp_sessions_q *sdp, sdp_ng_flags *flags) { +static void queue_sdp_fragment(ng_buffer *ngbuf, call_t *call, str *key, sdp_streams_q *streams, sdp_ng_flags *flags) { ilog(LOG_DEBUG, "Queuing up SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, STR_FMT_M(&flags->call_id), STR_FMT_M(&flags->from_tag)); - struct fragment_key *k = g_slice_alloc0(sizeof(*k)); - k->call_id = str_dup_str(&flags->call_id); - k->from_tag = str_dup_str(&flags->from_tag); - struct sdp_fragment *frag = g_slice_alloc0(sizeof(*frag)); frag->received = rtpe_now; frag->ngbuf = obj_get(ngbuf); - if (sdp) { - frag->sdp = *sdp; - t_queue_init(sdp); - } if (streams) { frag->streams = *streams; t_queue_init(streams); @@ -190,48 +141,36 @@ static void queue_sdp_fragment(ng_buffer *ngbuf, sdp_streams_q *streams, sdp_ses frag->flags = *flags; ZERO(*flags); - mutex_lock(&sdp_fragments_lock); - fragment_q *frags = fragments_ht_lookup_insert(sdp_fragments, k); + fragment_q *frags = fragments_ht_lookup_insert(call->sdp_fragments, call_str_dup(key)); t_queue_push_tail(frags, frag); - mutex_unlock(&sdp_fragments_lock); } bool trickle_ice_update(ng_buffer *ngbuf, call_t *call, sdp_ng_flags *flags, - sdp_streams_q *streams, sdp_sessions_q *sdp) + sdp_streams_q *streams) { if (!flags->fragment) return false; - if (!call) { - queue_sdp_fragment(ngbuf, streams, sdp, flags); - return true; - } struct call_monologue *ml = call_get_monologue(call, &flags->from_tag); if (!ml) { - queue_sdp_fragment(ngbuf, streams, sdp, flags); + queue_sdp_fragment(ngbuf, call, &flags->from_tag, streams, flags); return true; } - ice_update_media_streams(ml, streams, sdp, flags); + ice_update_media_streams(ml, streams, flags); return true; } #define MAX_FRAG_AGE 3000000 void dequeue_sdp_fragments(struct call_monologue *monologue) { - struct fragment_key k; - ZERO(k); - k.call_id = monologue->call->callid; - k.from_tag = monologue->tag; + call_t *call = monologue->call; fragment_q *frags = NULL; - { - LOCK(&sdp_fragments_lock); - t_hash_table_steal_extended(sdp_fragments, &k, NULL, &frags); - if (!frags) - return; + t_hash_table_steal_extended(call->sdp_fragments, &monologue->tag, NULL, &frags); + if (!frags) + return; - // we own the queue now - } + // we own the queue now struct sdp_fragment *frag; while ((frag = t_queue_pop_head(frags))) { @@ -239,9 +178,9 @@ void dequeue_sdp_fragments(struct call_monologue *monologue) { goto next; ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M, - STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag)); + STR_FMT_M(&call->callid), STR_FMT_M(&monologue->tag)); - ice_update_media_streams(monologue, &frag->streams, &frag->sdp, &frag->flags); + ice_update_media_streams(monologue, &frag->streams, &frag->flags); next: fragment_free(frag); @@ -249,7 +188,7 @@ next: t_queue_free(frags); } -static gboolean fragment_check_cleanup(struct fragment_key *key, fragment_q *frags, void *p) { +static gboolean fragment_check_cleanup(str *key, fragment_q *frags, void *p) { bool all = GPOINTER_TO_INT(p); if (!key || !frags) return TRUE; @@ -266,10 +205,8 @@ static gboolean fragment_check_cleanup(struct fragment_key *key, fragment_q *fra } return FALSE; } -static void fragments_cleanup(bool all) { - mutex_lock(&sdp_fragments_lock); - t_hash_table_foreach_remove(sdp_fragments, fragment_check_cleanup, GINT_TO_POINTER(all)); - mutex_unlock(&sdp_fragments_lock); +void ice_fragments_cleanup(fragments_ht ht, bool all) { + t_hash_table_foreach_remove(ht, fragment_check_cleanup, GINT_TO_POINTER(all)); } @@ -764,22 +701,10 @@ static void __agent_deschedule(struct ice_agent *ag) { void ice_init(void) { random_string((void *) &tie_breaker, sizeof(tie_breaker)); timerthread_init(&ice_agents_timer_thread, 1, ice_agents_timer_run); - - sdp_fragments = fragments_ht_new(); - mutex_init(&sdp_fragments_lock); } void ice_free(void) { timerthread_free(&ice_agents_timer_thread); - - fragments_cleanup(true); - t_hash_table_destroy_ptr(&sdp_fragments); - mutex_destroy(&sdp_fragments_lock); -} - -enum thread_looper_action ice_slow_timer(void) { - fragments_cleanup(false); - return TLA_CONTINUE; } static void __fail_pair(struct ice_candidate_pair *pair) { diff --git a/daemon/janus.c b/daemon/janus.c index 1d78d8d02..0d838839f 100644 --- a/daemon/janus.c +++ b/daemon/janus.c @@ -851,7 +851,7 @@ static const char *janus_videoroom_configure(struct websocket_message *wm, struc if (strcmp(jsep_type, "offer")) return "Not an offer"; - g_auto(str) sdp_in = STR_DUP(jsep_sdp); + str sdp_in = call_str_cpy_c(jsep_sdp); g_auto(sdp_ng_flags) flags; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; @@ -956,7 +956,15 @@ static const char *janus_videoroom_start(struct websocket_message *wm, struct ja if (strcmp(jsep_type, "answer")) return "Not an answer"; - g_auto(str) sdp_in = STR_DUP(jsep_sdp); + struct janus_room *room = t_hash_table_lookup(janus_rooms, &room_id); + *retcode = 426; + if (!room) + return "No such room"; + g_autoptr(call_t) call = call_get(&room->call_id); + if (!call) + return "No such room"; + + str sdp_in = call_str_cpy_c(jsep_sdp); g_auto(sdp_ng_flags) flags; g_auto(sdp_sessions_q) parsed = TYPED_GQUEUE_INIT; @@ -966,14 +974,6 @@ static const char *janus_videoroom_start(struct websocket_message *wm, struct ja if (sdp_parse(&sdp_in, &parsed, &flags)) return "Failed to parse SDP"; - struct janus_room *room = t_hash_table_lookup(janus_rooms, &room_id); - *retcode = 426; - if (!room) - return "No such room"; - g_autoptr(call_t) call = call_get(&room->call_id); - if (!call) - return "No such room"; - *retcode = 512; if (sdp_streams(&parsed, &streams, &flags)) return "Incomplete SDP specification"; @@ -1677,7 +1677,7 @@ static const char *janus_trickle(JsonReader *reader, struct janus_session *sessi bencode_strdup_str(&ngbuf->buffer, &sp->ice_ufrag, ufrag); // finally do the update - trickle_ice_update(ngbuf, call, &flags, &streams, NULL); + trickle_ice_update(ngbuf, call, &flags, &streams); return NULL; } diff --git a/daemon/main.c b/daemon/main.c index bc1f2de7a..b616790c0 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1521,10 +1521,6 @@ int main(int argc, char **argv) { thread_create_looper(call_rate_stats_updater, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "call stats", 1000000); - /* separate thread for ice slow timer functionality */ - thread_create_looper(ice_slow_timer, rtpe_config.idle_scheduling, - rtpe_config.idle_priority, "ICE slow", 1000000); - /* thread to close expired call */ thread_create_looper(call_timer, rtpe_config.idle_scheduling, rtpe_config.idle_priority, "kill calls", 1000000); diff --git a/include/call.h b/include/call.h index 65c40e0b1..0e5b6e789 100644 --- a/include/call.h +++ b/include/call.h @@ -634,6 +634,11 @@ TYPED_GQUEUE(monologues, struct call_monologue) G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(monologues_q, monologues_q_clear) TYPED_GHASHTABLE(tags_ht, str, struct call_monologue, str_hash, str_equal, NULL, NULL) +struct sdp_fragment; +TYPED_GQUEUE(fragment, struct sdp_fragment) +TYPED_GHASHTABLE(fragments_ht, str, fragment_q, str_hash, str_equal, NULL, NULL) + + struct call_iterator_list { call_list *first; mutex_t lock; // protects .first and every entry's .data @@ -738,6 +743,7 @@ struct call { tags_ht tags; tags_ht viabranches; labels_ht labels; + fragments_ht sdp_fragments; packet_stream_q streams; stream_fd_q stream_fds; /* stream_fd */ endpoint_map_q endpoint_maps; diff --git a/include/ice.h b/include/ice.h index c2e3a4fda..4903ca8d6 100644 --- a/include/ice.h +++ b/include/ice.h @@ -14,6 +14,7 @@ #include "socket.h" #include "timerthread.h" #include "types.h" +#include "call.h" #define MAX_COMPONENTS 2 #define TIMER_RUN_INTERVAL 20 /* ms */ @@ -167,9 +168,8 @@ int ice_response(stream_fd *, const endpoint_t *src, void dequeue_sdp_fragments(struct call_monologue *); bool trickle_ice_update(ng_buffer *ngbuf, call_t *call, sdp_ng_flags *flags, - sdp_streams_q *streams, sdp_sessions_q *sdp); - -enum thread_looper_action ice_slow_timer(void); + sdp_streams_q *streams); +void ice_fragments_cleanup(fragments_ht ht, bool all); #include "call.h" diff --git a/t/test-stats.c b/t/test-stats.c index 1f3cb6121..e9b05f2a8 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -3240,7 +3240,6 @@ int main(void) { call_timer(); stats_counters_calc_rate(rtpe_stats, 150000000, &rtpe_stats_intv, &rtpe_stats_rate); stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate); - ice_slow_timer(); RTPE_STATS_ADD(ng_commands[NGC_OFFER], 100); rtpe_now.tv_sec += 2; @@ -3249,7 +3248,6 @@ int main(void) { call_timer(); stats_counters_calc_rate(rtpe_stats, 2000000, &rtpe_stats_intv, &rtpe_stats_rate); stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate); - ice_slow_timer(); // timer run time interval increased rtpe_now.tv_sec += 5; @@ -3258,7 +3256,6 @@ int main(void) { call_timer(); stats_counters_calc_rate(rtpe_stats, 5000000, &rtpe_stats_intv, &rtpe_stats_rate); stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate); - ice_slow_timer(); graph_str = print_graphite_data(); assert_g_string_eq(graph_str,