diff --git a/daemon/call.c b/daemon/call.c index bbe7cdd31..65508eba0 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -3314,7 +3314,7 @@ static void __unsubscribe_media_link(struct call_media * which, subscription_lis /** * Unsubscribe one particular media subscriber from this call media. */ -static bool __unsubscribe_media(struct call_media * which, struct call_media * from) +bool __unsubscribe_media(struct call_media * which, struct call_media * from) { if (!t_hash_table_is_set(which->media_subscriptions_ht) || !t_hash_table_is_set(from->media_subscribers_ht)) @@ -5401,7 +5401,7 @@ struct call_media *call_make_transform_media(struct call_monologue *ml, const st generate_mid(ret, ret->unique_id); ret->protocol = &transport_protocols[PROTO_RTP_AVP]; - bf_set(&ret->media_flags, MEDIA_FLAG_SEND | MEDIA_FLAG_RECV); + bf_set(&ret->media_flags, MEDIA_FLAG_SEND | MEDIA_FLAG_RECV | MEDIA_FLAG_RTCP_MUX); ret->desired_family = remote->address.family; __init_interface(ret, interface, 1); diff --git a/daemon/codec.c b/daemon/codec.c index 834f7e6d8..d404ba357 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -24,6 +24,7 @@ #include "fix_frame_channel_layout.h" #endif #include "bufferpool.h" +#include "ng_client.h" struct codec_timer { struct timerthread_obj tt_obj; @@ -261,6 +262,19 @@ struct rtcp_timer { struct call_media *media; }; +struct transform_handler { + struct obj obj; + call_t *call; + struct call_media *source_media; + struct call_monologue *transform_ml; + struct call_media *transform_media; // points to the remote rtpengine + const struct transcode_config *tcc; + struct codec_handler *handler; + endpoint_t remote; + str call_id; + str tag; +}; + static mutex_t transcode_lock = MUTEX_STATIC_INIT; @@ -304,6 +318,8 @@ static struct ssrc_entry *__ssrc_handler_decode_new(void *p); static struct ssrc_entry *__ssrc_handler_new(void *p); static void __ssrc_handler_stop(void *p, void *dummy); static void __free_ssrc_handler(struct codec_ssrc_handler *); +static struct codec_handler *__get_pt_handler(struct call_media *receiver, rtp_payload_type *pt, + struct call_media *sink); static void __transcode_packet_free(struct transcode_packet *); @@ -359,7 +375,45 @@ static struct codec_handler codec_handler_stub_blackhole = { +static void __transform_handler_shutdown(struct transform_handler *tfh) { + if (!tfh) + return; + if (!tfh->call) + return; + obj_release(tfh->call); + __unsubscribe_media(tfh->transform_media, tfh->source_media); +} + + +#define BENC_GS_APPEND(s) g_string_append_printf(req, "%zu:" STR_FORMAT, (s)->len, STR_FMT(s)) + +static void __transform_handler_destroy(struct transform_handler *tfh) { + if (!tfh) + return; + __transform_handler_shutdown(tfh); + + __auto_type tcc = tfh->tcc; + if (tcc && tfh->call_id.len) { + // manually construct the request + g_autoptr(GString) req = g_string_new("d7:command6:delete7:call-id"); + BENC_GS_APPEND(&tfh->call_id); + g_string_append(req, "8:from-tag"); + BENC_GS_APPEND(&tfh->tag); + g_string_append(req, "12:delete delayi0ee"); + + str result; + __auto_type ret = ng_client_request(&tcc->transform, &STR_LEN(req->str, req->len), memory_arena); + if (!ret || !bencode_dictionary_get_str(ret, "result", &result) || str_cmp(&result, "ok")) + ilog(LOG_WARN, "Failed to delete remote 'transform' session"); + } +} + static void __handler_shutdown(struct codec_handler *handler) { + ilogs(codec, LOG_DEBUG, "Shutting down codec handler for " STR_FORMAT, + STR_FMT(&handler->source_pt.encoding_with_full_params)); + __transform_handler_shutdown(handler->transform); + handler->tcc = NULL; + obj_release(handler->transform); ssrc_hash_foreach(handler->ssrc_hash, __ssrc_handler_stop, NULL); free_ssrc_hash(&handler->ssrc_hash); if (handler->delay_buffer) { @@ -499,6 +553,187 @@ static int handler_func_blackhole(struct codec_handler *h, struct media_packet * return 0; } +static void __transform_subscriptions(struct codec_handler *handler) { + __auto_type tfh = handler->transform; + + // subscribe source to destination: send media to the remote + __add_media_subscription(tfh->transform_media, tfh->source_media, NULL); + + // subscribe destination to output: forward received media to output + __add_media_subscription(handler->i.sink, tfh->transform_media, NULL); +} + +static void append_pt(GString *req, const rtp_payload_type *pt) { + BENC_GS_APPEND(&pt->encoding); + g_string_append(req, "12:payload type"); + g_string_append_printf(req, "i%ie", pt->payload_type); + g_string_append(req, "10:clock rate"); + g_string_append_printf(req, "i%ue", pt->clock_rate); + g_string_append(req, "8:channels"); + g_string_append_printf(req, "i%ue", pt->channels); + g_string_append(req, "6:format"); + BENC_GS_APPEND(&pt->format_parameters); + g_string_append(req, "7:options"); + BENC_GS_APPEND(&pt->codec_opts); +} + +// returns NULL if transform handler was successfully set up +// returns empty string "" if no transform handler is requested +// returns error string on error +static const char *__make_transform_handler(struct codec_handler *handler) { + __auto_type tcc = handler->tcc; + if (!tcc) + return ""; + if (!tcc->transform.address.family) + return ""; + + __auto_type tfh = handler->transform; + __auto_type media = handler->media; + __auto_type call = media->call; + + if (tfh) { + if (handler->transform->tcc != tcc) { + ilogs(codec, LOG_DEBUG, "Transform handler mismatched, resetting"); + obj_release(handler->transform); + tfh = handler->transform = NULL; + } + else { + ilogs(codec, LOG_DEBUG, "Leaving existing transform handler intact"); + // restore reference in case it has been released + obj_release(tfh->call); + tfh->call = obj_get(call); + __transform_subscriptions(handler); + return NULL; + } + } + + if (!handler->ssrc_hash) + handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, handler); + + if (!tfh) { + ilogs(codec, LOG_DEBUG, "Creating new transform handler"); + + tfh = handler->transform = obj_alloc0(struct transform_handler, __transform_handler_destroy); + tfh->call = obj_get(call); + tfh->source_media = media; + tfh->handler = handler; + tfh->tcc = tcc; + + // create dedicated monologue and dedicated call_media to send media to the + // remote rtpengine and forward received media to its designated destination + tfh->transform_ml = call_get_or_create_monologue(call, STR_PTR("transform handler")); + + tfh->transform_media = call_make_transform_media(tfh->transform_ml, &media->type, media->type_id, + &STR_NULL, &tcc->transform, &tcc->local_interface); + + codec_store_add_raw(&tfh->transform_media->codecs, rtp_payload_type_dup(&handler->dest_pt)); + + __transform_subscriptions(handler); + } + + // manually construct the request + g_autoptr(GString) req = g_string_new("d7:command9:transform5:mediald4:type"); + BENC_GS_APPEND(&media->type); + + g_string_append(req, "5:codecld5:inputd5:codec"); + append_pt(req, &handler->source_pt); + + g_string_append(req, "e6:outputd5:codec"); + append_pt(req, &handler->dest_pt); + + __auto_type ps = tfh->transform_media->streams.head->data; + __auto_type sock = &ps->selected_sfd->socket; + + g_string_append(req, "eee11:destinationd"); + g_string_append(req, "6:family"); + BENC_GS_APPEND(STR_PTR(sock->local.address.family->rfc_name)); + g_string_append(req, "7:address"); + BENC_GS_APPEND(STR_PTR(sockaddr_print_buf(&sock->local.address))); + g_string_append_printf(req, "4:porti%ue", sock->local.port); + + g_string_append(req, "eee"); + + if (tcc->remote_interface.len) { + g_string_append(req, "9:interface"); + BENC_GS_APPEND(&tcc->remote_interface); + } + + g_string_append(req, "8:instance"); + BENC_GS_APPEND(&rtpe_instance_id); + + g_string_append(req, "e"); + + // send out request + __auto_type ret = ng_client_request(&tcc->transform, &STR_LEN(req->str, req->len), memory_arena); + if (!ret) + return "'transform' request to remote peer failed"; + + // process response + str result; + if (!bencode_dictionary_get_str(ret, "result", &result)) + return "'transform' response didn't contain 'result'"; + if (str_cmp(&result, "ok")) + return "'transform' response didn't indicate success"; + + str s; + + if (!bencode_dictionary_get_str(ret, "call-id", &s)) + return "'transform' response didn't contain 'call-id'"; + tfh->call_id = call_str_cpy(&s); + + if (!bencode_dictionary_get_str(ret, "from-tag", &s)) + return "'transform' response didn't contain 'from-tag'"; + tfh->tag = call_str_cpy(&s); + + __auto_type remote_media = bencode_dictionary_get_expect(ret, "media", BENCODE_LIST); + if (!remote_media) + return "'transform' response didn't contain 'media'"; + if (!remote_media->child) + return "'transform' response contained empty 'media'"; + + __auto_type rm = remote_media->child; // just a single entry for now + + if (rm->type != BENCODE_DICTIONARY) + return "incorrect type contained in 'media'"; + str family, address; + int port = bencode_dictionary_get_integer(rm, "port", 0); + if (port <= 0 || port > 65535) + return "invalid port"; + if (!bencode_dictionary_get_str(rm, "family", &family)) + return "'transform' response media didn't contain 'family'"; + if (!bencode_dictionary_get_str(rm, "address", &address)) + return "'transform' response media didn't contain 'address'"; + __auto_type fam = get_socket_family_rfc(&family); + if (!fam) + return "'transform' response media contained invalid 'family'"; + if (!sockaddr_parse_str(&tfh->remote.address, fam, &address)) + return "'transform' response media contained invalid 'address'"; + tfh->remote.port = port; + + ps = tfh->transform_media->streams.head->data; + ps->advertised_endpoint = ps->endpoint = tfh->remote; + PS_SET(ps, FILLED); + + // don't send the affected PT to the destination, and make sure + // to only send the affected PT to the transform remote + handler->handler_func = handler_func_blackhole; + handler->kernelize = true; + handler->blackhole = true; + + MEDIA_SET(tfh->transform_media, SELECT_PT); + __auto_type tfm_handler = __get_pt_handler(handler->media, &handler->source_pt, tfh->transform_media); + __make_passthrough(tfm_handler, -1, -1); + + handler->stats_chain_suffix = " (TFM)"; + handler->stats_chain_suffix_brief = "_tfm"; + + __handler_stats_entry(handler); + + return NULL; +} + +#undef BENC_GS_APPEND + static void __reset_sequencer(void *p, void *dummy) { struct ssrc_entry_call *s = p; if (s->sequencers) @@ -559,6 +794,15 @@ reset: if (handler->source_pt.codec_def && handler->source_pt.codec_def->dtmf) handler->handler_func = handler_func_dtmf; + handler->tcc = t_hash_table_lookup(rtpe_transcode_config, &handler->pi); + const char *err = __make_transform_handler(handler); + if (!err) + return true; // delegated to transcode handler + if (err[0] != '\0') + ilogs(codec, LOG_ERR, "Failed to set up transform handler: %s", err); + + handler->ssrc_hash = create_ssrc_hash_full(ssrc_handler_new_func, handler); + ilogs(codec, LOG_DEBUG, "Created transcode context for " STR_FORMAT "/" STR_FORMAT " (%i) -> " STR_FORMAT "/" STR_FORMAT " (%i) with DTMF output %i and CN output %i", STR_FMT(&handler->source_pt.encoding_with_params), @@ -569,8 +813,6 @@ reset: dest->payload_type, dtmf_payload_type, cn_payload_type); - handler->ssrc_hash = create_ssrc_hash_full(ssrc_handler_new_func, handler); - if (handler->ssrc_hash->precreat && ((struct codec_ssrc_handler *) handler->ssrc_hash->precreat)->chain) { @@ -585,6 +827,10 @@ reset: no_handler_reset: __delay_buffer_setup(&handler->delay_buffer, handler, handler->media->call, handler->media->buffer_delay); __dtx_restart(handler); + + // double-check transform handler in case this comes after a no-op reset (shutdown + no_handler_reset) + __make_transform_handler(handler); + // check if we have multiple decoders transcoding to the same output PT struct codec_handler *output_handler = NULL; if (output_transcoders) @@ -1178,6 +1424,7 @@ void __codec_handlers_update(struct call_media *receiver, struct call_media *sin __generator_stop(receiver); __generator_stop(sink); codec_handlers_stop(&receiver->codec_handlers_store, sink); + // XXX this can cause unnecessary shutdown/resets of codec handlers bool is_transcoding = false; receiver->rtcp_handler = NULL; @@ -1664,6 +1911,7 @@ bool codec_handler_transform(struct call_media *receiver, ng_codecs_q *q) { #ifdef WITH_TRANSCODING if (!codec_def_supported(input->codec_def) || !codec_def_supported(output->codec_def)) return false; + codec_store_add_raw(&receiver->codecs, rtp_payload_type_dup(input)); __make_transcoder(handler, output, NULL, -1, false, -1); #else return false; @@ -3788,6 +4036,7 @@ void codec_handlers_stop(codec_handlers_q *q, struct call_media *sink) { delay_buffer_stop(&h->delay_buffer); } ssrc_hash_foreach(h->ssrc_hash, __ssrc_handler_stop, NULL); + __transform_handler_shutdown(h->transform); } } diff --git a/daemon/main.c b/daemon/main.c index dbb33d2ad..528055dd5 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -438,7 +438,20 @@ static void do_transcode_config(const char *name, charp_ht ht, struct transcode_ if (!codec_parse_payload_type(&tc->i.dst, STR_PTR(tc->dst))) die("Failed to parse source codec '%s' in transcode config '%s'", src, name); - die("Transcode config '%s' has no verdict", name); + char *tfm = t_hash_table_lookup(ht, "transform"); + if (tfm) { + if (!endpoint_parse_any_getaddrinfo_full(&tc->transform, tfm)) + die("Failed to parse transform endpoint '%s' in transcode config '%s'", tfm, name); + char *iface = t_hash_table_lookup(ht, "local-interface"); + if (iface) + tc->local_interface = STR(iface); + iface = t_hash_table_lookup(ht, "remote-interface"); + if (iface) + tc->remote_interface = STR(iface); + return; + } + else + die("Transcode config '%s' has no verdict", name); } static bool if_addr_parse(intf_config_q *q, char *s, struct ifaddrs *ifas) { diff --git a/docs/rtpengine.md b/docs/rtpengine.md index a533fc06f..5dbe10b78 100644 --- a/docs/rtpengine.md +++ b/docs/rtpengine.md @@ -1848,7 +1848,24 @@ For example, if __source=PCMU__ and __destination=PCMA__ are set, then this particular transcoding config section would be considered if the received codec is PCMU and the requested output codec is PCMA. -No verdicts are supported at this time. +### __transform__ Verdict + +To use the __transform__ verdict, the config key __transform=...__ must be set. +The value is a remote NG peer endpoint, such as the NG control port of another +*rtpengine* instance, in the usual *address:port* notation. This tells +*rtpengine* to send a __transform__ NG request to the remote peer, to request +remotely transcoding the media. If successful, *rtpengine* will forward the +unmodified media stream to the remote peer, to the address and port received +from the NG response, and will expect the transcoded media to be sent back in +return, which it will then forward to its appropriate destination. In other +words, it facilitates a designated remote transcoding node, instead of doing +the transcoding locally. + +Optional config keys that can be set are __local-interface=...__ to select the +local interface to use to communicate with the remote transform gateway, +__remote-interface=...__ to tell the remote gateway to use a particular +interface on its side, and __address-family=...__ to select between IPv4 and +IPv6. ## EXIT STATUS diff --git a/etc/rtpengine.conf b/etc/rtpengine.conf index a960cda28..914bc264e 100644 --- a/etc/rtpengine.conf +++ b/etc/rtpengine.conf @@ -175,6 +175,24 @@ recording-method = proc # preload-db-cache = 1; 2; 3; 4; on-demand # cache-media-reload = 3600 +transcode-config = transcode + +[transcode-1] +source = opus +destination = PCMA +transform = localhost:2228 +# local-interface = ... +# remote-interface = ... +# address-family = ... + +[transcode-2] +source = PCMA +destination = opus +transform = localhost:2228 +# local-interface = ... +# remote-interface = ... +# address-family = ... + # signalling templates (see key `templates` above) [templates] WebRTC = transport-protocol=UDP/TLS/RTP/SAVPF ICE=force trickle-ICE rtcp-mux=[offer require] no-rtcp-attribute SDES=off generate-mid diff --git a/include/call.h b/include/call.h index a010e37e9..822d90f7f 100644 --- a/include/call.h +++ b/include/call.h @@ -831,6 +831,7 @@ struct packet_stream *__packet_stream_new(call_t *call); __attribute__((nonnull(1, 2))) struct media_subscription *__add_media_subscription(struct call_media * which, struct call_media * to, const struct sink_attrs *attrs); +bool __unsubscribe_media(struct call_media * which, struct call_media * from); struct media_subscription *call_ml_get_top_ms(struct call_monologue *ml); bool call_ml_sendonly_inactive(struct call_monologue *ml); struct media_subscription *call_media_get_top_ms(struct call_media * cm); @@ -854,6 +855,9 @@ int call_get_mono_dialogue(struct call_monologue *monologues[2], call_t *call, sdp_ng_flags *); struct call_monologue *call_get_monologue(call_t *call, const str *fromtag); struct call_monologue *call_get_or_create_monologue(call_t *call, const str *fromtag); +__attribute__((nonnull(1, 2, 4, 5, 6))) +struct call_media *call_make_transform_media(struct call_monologue *ml, const str *type, enum media_type type_id, + const str *media_id, const endpoint_t *remote, const str *interface); __attribute__((nonnull(1))) call_t *call_get(const str *callid); typedef enum { CG2_OK, CG2_NF1, CG2_NF2, CG2_SAME } call_get2_ret_t; diff --git a/include/codec.h b/include/codec.h index 3ce297631..8e78d8c3b 100644 --- a/include/codec.h +++ b/include/codec.h @@ -76,6 +76,9 @@ struct codec_handler { // for DTMF injection struct codec_handler *dtmf_injector; struct delay_buffer *delay_buffer; + // for remote transcoding + struct transcode_config *tcc; + struct transform_handler *transform; // stats entry const char *stats_chain_suffix; @@ -111,6 +114,12 @@ struct transcode_config { char *name; // of the config section struct codec_pipeline_index i; // parsed + + // verdict + endpoint_t transform; + str local_interface; + str remote_interface; + //const sockfamily_t *address_family; }; typedef union { diff --git a/t/Makefile b/t/Makefile index 3586dd726..79bd45edc 100644 --- a/t/Makefile +++ b/t/Makefile @@ -87,7 +87,7 @@ DAEMONSRCS+= control_ng_flags_parser.c codec.c call.c ice.c kernel.c media_socke dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ media_player.c jitter_buffer.c t38.c tcp_listener.c mqtt.c websocket.c \ - audio_player.c arena.c + audio_player.c arena.c ng_client.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c janus.c LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S endif @@ -285,7 +285,7 @@ test-stats: test-stats.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssr streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o \ websocket.o cli.strhash.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \ - mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o + mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o ng_client.o test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssrc.o call.o ice.o helpers.o \ kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \ @@ -294,7 +294,7 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o cod streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o websocket.o \ cli.strhash.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \ - mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o + mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o ng_client.o test-resample: test-resample.o $(COMMONOBJS) codeclib.strhash.o resample.o dtmflib.o mvr2s_x64_avx2.o \ mvr2s_x64_avx512.o bencode.o