MT#61625 add "transform" transcode handler

Change-Id: I9b9ef39859f196a9c0ea8a7aee657311a70a5d03
pull/1918/head
Richard Fuchs 2 months ago
parent 6b5fac9dc7
commit 7dbcb578ab

@ -3314,7 +3314,7 @@ static void __unsubscribe_media_link(struct call_media * which, subscription_lis
/**
* Unsubscribe one particular media subscriber from this call media.
*/
static bool __unsubscribe_media(struct call_media * which, struct call_media * from)
bool __unsubscribe_media(struct call_media * which, struct call_media * from)
{
if (!t_hash_table_is_set(which->media_subscriptions_ht)
|| !t_hash_table_is_set(from->media_subscribers_ht))
@ -5401,7 +5401,7 @@ struct call_media *call_make_transform_media(struct call_monologue *ml, const st
generate_mid(ret, ret->unique_id);
ret->protocol = &transport_protocols[PROTO_RTP_AVP];
bf_set(&ret->media_flags, MEDIA_FLAG_SEND | MEDIA_FLAG_RECV);
bf_set(&ret->media_flags, MEDIA_FLAG_SEND | MEDIA_FLAG_RECV | MEDIA_FLAG_RTCP_MUX);
ret->desired_family = remote->address.family;
__init_interface(ret, interface, 1);

@ -24,6 +24,7 @@
#include "fix_frame_channel_layout.h"
#endif
#include "bufferpool.h"
#include "ng_client.h"
struct codec_timer {
struct timerthread_obj tt_obj;
@ -261,6 +262,19 @@ struct rtcp_timer {
struct call_media *media;
};
struct transform_handler {
struct obj obj;
call_t *call;
struct call_media *source_media;
struct call_monologue *transform_ml;
struct call_media *transform_media; // points to the remote rtpengine
const struct transcode_config *tcc;
struct codec_handler *handler;
endpoint_t remote;
str call_id;
str tag;
};
static mutex_t transcode_lock = MUTEX_STATIC_INIT;
@ -304,6 +318,8 @@ static struct ssrc_entry *__ssrc_handler_decode_new(void *p);
static struct ssrc_entry *__ssrc_handler_new(void *p);
static void __ssrc_handler_stop(void *p, void *dummy);
static void __free_ssrc_handler(struct codec_ssrc_handler *);
static struct codec_handler *__get_pt_handler(struct call_media *receiver, rtp_payload_type *pt,
struct call_media *sink);
static void __transcode_packet_free(struct transcode_packet *);
@ -359,7 +375,45 @@ static struct codec_handler codec_handler_stub_blackhole = {
static void __transform_handler_shutdown(struct transform_handler *tfh) {
if (!tfh)
return;
if (!tfh->call)
return;
obj_release(tfh->call);
__unsubscribe_media(tfh->transform_media, tfh->source_media);
}
#define BENC_GS_APPEND(s) g_string_append_printf(req, "%zu:" STR_FORMAT, (s)->len, STR_FMT(s))
static void __transform_handler_destroy(struct transform_handler *tfh) {
if (!tfh)
return;
__transform_handler_shutdown(tfh);
__auto_type tcc = tfh->tcc;
if (tcc && tfh->call_id.len) {
// manually construct the request
g_autoptr(GString) req = g_string_new("d7:command6:delete7:call-id");
BENC_GS_APPEND(&tfh->call_id);
g_string_append(req, "8:from-tag");
BENC_GS_APPEND(&tfh->tag);
g_string_append(req, "12:delete delayi0ee");
str result;
__auto_type ret = ng_client_request(&tcc->transform, &STR_LEN(req->str, req->len), memory_arena);
if (!ret || !bencode_dictionary_get_str(ret, "result", &result) || str_cmp(&result, "ok"))
ilog(LOG_WARN, "Failed to delete remote 'transform' session");
}
}
static void __handler_shutdown(struct codec_handler *handler) {
ilogs(codec, LOG_DEBUG, "Shutting down codec handler for " STR_FORMAT,
STR_FMT(&handler->source_pt.encoding_with_full_params));
__transform_handler_shutdown(handler->transform);
handler->tcc = NULL;
obj_release(handler->transform);
ssrc_hash_foreach(handler->ssrc_hash, __ssrc_handler_stop, NULL);
free_ssrc_hash(&handler->ssrc_hash);
if (handler->delay_buffer) {
@ -499,6 +553,187 @@ static int handler_func_blackhole(struct codec_handler *h, struct media_packet *
return 0;
}
static void __transform_subscriptions(struct codec_handler *handler) {
__auto_type tfh = handler->transform;
// subscribe source to destination: send media to the remote
__add_media_subscription(tfh->transform_media, tfh->source_media, NULL);
// subscribe destination to output: forward received media to output
__add_media_subscription(handler->i.sink, tfh->transform_media, NULL);
}
static void append_pt(GString *req, const rtp_payload_type *pt) {
BENC_GS_APPEND(&pt->encoding);
g_string_append(req, "12:payload type");
g_string_append_printf(req, "i%ie", pt->payload_type);
g_string_append(req, "10:clock rate");
g_string_append_printf(req, "i%ue", pt->clock_rate);
g_string_append(req, "8:channels");
g_string_append_printf(req, "i%ue", pt->channels);
g_string_append(req, "6:format");
BENC_GS_APPEND(&pt->format_parameters);
g_string_append(req, "7:options");
BENC_GS_APPEND(&pt->codec_opts);
}
// returns NULL if transform handler was successfully set up
// returns empty string "" if no transform handler is requested
// returns error string on error
static const char *__make_transform_handler(struct codec_handler *handler) {
__auto_type tcc = handler->tcc;
if (!tcc)
return "";
if (!tcc->transform.address.family)
return "";
__auto_type tfh = handler->transform;
__auto_type media = handler->media;
__auto_type call = media->call;
if (tfh) {
if (handler->transform->tcc != tcc) {
ilogs(codec, LOG_DEBUG, "Transform handler mismatched, resetting");
obj_release(handler->transform);
tfh = handler->transform = NULL;
}
else {
ilogs(codec, LOG_DEBUG, "Leaving existing transform handler intact");
// restore reference in case it has been released
obj_release(tfh->call);
tfh->call = obj_get(call);
__transform_subscriptions(handler);
return NULL;
}
}
if (!handler->ssrc_hash)
handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, handler);
if (!tfh) {
ilogs(codec, LOG_DEBUG, "Creating new transform handler");
tfh = handler->transform = obj_alloc0(struct transform_handler, __transform_handler_destroy);
tfh->call = obj_get(call);
tfh->source_media = media;
tfh->handler = handler;
tfh->tcc = tcc;
// create dedicated monologue and dedicated call_media to send media to the
// remote rtpengine and forward received media to its designated destination
tfh->transform_ml = call_get_or_create_monologue(call, STR_PTR("transform handler"));
tfh->transform_media = call_make_transform_media(tfh->transform_ml, &media->type, media->type_id,
&STR_NULL, &tcc->transform, &tcc->local_interface);
codec_store_add_raw(&tfh->transform_media->codecs, rtp_payload_type_dup(&handler->dest_pt));
__transform_subscriptions(handler);
}
// manually construct the request
g_autoptr(GString) req = g_string_new("d7:command9:transform5:mediald4:type");
BENC_GS_APPEND(&media->type);
g_string_append(req, "5:codecld5:inputd5:codec");
append_pt(req, &handler->source_pt);
g_string_append(req, "e6:outputd5:codec");
append_pt(req, &handler->dest_pt);
__auto_type ps = tfh->transform_media->streams.head->data;
__auto_type sock = &ps->selected_sfd->socket;
g_string_append(req, "eee11:destinationd");
g_string_append(req, "6:family");
BENC_GS_APPEND(STR_PTR(sock->local.address.family->rfc_name));
g_string_append(req, "7:address");
BENC_GS_APPEND(STR_PTR(sockaddr_print_buf(&sock->local.address)));
g_string_append_printf(req, "4:porti%ue", sock->local.port);
g_string_append(req, "eee");
if (tcc->remote_interface.len) {
g_string_append(req, "9:interface");
BENC_GS_APPEND(&tcc->remote_interface);
}
g_string_append(req, "8:instance");
BENC_GS_APPEND(&rtpe_instance_id);
g_string_append(req, "e");
// send out request
__auto_type ret = ng_client_request(&tcc->transform, &STR_LEN(req->str, req->len), memory_arena);
if (!ret)
return "'transform' request to remote peer failed";
// process response
str result;
if (!bencode_dictionary_get_str(ret, "result", &result))
return "'transform' response didn't contain 'result'";
if (str_cmp(&result, "ok"))
return "'transform' response didn't indicate success";
str s;
if (!bencode_dictionary_get_str(ret, "call-id", &s))
return "'transform' response didn't contain 'call-id'";
tfh->call_id = call_str_cpy(&s);
if (!bencode_dictionary_get_str(ret, "from-tag", &s))
return "'transform' response didn't contain 'from-tag'";
tfh->tag = call_str_cpy(&s);
__auto_type remote_media = bencode_dictionary_get_expect(ret, "media", BENCODE_LIST);
if (!remote_media)
return "'transform' response didn't contain 'media'";
if (!remote_media->child)
return "'transform' response contained empty 'media'";
__auto_type rm = remote_media->child; // just a single entry for now
if (rm->type != BENCODE_DICTIONARY)
return "incorrect type contained in 'media'";
str family, address;
int port = bencode_dictionary_get_integer(rm, "port", 0);
if (port <= 0 || port > 65535)
return "invalid port";
if (!bencode_dictionary_get_str(rm, "family", &family))
return "'transform' response media didn't contain 'family'";
if (!bencode_dictionary_get_str(rm, "address", &address))
return "'transform' response media didn't contain 'address'";
__auto_type fam = get_socket_family_rfc(&family);
if (!fam)
return "'transform' response media contained invalid 'family'";
if (!sockaddr_parse_str(&tfh->remote.address, fam, &address))
return "'transform' response media contained invalid 'address'";
tfh->remote.port = port;
ps = tfh->transform_media->streams.head->data;
ps->advertised_endpoint = ps->endpoint = tfh->remote;
PS_SET(ps, FILLED);
// don't send the affected PT to the destination, and make sure
// to only send the affected PT to the transform remote
handler->handler_func = handler_func_blackhole;
handler->kernelize = true;
handler->blackhole = true;
MEDIA_SET(tfh->transform_media, SELECT_PT);
__auto_type tfm_handler = __get_pt_handler(handler->media, &handler->source_pt, tfh->transform_media);
__make_passthrough(tfm_handler, -1, -1);
handler->stats_chain_suffix = " (TFM)";
handler->stats_chain_suffix_brief = "_tfm";
__handler_stats_entry(handler);
return NULL;
}
#undef BENC_GS_APPEND
static void __reset_sequencer(void *p, void *dummy) {
struct ssrc_entry_call *s = p;
if (s->sequencers)
@ -559,6 +794,15 @@ reset:
if (handler->source_pt.codec_def && handler->source_pt.codec_def->dtmf)
handler->handler_func = handler_func_dtmf;
handler->tcc = t_hash_table_lookup(rtpe_transcode_config, &handler->pi);
const char *err = __make_transform_handler(handler);
if (!err)
return true; // delegated to transcode handler
if (err[0] != '\0')
ilogs(codec, LOG_ERR, "Failed to set up transform handler: %s", err);
handler->ssrc_hash = create_ssrc_hash_full(ssrc_handler_new_func, handler);
ilogs(codec, LOG_DEBUG, "Created transcode context for " STR_FORMAT "/" STR_FORMAT " (%i) -> " STR_FORMAT
"/" STR_FORMAT " (%i) with DTMF output %i and CN output %i",
STR_FMT(&handler->source_pt.encoding_with_params),
@ -569,8 +813,6 @@ reset:
dest->payload_type,
dtmf_payload_type, cn_payload_type);
handler->ssrc_hash = create_ssrc_hash_full(ssrc_handler_new_func, handler);
if (handler->ssrc_hash->precreat
&& ((struct codec_ssrc_handler *) handler->ssrc_hash->precreat)->chain)
{
@ -585,6 +827,10 @@ reset:
no_handler_reset:
__delay_buffer_setup(&handler->delay_buffer, handler, handler->media->call, handler->media->buffer_delay);
__dtx_restart(handler);
// double-check transform handler in case this comes after a no-op reset (shutdown + no_handler_reset)
__make_transform_handler(handler);
// check if we have multiple decoders transcoding to the same output PT
struct codec_handler *output_handler = NULL;
if (output_transcoders)
@ -1178,6 +1424,7 @@ void __codec_handlers_update(struct call_media *receiver, struct call_media *sin
__generator_stop(receiver);
__generator_stop(sink);
codec_handlers_stop(&receiver->codec_handlers_store, sink);
// XXX this can cause unnecessary shutdown/resets of codec handlers
bool is_transcoding = false;
receiver->rtcp_handler = NULL;
@ -1664,6 +1911,7 @@ bool codec_handler_transform(struct call_media *receiver, ng_codecs_q *q) {
#ifdef WITH_TRANSCODING
if (!codec_def_supported(input->codec_def) || !codec_def_supported(output->codec_def))
return false;
codec_store_add_raw(&receiver->codecs, rtp_payload_type_dup(input));
__make_transcoder(handler, output, NULL, -1, false, -1);
#else
return false;
@ -3788,6 +4036,7 @@ void codec_handlers_stop(codec_handlers_q *q, struct call_media *sink) {
delay_buffer_stop(&h->delay_buffer);
}
ssrc_hash_foreach(h->ssrc_hash, __ssrc_handler_stop, NULL);
__transform_handler_shutdown(h->transform);
}
}

@ -438,7 +438,20 @@ static void do_transcode_config(const char *name, charp_ht ht, struct transcode_
if (!codec_parse_payload_type(&tc->i.dst, STR_PTR(tc->dst)))
die("Failed to parse source codec '%s' in transcode config '%s'", src, name);
die("Transcode config '%s' has no verdict", name);
char *tfm = t_hash_table_lookup(ht, "transform");
if (tfm) {
if (!endpoint_parse_any_getaddrinfo_full(&tc->transform, tfm))
die("Failed to parse transform endpoint '%s' in transcode config '%s'", tfm, name);
char *iface = t_hash_table_lookup(ht, "local-interface");
if (iface)
tc->local_interface = STR(iface);
iface = t_hash_table_lookup(ht, "remote-interface");
if (iface)
tc->remote_interface = STR(iface);
return;
}
else
die("Transcode config '%s' has no verdict", name);
}
static bool if_addr_parse(intf_config_q *q, char *s, struct ifaddrs *ifas) {

@ -1848,7 +1848,24 @@ For example, if __source=PCMU__ and __destination=PCMA__ are set, then this
particular transcoding config section would be considered if the received codec
is PCMU and the requested output codec is PCMA.
No verdicts are supported at this time.
### __transform__ Verdict
To use the __transform__ verdict, the config key __transform=...__ must be set.
The value is a remote NG peer endpoint, such as the NG control port of another
*rtpengine* instance, in the usual *address:port* notation. This tells
*rtpengine* to send a __transform__ NG request to the remote peer, to request
remotely transcoding the media. If successful, *rtpengine* will forward the
unmodified media stream to the remote peer, to the address and port received
from the NG response, and will expect the transcoded media to be sent back in
return, which it will then forward to its appropriate destination. In other
words, it facilitates a designated remote transcoding node, instead of doing
the transcoding locally.
Optional config keys that can be set are __local-interface=...__ to select the
local interface to use to communicate with the remote transform gateway,
__remote-interface=...__ to tell the remote gateway to use a particular
interface on its side, and __address-family=...__ to select between IPv4 and
IPv6.
## EXIT STATUS

@ -175,6 +175,24 @@ recording-method = proc
# preload-db-cache = 1; 2; 3; 4; on-demand
# cache-media-reload = 3600
transcode-config = transcode
[transcode-1]
source = opus
destination = PCMA
transform = localhost:2228
# local-interface = ...
# remote-interface = ...
# address-family = ...
[transcode-2]
source = PCMA
destination = opus
transform = localhost:2228
# local-interface = ...
# remote-interface = ...
# address-family = ...
# signalling templates (see key `templates` above)
[templates]
WebRTC = transport-protocol=UDP/TLS/RTP/SAVPF ICE=force trickle-ICE rtcp-mux=[offer require] no-rtcp-attribute SDES=off generate-mid

@ -831,6 +831,7 @@ struct packet_stream *__packet_stream_new(call_t *call);
__attribute__((nonnull(1, 2)))
struct media_subscription *__add_media_subscription(struct call_media * which, struct call_media * to,
const struct sink_attrs *attrs);
bool __unsubscribe_media(struct call_media * which, struct call_media * from);
struct media_subscription *call_ml_get_top_ms(struct call_monologue *ml);
bool call_ml_sendonly_inactive(struct call_monologue *ml);
struct media_subscription *call_media_get_top_ms(struct call_media * cm);
@ -854,6 +855,9 @@ int call_get_mono_dialogue(struct call_monologue *monologues[2], call_t *call,
sdp_ng_flags *);
struct call_monologue *call_get_monologue(call_t *call, const str *fromtag);
struct call_monologue *call_get_or_create_monologue(call_t *call, const str *fromtag);
__attribute__((nonnull(1, 2, 4, 5, 6)))
struct call_media *call_make_transform_media(struct call_monologue *ml, const str *type, enum media_type type_id,
const str *media_id, const endpoint_t *remote, const str *interface);
__attribute__((nonnull(1)))
call_t *call_get(const str *callid);
typedef enum { CG2_OK, CG2_NF1, CG2_NF2, CG2_SAME } call_get2_ret_t;

@ -76,6 +76,9 @@ struct codec_handler {
// for DTMF injection
struct codec_handler *dtmf_injector;
struct delay_buffer *delay_buffer;
// for remote transcoding
struct transcode_config *tcc;
struct transform_handler *transform;
// stats entry
const char *stats_chain_suffix;
@ -111,6 +114,12 @@ struct transcode_config {
char *name; // of the config section
struct codec_pipeline_index i; // parsed
// verdict
endpoint_t transform;
str local_interface;
str remote_interface;
//const sockfamily_t *address_family;
};
typedef union {

@ -87,7 +87,7 @@ DAEMONSRCS+= control_ng_flags_parser.c codec.c call.c ice.c kernel.c media_socke
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
media_player.c jitter_buffer.c t38.c tcp_listener.c mqtt.c websocket.c \
audio_player.c arena.c
audio_player.c arena.c ng_client.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c janus.c
LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S
endif
@ -285,7 +285,7 @@ test-stats: test-stats.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssr
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o \
websocket.o cli.strhash.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o ng_client.o
test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssrc.o call.o ice.o helpers.o \
kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \
@ -294,7 +294,7 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o cod
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o websocket.o \
cli.strhash.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o arena.o ng_client.o
test-resample: test-resample.o $(COMMONOBJS) codeclib.strhash.o resample.o dtmflib.o mvr2s_x64_avx2.o \
mvr2s_x64_avx512.o bencode.o

Loading…
Cancel
Save