MT#55283 support buffered media player

Change-Id: I9e935b8561bb8710933fa11de383458896c0a5d9
pull/1592/head
Richard Fuchs 2 years ago
parent bec997590b
commit cf12ffc264

@ -520,6 +520,13 @@ struct codec_handler *codec_handler_make_playback(const struct rtp_payload_type
return handler;
}
struct codec_handler *codec_handler_make_dummy(const struct rtp_payload_type *dst_pt, struct call_media *media)
{
struct codec_handler *handler = __handler_new(NULL, media, NULL);
rtp_payload_type_copy(&handler->dest_pt, dst_pt);
return handler;
}
// does not init/parse a=fmtp
static void ensure_codec_def_type(struct rtp_payload_type *pt, enum media_type type) {

@ -555,6 +555,7 @@ static void options(int *argc, char ***argv) {
{ "amr-dtx", 0,0, G_OPTION_ARG_STRING, &amr_dtx, "DTX mechanism to use for AMR and AMR-WB","native|CN"},
{ "silence-detect",0,0, G_OPTION_ARG_DOUBLE, &silence_detect, "Audio level threshold in percent for silence detection","FLOAT"},
{ "cn-payload",0,0, G_OPTION_ARG_STRING_ARRAY,&cn_payload, "Comfort noise parameters to replace silence with","INT INT INT ..."},
{ "player-cache",0,0, G_OPTION_ARG_NONE, &rtpe_config.player_cache,"Cache media files for playback in memory",NULL},
#endif
#ifdef HAVE_MQTT
{ "mqtt-host",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_host, "Mosquitto broker host or address", "HOST|IP"},

@ -30,6 +30,35 @@
static struct timerthread media_player_thread;
static __thread MYSQL *mysql_conn;
struct media_player_cache_index {
struct media_player_content_index index;
struct rtp_payload_type dst_pt;
};
struct media_player_cache_entry {
bool finished;
// "unfinished" elements, only used while decoding is active:
mutex_t lock;
cond_t cond; // to wait for more data to be decoded
GPtrArray *packets; // read-only except for decoder thread, which uses finished flags and locks
struct codec_scheduler csch;
struct media_player_coder coder; // de/encoder data
char *info_str; // for logging
};
struct media_player_cache_packet {
char *buf;
str s;
long long pts;
long long duration;
long long duration_ts;
};
static mutex_t media_player_cache_lock;
static GHashTable *media_player_cache; // keys and values only ever freed at shutdown
static void media_player_read_packet(struct media_player *mp);
#endif
@ -85,6 +114,13 @@ static void media_player_shutdown(struct media_player *mp) {
mp->media = NULL;
media_player_coder_shutdown(&mp->coder);
mp->cache_index.type = MP_OTHER;
if (mp->cache_index.file.s)
g_free(mp->cache_index.file.s);
mp->cache_index.file = STR_NULL;
mp->cache_entry = NULL;
mp->cache_read_idx = 0;
}
#endif
@ -103,13 +139,13 @@ long long media_player_stop(struct media_player *mp) {
static void __media_player_free(void *p) {
struct media_player *mp = p;
//ilog(LOG_DEBUG, "freeing media_player");
media_player_shutdown(mp);
ssrc_ctx_put(&mp->ssrc_out);
mutex_destroy(&mp->lock);
obj_put(mp->call);
av_packet_free(&mp->coder.pkt);
if (mp->cache_index.file.s)
g_free(mp->cache_index.file.s);
}
#endif
@ -134,6 +170,7 @@ struct media_player *media_player_new(struct call_monologue *ml) {
mp->call = obj_get(ml->call);
mp->ml = ml;
mp->seq = ssl_random();
mp->buffer_ts = ssl_random();
mp->ssrc_out = ssrc_ctx;
mp->coder.pkt = av_packet_alloc();
@ -149,9 +186,6 @@ struct media_player *media_player_new(struct call_monologue *ml) {
static void __send_timer_free(void *p) {
struct send_timer *st = p;
//ilog(LOG_DEBUG, "freeing send_timer");
obj_put(st->call);
}
@ -165,8 +199,6 @@ static void __send_timer_send_later(struct timerthread_queue *ttq, void *p) {
// call->master_lock held in W
struct send_timer *send_timer_new(struct packet_stream *ps) {
//ilog(LOG_DEBUG, "creating send_timer");
struct send_timer *st = timerthread_queue_new("send_timer", sizeof(*st),
&send_timer_thread,
__send_timer_send_now,
@ -321,6 +353,296 @@ static void media_player_coder_add_packet(struct media_player_coder *c,
}
static void media_player_read_decoded_packet(struct media_player *mp) {
struct media_player_cache_entry *entry = mp->cache_entry;
unsigned int read_idx = mp->cache_read_idx;
ilog(LOG_DEBUG, "Buffered media player reading packet #%u", read_idx);
retry:;
bool finished = entry->finished; // hold lock or not
if (!finished) {
// slow track with locking
mutex_lock(&entry->lock);
// confirm that we are indeed not finished
if (entry->finished) {
// preempted, good to go
mutex_unlock(&entry->lock);
finished = true;
}
}
if (read_idx >= entry->packets->len) {
if (!finished) {
// wait for more
cond_wait(&entry->cond, &entry->lock);
mutex_unlock(&entry->lock);
goto retry;
}
// EOF
if (mp->repeat <= 1) {
ilog(LOG_DEBUG, "EOF reading from media buffer (%s), stopping playback",
entry->info_str);
return;
}
ilog(LOG_DEBUG, "EOF reading from media buffer (%s) but will repeat %li time",
entry->info_str, mp->repeat);
mp->repeat--;
read_idx = mp->cache_read_idx = 0;
goto retry;
}
// got a packet
struct media_player_cache_packet *pkt = entry->packets->pdata[read_idx];
long long us_dur = pkt->duration;
mp->cache_read_idx++;
if (!finished)
mutex_unlock(&entry->lock);
// make a copy to send out
size_t len = pkt->s.len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM;
char *buf = g_malloc(len);
memcpy(buf, pkt->buf, len);
struct media_packet packet = {
.tv = rtpe_now,
.call = mp->call,
.media = mp->media,
.media_out = mp->media,
.rtp = (void *) buf,
.ssrc_out = mp->ssrc_out,
};
mp->last_frame_ts = pkt->pts;
codec_output_rtp(&packet, &entry->csch, mp->coder.handler, buf, pkt->s.len, mp->buffer_ts,
read_idx == 0, mp->seq++, 0, -1, 0);
mp->buffer_ts += pkt->duration_ts;
mp->sync_ts_tv = rtpe_now;
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
// schedule our next run
timeval_add_usec(&mp->next_run, us_dur);
timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run);
}
static void media_player_cached_reader_start(struct media_player *mp, const struct rtp_payload_type *dst_pt,
long long repeat)
{
struct media_player_cache_entry *entry = mp->cache_entry;
// create dummy codec handler and start timer
mp->coder.handler = codec_handler_make_dummy(&entry->coder.handler->dest_pt, mp->media);
mp->run_func = media_player_read_decoded_packet;
mp->next_run = rtpe_now;
mp->coder.duration = entry->coder.duration;
// if we played anything before, scale our sync TS according to the time
// that has passed
if (mp->sync_ts_tv.tv_sec) {
long long ts_diff_us = timeval_diff(&rtpe_now, &mp->sync_ts_tv);
mp->buffer_ts += fraction_divl(ts_diff_us * dst_pt->clock_rate / 1000000, &dst_pt->codec_def->default_clockrate_fact);
}
mp->sync_ts_tv = rtpe_now;
mp->repeat = repeat;
media_player_read_decoded_packet(mp);
}
static void cache_packet_free(void *ptr) {
struct media_player_cache_packet *p = ptr;
g_free(p->buf);
g_slice_free1(sizeof(*p), p);
}
// returns: true = entry exists, decoding handled separately, use entry for playback
// false = no entry exists, OR entry is a new one, proceed to open decoder, then call _play_start
static bool media_player_cache_get_entry(struct media_player *mp,
const struct rtp_payload_type *dst_pt, long long repeat)
{
if (!rtpe_config.player_cache)
return false;
if (mp->cache_index.type <= 0)
return false;
struct media_player_cache_index lookup;
lookup.index = mp->cache_index;
lookup.dst_pt = *dst_pt;
mutex_lock(&media_player_cache_lock);
struct media_player_cache_entry *entry = mp->cache_entry
= g_hash_table_lookup(media_player_cache, &lookup);
bool ret = true; // entry exists, use cached data
if (entry) {
media_player_cached_reader_start(mp, dst_pt, repeat);
goto out;
}
ret = false; // new entry, open decoder, then call media_player_play_start
// initialise object
struct media_player_cache_index *ins_key = g_slice_alloc(sizeof(*ins_key));
*ins_key = lookup;
str_init_dup_str(&ins_key->index.file, &lookup.index.file);
codec_init_payload_type(&ins_key->dst_pt, MT_UNKNOWN); // duplicate contents
entry = mp->cache_entry = g_slice_alloc0(sizeof(*entry));
mutex_init(&entry->lock);
cond_init(&entry->cond);
entry->packets = g_ptr_array_new_full(64, cache_packet_free);
switch (lookup.index.type) {
case MP_DB:
entry->info_str = g_strdup_printf("DB media file #%llu", lookup.index.db_id);
break;
case MP_FILE:
entry->info_str = g_strdup_printf("media file '" STR_FORMAT "'",
STR_FMT(&lookup.index.file));
break;
case MP_BLOB:
entry->info_str = g_strdup_printf("binary media blob");
break;
default:;
}
g_hash_table_insert(media_player_cache, ins_key, entry);
out:
mutex_unlock(&media_player_cache_lock);
return ret;
}
static void media_player_cache_packet(struct media_player_cache_entry *entry, char *buf, size_t len,
long long us_dur, unsigned long long pts)
{
// synthesise fake RTP header and media_packet context
struct rtp_header rtp = {
.timestamp = pts, // taken verbatim by handler_func_playback w/o byte swap
};
struct media_packet packet = {
.rtp = &rtp,
.cache_entry = entry,
};
str_init_len(&packet.raw, buf, len);
packet.payload = packet.raw;
entry->coder.handler->handler_func(entry->coder.handler, &packet);
}
static void media_player_cache_entry_decoder_thread(void *p) {
struct media_player_cache_entry *entry = p;
ilog(LOG_DEBUG, "Launching media decoder thread for %s", entry->info_str);
while (true) {
// let us be cancelled
thread_cancel_enable();
pthread_testcancel();
thread_cancel_disable();
int ret = av_read_frame(entry->coder.fmtctx, entry->coder.pkt);
if (ret < 0) {
if (ret != AVERROR_EOF)
ilog(LOG_ERR, "Error while reading from media stream");
break;
}
media_player_coder_add_packet(&entry->coder, (void *) media_player_cache_packet, entry);
av_packet_unref(entry->coder.pkt);
}
mutex_lock(&entry->lock);
entry->finished = true;
cond_broadcast(&entry->cond);
mutex_unlock(&entry->lock);
ilog(LOG_DEBUG, "Decoder thread for %s finished", entry->info_str);
}
static void packet_encoded_cache(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp,
str *s, char *buf, unsigned int pkt_len)
{
struct media_player_cache_entry *entry = mp->cache_entry;
struct media_player_cache_packet *ep = g_slice_alloc0(sizeof(*ep));
*ep = (__typeof__(*ep)) {
.buf = buf,
.s = *s,
.pts = enc->avpkt->pts,
.duration_ts = enc->avpkt->duration,
.duration = (long long) enc->avpkt->duration * 1000000LL
/ entry->coder.handler->dest_pt.clock_rate,
};
mutex_lock(&entry->lock);
g_ptr_array_add(entry->packets, ep);
cond_broadcast(&entry->cond);
mutex_unlock(&entry->lock);
}
static int media_player_packet_cache(encoder_t *enc, void *u1, void *u2) {
struct codec_ssrc_handler *ch = u1;
struct media_packet *mp = u2;
packet_encoded_packetize(enc, ch, mp, packet_encoded_cache);
return 0;
}
// called from media_player_play_start, which is called after media_player_cache_get_entry returned true.
// this can mean that either we don't have a cache entry and should continue normally, or if we
// do have a cache entry, initialise it, set up the thread, take over decoding, and then proceed as a
// media player consuming the data from the decoder thread.
// returns: false = continue normally decode in-thread, true = take data from other thread
static bool media_player_cache_entry_init(struct media_player *mp, const struct rtp_payload_type *dst_pt,
long long repeat)
{
struct media_player_cache_entry *entry = mp->cache_entry;
if (!entry)
return false;
// steal coder data
entry->coder = mp->coder;
ZERO(mp->coder);
mp->coder.duration = entry->coder.duration; // retain this for reporting
entry->coder.handler->packet_encoded = media_player_packet_cache;
// use low priority (10 nice)
thread_create_detach_prio(media_player_cache_entry_decoder_thread, entry, NULL, 10, "mp decoder");
media_player_cached_reader_start(mp, dst_pt, repeat);
return true;
}
// find suitable output payload type
static struct rtp_payload_type *media_player_get_dst_pt(struct media_player *mp) {
struct rtp_payload_type *dst_pt = NULL;
@ -541,6 +863,9 @@ static void media_player_play_start(struct media_player *mp, const struct rtp_pa
if (__ensure_codec_handler(mp, dst_pt))
return;
if (media_player_cache_entry_init(mp, dst_pt, repeat))
return;
mp->next_run = rtpe_now;
// give ourselves a bit of a head start with decoding
timeval_add_usec(&mp->next_run, -50000);
@ -563,6 +888,12 @@ int media_player_play_file(struct media_player *mp, const str *file, long long r
if (!dst_pt)
return -1;
mp->cache_index.type = MP_FILE;
str_init_dup_str(&mp->cache_index.file, file);
if (media_player_cache_get_entry(mp, dst_pt, repeat))
return 0;
char file_s[PATH_MAX];
snprintf(file_s, sizeof(file_s), STR_FORMAT, STR_FMT(file));
@ -574,7 +905,6 @@ int media_player_play_file(struct media_player *mp, const str *file, long long r
media_player_play_start(mp, dst_pt, repeat, start_pos);
return 0;
#else
return -1;
@ -625,13 +955,14 @@ static int64_t __mp_avio_seek(void *opaque, int64_t offset, int whence) {
return __mp_avio_seek_set(c, ((int64_t) c->blob->len) + offset);
return AVERROR(EINVAL);
}
#endif
// call->master_lock held in W
int media_player_play_blob(struct media_player *mp, const str *blob, long long repeat, long long start_pos) {
#ifdef WITH_TRANSCODING
static int media_player_play_blob_id(struct media_player *mp, const str *blob, long long repeat,
long long start_pos, long long db_id)
{
const char *err;
int av_ret = 0;
@ -639,6 +970,21 @@ int media_player_play_blob(struct media_player *mp, const str *blob, long long r
if (!dst_pt)
return -1;
if (db_id >= 0) {
mp->cache_index.type = MP_DB;
mp->cache_index.db_id = db_id;
if (media_player_cache_get_entry(mp, dst_pt, repeat))
return 0;
}
else {
mp->cache_index.type = MP_BLOB;
str_init_dup_str(&mp->cache_index.file, blob);
if (media_player_cache_get_entry(mp, dst_pt, repeat))
return 0;
}
mp->coder.blob = str_dup(blob);
err = "out of memory";
if (!mp->coder.blob)
@ -677,8 +1023,18 @@ err:
ilog(LOG_ERR, "Failed to start media playback from memory: %s", err);
if (av_ret)
ilog(LOG_ERR, "Error returned from libav: %s", av_error(av_ret));
return -1;
}
#endif
// call->master_lock held in W
int media_player_play_blob(struct media_player *mp, const str *blob, long long repeat, long long start_pos) {
#ifdef WITH_TRANSCODING
return media_player_play_blob_id(mp, blob, repeat, start_pos, -1);
#else
return -1;
#endif
}
@ -753,7 +1109,7 @@ success:;
str blob;
str_init_len(&blob, row[0], lengths[0]);
int ret = media_player_play_blob(mp, &blob, repeat, start_pos);
int ret = media_player_play_blob_id(mp, &blob, repeat, start_pos, id);
mysql_free_result(res);
@ -786,11 +1142,71 @@ static void media_player_run(void *ptr) {
log_info_pop();
}
static unsigned int media_player_cache_entry_hash(const void *p) {
const struct media_player_cache_index *i = p;
unsigned int ret;
switch (i->index.type) {
case MP_DB:
ret = i->index.db_id;
break;
case MP_FILE:
case MP_BLOB:
ret = str_hash(&i->index.file);
break;
default:
abort();
}
ret ^= str_hash(&i->dst_pt.encoding_with_full_params);
ret ^= i->index.type;
return ret;
}
static gboolean media_player_cache_entry_eq(const void *A, const void *B) {
const struct media_player_cache_index *a = A, *b = B;
if (a->index.type != b->index.type)
return FALSE;
switch (a->index.type) {
case MP_DB:
if (a->index.db_id != b->index.db_id)
return FALSE;
break;
case MP_FILE:
case MP_BLOB:
if (!str_equal(&a->index.file, &b->index.file))
return FALSE;
break;
default:
abort();
}
return str_equal(&a->dst_pt.encoding_with_full_params, &b->dst_pt.encoding_with_full_params);
}
static void media_player_cache_index_free(void *p) {
struct media_player_cache_index *i = p;
g_free(i->index.file.s);
payload_type_clear(&i->dst_pt);
g_slice_free1(sizeof(*i), i);
}
static void media_player_cache_entry_free(void *p) {
struct media_player_cache_entry *e = p;
g_ptr_array_free(e->packets, TRUE);
mutex_destroy(&e->lock);
g_free(e->info_str);
media_player_coder_shutdown(&e->coder);
av_packet_free(&e->coder.pkt);
g_slice_free1(sizeof(*e), e);
}
#endif
void media_player_init(void) {
#ifdef WITH_TRANSCODING
if (rtpe_config.player_cache) {
media_player_cache = g_hash_table_new_full(media_player_cache_entry_hash,
media_player_cache_entry_eq, media_player_cache_index_free,
media_player_cache_entry_free);
mutex_init(&media_player_cache_lock);
}
timerthread_init(&media_player_thread, media_player_run);
#endif
timerthread_init(&send_timer_thread, timerthread_queue_run);
@ -799,6 +1215,11 @@ void media_player_init(void) {
void media_player_free(void) {
#ifdef WITH_TRANSCODING
timerthread_free(&media_player_thread);
if (media_player_cache) {
mutex_destroy(&media_player_cache_lock);
g_hash_table_destroy(media_player_cache);
}
#endif
timerthread_free(&send_timer_thread);
}

@ -930,6 +930,28 @@ directly as payload of B<CN> packets sent by B<rtpengine>.
The default values are 32 (-32 dBov) for the noise level and no spectral
information.
=item B<--player-cache>
Enable caching of encoded media packets for media player. This is applicable
for media playback initiated through the I<play media> command. When enabled
B<rtpengine> will not simply decode given media files and then encode the media
to RTP on demand and on the fly, but will rather decode and encode each media
file in full the first time playback is requested, and then cache the resulting
RTP packets in memory. This is done once for each media file and for each
output RTP codec requested.
Caching is done based on unique file name (with no consideration given to
different file names that may point to the same file), or integer index for
media files played from database. No verification of changing content of files
or database entries is done. Media files provided as binary I<blob> are also
cached, although in this case a hash over the entire media file must be
performed, therefore this usage is not recommended.
It's not possible to choose a different I<start-pos> for playback with this
option enabled.
RTP data is cached and retained in memory for the lifetime of the process.
=item B<--poller-per-thread>
Enable 'poller per thread' functionality: for every worker thread (see the

@ -99,6 +99,7 @@ struct codec_handler *codec_handler_get(struct call_media *, int payload_type, s
void codec_handlers_free(struct call_media *);
struct codec_handler *codec_handler_make_playback(const struct rtp_payload_type *src_pt,
const struct rtp_payload_type *dst_pt, unsigned long ts, struct call_media *);
struct codec_handler *codec_handler_make_dummy(const struct rtp_payload_type *dst_pt, struct call_media *media);
void codec_calc_jitter(struct ssrc_ctx *, unsigned long ts, unsigned int clockrate, const struct timeval *);
void codec_update_all_handlers(struct call_monologue *ml);

@ -131,6 +131,7 @@ struct rtpengine_config {
double silence_detect_double;
uint32_t silence_detect_int;
str cn_payload;
int player_cache;
char *software_id;
int poller_per_thread;
char *mqtt_host;

@ -25,6 +25,15 @@ struct rtp_payload_type;
#include <libavcodec/avcodec.h>
struct media_player_cache_entry;
struct media_player_content_index {
enum { MP_OTHER = 0, MP_FILE = 1, MP_DB, MP_BLOB } type;
long long db_id;
str file; // file name or binary blob
};
typedef void (*media_player_run_func)(struct media_player *);
@ -53,9 +62,13 @@ struct media_player {
unsigned long repeat;
struct media_player_coder coder;
struct media_player_content_index cache_index;
struct media_player_cache_entry *cache_entry;
unsigned int cache_read_idx;
struct ssrc_ctx *ssrc_out;
unsigned long seq;
unsigned long buffer_ts;
unsigned long sync_ts;
struct timeval sync_ts_tv;
long long last_frame_ts;

@ -23,6 +23,7 @@ struct rtpengine_srtp;
struct jb_packet;
struct stream_fd;
struct poller;
struct media_player_cache_entry;
typedef int rtcp_filter_func(struct media_packet *, GQueue *);
typedef int (*rewrite_func)(str *, struct packet_stream *, struct stream_fd *, const endpoint_t *,
@ -188,6 +189,7 @@ struct media_packet {
struct call_media *media; // stream->media
struct call_media *media_out; // output media
struct sink_handler sink;
struct media_player_cache_entry *cache_entry;
struct rtp_header *rtp;
struct rtcp_packet *rtcp;

@ -91,7 +91,7 @@ include ../lib/common.Makefile
.PHONY: all-tests unit-tests daemon-tests daemon-tests \
daemon-tests-main daemon-tests-jb daemon-tests-dtx daemon-tests-dtx-cn daemon-tests-pubsub \
daemon-tests-intfs daemon-tests-stats daemon-tests-delay-buffer daemon-tests-delay-timing \
daemon-tests-evs
daemon-tests-evs daemon-tests-player-cache
TESTS= test-bitstr aes-crypt aead-aes-crypt test-const_str_hash.strhash
ifeq ($(with_transcoding),yes)
@ -127,7 +127,7 @@ unit-tests: $(TESTS)
daemon-tests: daemon-tests-main daemon-tests-jb daemon-tests-pubsub daemon-tests-websocket \
daemon-tests-evs \
daemon-tests-intfs daemon-tests-stats # daemon-tests-delay-buffer daemon-tests-delay-timing
daemon-tests-intfs daemon-tests-stats daemon-tests-player-cache
daemon-test-deps: tests-preload.so
$(MAKE) -C ../daemon
@ -220,6 +220,14 @@ daemon-tests-evs: daemon-test-deps
test "$$(ls fake-$@-sockets)" = ""
rmdir fake-$@-sockets
daemon-tests-player-cache: daemon-test-deps
rm -rf fake-$@-sockets
mkdir fake-$@-sockets
LD_PRELOAD=../t/tests-preload.so RTPE_BIN=../daemon/rtpengine TEST_SOCKET_PATH=./fake-$@-sockets \
perl -I../perl auto-daemon-tests-player-cache.pl
test "$$(ls fake-$@-sockets)" = ""
rmdir fake-$@-sockets
test-bitstr: test-bitstr.o
spandsp_send_fax_pcm: spandsp_send_fax_pcm.o

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save