From 64e6bf440b26e0638d34da40b32e0282c421fe7a Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 26 May 2020 12:26:49 -0400 Subject: [PATCH] TT#82203 add detailed transcoding stats Change-Id: Ib04767c38b00b17ef5844a9f6649e009270f8f82 --- daemon/cli.c | 28 +++++++++++++++++++++ daemon/codec.c | 58 +++++++++++++++++++++++++++++++++++++++++++- daemon/graphite.c | 25 +++++++++++++++++++ daemon/statistics.c | 7 ++++++ include/codec.h | 4 +++ include/statistics.h | 15 ++++++++++++ t/transcode-test.c | 1 + 7 files changed, 137 insertions(+), 1 deletion(-) diff --git a/daemon/cli.c b/daemon/cli.c index 6233153e0..af05d7bc5 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -89,6 +89,7 @@ static void cli_incoming_list_rediscmdtimeout(str *instr, struct streambuf *repl static void cli_incoming_list_controltos(str *instr, struct streambuf *replybuffer); static void cli_incoming_list_interfaces(str *instr, struct streambuf *replybuffer); static void cli_incoming_list_jsonstats(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list_transcoders(str *instr, struct streambuf *replybuffer); static const cli_handler_t cli_top_handlers[] = { { "list", cli_incoming_list }, @@ -141,6 +142,7 @@ static const cli_handler_t cli_list_handlers[] = { { "controltos", cli_incoming_list_controltos }, { "interfaces", cli_incoming_list_interfaces }, { "jsonstats", cli_incoming_list_jsonstats }, + { "transcoders", cli_incoming_list_transcoders }, { NULL, }, }; @@ -1532,6 +1534,32 @@ static void cli_incoming_list_jsonstats(str *instr, struct streambuf *replybuffe g_list_free(list); } +static void cli_incoming_list_transcoders(str *instr, struct streambuf *replybuffer) { + mutex_lock(&rtpe_codec_stats_lock); + + GList *chains = g_hash_table_get_keys(rtpe_codec_stats); + if (!chains) + streambuf_printf(replybuffer, "No stats entries\n"); + else { + int last_tv_sec = rtpe_now.tv_sec - 1; + unsigned int idx = last_tv_sec & 1; + for (GList *l = chains; l; l = l->next) { + char *chain = l->data; + struct codec_stats *stats_entry = g_hash_table_lookup(rtpe_codec_stats, chain); + streambuf_printf(replybuffer, "%s: %i transcoders\n", chain, g_atomic_int_get(&stats_entry->num_transcoders)); + if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) != last_tv_sec) + continue; + streambuf_printf(replybuffer, " " UINT64F " packets/s\n", atomic64_get(&stats_entry->packets_input[idx])); + streambuf_printf(replybuffer, " " UINT64F " bytes/s\n", atomic64_get(&stats_entry->bytes_input[idx])); + streambuf_printf(replybuffer, " " UINT64F " samples/s\n", atomic64_get(&stats_entry->pcm_samples[idx])); + } + } + + mutex_unlock(&rtpe_codec_stats_lock); + + g_list_free(chains); +} + static void cli_incoming_list_controltos(str *instr, struct streambuf *replybuffer) { rwlock_lock_r(&rtpe_config.config_lock); streambuf_printf(replybuffer, "%d\n", rtpe_config.control_tos); diff --git a/daemon/codec.c b/daemon/codec.c index 02457e66a..b62eb87be 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -139,6 +139,12 @@ static void __handler_shutdown(struct codec_handler *handler) { handler->output_handler = handler; // reset to default handler->dtmf_payload_type = -1; handler->pcm_dtmf_detect = 0; + + if (handler->stats_entry) { + g_atomic_int_add(&handler->stats_entry->num_transcoders, -1); + handler->stats_entry = NULL; + free(handler->stats_chain); + } } static void __codec_handler_free(void *pp) { @@ -240,6 +246,30 @@ reset: handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_transcode_new, handler); + // stats entry + if (asprintf(&handler->stats_chain, STR_FORMAT " -> " STR_FORMAT, + STR_FMT(&handler->source_pt.encoding_with_params), + STR_FMT(&dest->encoding_with_params)) < 0) + ilog(LOG_ERR, "asprintf error"); + else { + mutex_lock(&rtpe_codec_stats_lock); + struct codec_stats *stats_entry = + g_hash_table_lookup(rtpe_codec_stats, handler->stats_chain); + if (!stats_entry) { + stats_entry = g_slice_alloc0(sizeof(*stats_entry)); + stats_entry->chain = strdup(handler->stats_chain); + g_hash_table_insert(rtpe_codec_stats, stats_entry->chain, stats_entry); + if (asprintf(&stats_entry->chain_brief, STR_FORMAT "_" STR_FORMAT, + STR_FMT(&handler->source_pt.encoding_with_params), + STR_FMT(&dest->encoding_with_params)) < 0) + stats_entry->chain_brief = "xxx"; + } + handler->stats_entry = stats_entry; + mutex_unlock(&rtpe_codec_stats_lock); + + g_atomic_int_inc(&stats_entry->num_transcoders); + } + check_output:; // check if we have multiple decoders transcoding to the same output PT struct codec_handler *output_handler = NULL; @@ -1903,6 +1933,13 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v ch = new_ch; } + struct codec_handler *h = ch->handler; + if (h->stats_entry) { + int idx = rtpe_now.tv_sec & 1; + atomic64_add(&h->stats_entry->pcm_samples[idx], frame->nb_samples); + atomic64_add(&h->stats_entry->pcm_samples[2], frame->nb_samples); + } + if (ch->skip_pts) { if (frame->nb_samples <= 0) ; @@ -1923,7 +1960,7 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v __dtmf_detect(ch, frame); - input_func(ch->encoder, frame, ch->handler->packet_encoded, ch, mp); + input_func(ch->encoder, frame, h->packet_encoded, ch, mp); discard: av_frame_free(&frame); @@ -1962,6 +1999,25 @@ static int handler_func_transcode(struct codec_handler *h, struct media_packet * ntohl(mp->rtp->ssrc), mp->rtp->m_pt, ntohs(mp->rtp->seq_num), ntohl(mp->rtp->timestamp), mp->payload.len); + if (h->stats_entry) { + unsigned int idx = rtpe_now.tv_sec & 1; + int last_tv_sec = g_atomic_int_get(&h->stats_entry->last_tv_sec[idx]); + if (last_tv_sec != (int) rtpe_now.tv_sec) { + if (g_atomic_int_compare_and_exchange(&h->stats_entry->last_tv_sec[idx], + last_tv_sec, rtpe_now.tv_sec)) + { + // new second - zero out stats. slight race condition here + atomic64_set(&h->stats_entry->packets_input[idx], 0); + atomic64_set(&h->stats_entry->bytes_input[idx], 0); + atomic64_set(&h->stats_entry->pcm_samples[idx], 0); + } + } + atomic64_inc(&h->stats_entry->packets_input[idx]); + atomic64_add(&h->stats_entry->bytes_input[idx], mp->payload.len); + atomic64_inc(&h->stats_entry->packets_input[2]); + atomic64_add(&h->stats_entry->bytes_input[2], mp->payload.len); + } + struct transcode_packet *packet = g_slice_alloc0(sizeof(*packet)); packet->func = packet_decode; packet->rtp = *mp->rtp; diff --git a/daemon/graphite.c b/daemon/graphite.c index d4fca75eb..a72811395 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -228,6 +228,31 @@ static int send_graphite_data(struct totalstats *sent_data) { num_ports - g_atomic_int_get(&lif->spec->port_pool.free_ports)); } + mutex_lock(&rtpe_codec_stats_lock); + + GList *chains = g_hash_table_get_keys(rtpe_codec_stats); + int last_tv_sec = rtpe_now.tv_sec - 1; + unsigned int idx = last_tv_sec & 1; + for (GList *l = chains; l; l = l->next) { + char *chain = l->data; + struct codec_stats *stats_entry = g_hash_table_lookup(rtpe_codec_stats, chain); + GPF("transcoder_%s %i", stats_entry->chain_brief, + g_atomic_int_get(&stats_entry->num_transcoders)); + if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) != last_tv_sec) + continue; + GPF("transcoder_%s_packets %llu", stats_entry->chain_brief, + (unsigned long long) atomic64_get(&stats_entry->packets_input[idx])); + GPF("transcoder_%s_bytes %llu", stats_entry->chain_brief, + (unsigned long long) atomic64_get(&stats_entry->bytes_input[idx])); + GPF("transcoder_%s_samples %llu", stats_entry->chain_brief, + (unsigned long long) atomic64_get(&stats_entry->pcm_samples[idx])); + } + + mutex_unlock(&rtpe_codec_stats_lock); + + g_list_free(chains); + + ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%llu.%06llu at time %llu\n", (unsigned long long) ts->managed_sess_min, (unsigned long long) ts->managed_sess_max, diff --git a/daemon/statistics.c b/daemon/statistics.c index 0f50e2a18..7caa1df6c 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -10,6 +10,10 @@ mutex_t rtpe_totalstats_lastinterval_lock; struct totalstats rtpe_totalstats_lastinterval; +mutex_t rtpe_codec_stats_lock; +GHashTable *rtpe_codec_stats; + + static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) { struct timeval dp, oa; @@ -242,4 +246,7 @@ void statistics_init() { mutex_init(&rtpe_totalstats_interval.offers_ps.lock); mutex_init(&rtpe_totalstats_interval.answers_ps.lock); mutex_init(&rtpe_totalstats_interval.deletes_ps.lock); + + mutex_init(&rtpe_codec_stats_lock); + rtpe_codec_stats = g_hash_table_new(g_str_hash, g_str_equal); } diff --git a/include/codec.h b/include/codec.h index d97a7ca8c..054f0a2f0 100644 --- a/include/codec.h +++ b/include/codec.h @@ -43,6 +43,10 @@ struct codec_handler { // for media playback struct codec_ssrc_handler *ssrc_handler; + + // stats entry + char *stats_chain; + struct codec_stats *stats_entry; }; struct codec_packet { diff --git a/include/statistics.h b/include/statistics.h index a2a4b3ab2..bca5e6be3 100644 --- a/include/statistics.h +++ b/include/statistics.h @@ -80,6 +80,18 @@ struct rtp_stats { atomic64 in_tos_tclass; }; +struct codec_stats { + char *chain; + char *chain_brief; + int num_transcoders; + // 3 entries: [0] and [1] for per-second stats, [2] for total count + // last_tv_sec keeps track of rollovers + int last_tv_sec[2]; + atomic64 packets_input[3]; + atomic64 bytes_input[3]; + atomic64 pcm_samples[3]; +}; + struct call_stats { time_t last_packet; @@ -91,6 +103,9 @@ extern struct totalstats rtpe_totalstats_interval; extern mutex_t rtpe_totalstats_lastinterval_lock; extern struct totalstats rtpe_totalstats_lastinterval; +extern mutex_t rtpe_codec_stats_lock; +extern GHashTable *rtpe_codec_stats; + void statistics_update_oneway(struct call *); void statistics_update_foreignown_dec(struct call *); void statistics_update_foreignown_inc(struct call* c); diff --git a/t/transcode-test.c b/t/transcode-test.c index 120920bba..7c1473f8f 100644 --- a/t/transcode-test.c +++ b/t/transcode-test.c @@ -317,6 +317,7 @@ static void dtmf(const char *s) { int main(void) { codeclib_init(0); srandom(time(NULL)); + statistics_init(); // plain start();