TT#82203 add detailed transcoding stats

Change-Id: Ib04767c38b00b17ef5844a9f6649e009270f8f82
changes/68/40268/9
Richard Fuchs 5 years ago
parent 5f3f203fb8
commit 64e6bf440b

@ -89,6 +89,7 @@ static void cli_incoming_list_rediscmdtimeout(str *instr, struct streambuf *repl
static void cli_incoming_list_controltos(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_interfaces(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_jsonstats(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_transcoders(str *instr, struct streambuf *replybuffer);
static const cli_handler_t cli_top_handlers[] = {
{ "list", cli_incoming_list },
@ -141,6 +142,7 @@ static const cli_handler_t cli_list_handlers[] = {
{ "controltos", cli_incoming_list_controltos },
{ "interfaces", cli_incoming_list_interfaces },
{ "jsonstats", cli_incoming_list_jsonstats },
{ "transcoders", cli_incoming_list_transcoders },
{ NULL, },
};
@ -1532,6 +1534,32 @@ static void cli_incoming_list_jsonstats(str *instr, struct streambuf *replybuffe
g_list_free(list);
}
static void cli_incoming_list_transcoders(str *instr, struct streambuf *replybuffer) {
mutex_lock(&rtpe_codec_stats_lock);
GList *chains = g_hash_table_get_keys(rtpe_codec_stats);
if (!chains)
streambuf_printf(replybuffer, "No stats entries\n");
else {
int last_tv_sec = rtpe_now.tv_sec - 1;
unsigned int idx = last_tv_sec & 1;
for (GList *l = chains; l; l = l->next) {
char *chain = l->data;
struct codec_stats *stats_entry = g_hash_table_lookup(rtpe_codec_stats, chain);
streambuf_printf(replybuffer, "%s: %i transcoders\n", chain, g_atomic_int_get(&stats_entry->num_transcoders));
if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) != last_tv_sec)
continue;
streambuf_printf(replybuffer, " " UINT64F " packets/s\n", atomic64_get(&stats_entry->packets_input[idx]));
streambuf_printf(replybuffer, " " UINT64F " bytes/s\n", atomic64_get(&stats_entry->bytes_input[idx]));
streambuf_printf(replybuffer, " " UINT64F " samples/s\n", atomic64_get(&stats_entry->pcm_samples[idx]));
}
}
mutex_unlock(&rtpe_codec_stats_lock);
g_list_free(chains);
}
static void cli_incoming_list_controltos(str *instr, struct streambuf *replybuffer) {
rwlock_lock_r(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "%d\n", rtpe_config.control_tos);

@ -139,6 +139,12 @@ static void __handler_shutdown(struct codec_handler *handler) {
handler->output_handler = handler; // reset to default
handler->dtmf_payload_type = -1;
handler->pcm_dtmf_detect = 0;
if (handler->stats_entry) {
g_atomic_int_add(&handler->stats_entry->num_transcoders, -1);
handler->stats_entry = NULL;
free(handler->stats_chain);
}
}
static void __codec_handler_free(void *pp) {
@ -240,6 +246,30 @@ reset:
handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_transcode_new, handler);
// stats entry
if (asprintf(&handler->stats_chain, STR_FORMAT " -> " STR_FORMAT,
STR_FMT(&handler->source_pt.encoding_with_params),
STR_FMT(&dest->encoding_with_params)) < 0)
ilog(LOG_ERR, "asprintf error");
else {
mutex_lock(&rtpe_codec_stats_lock);
struct codec_stats *stats_entry =
g_hash_table_lookup(rtpe_codec_stats, handler->stats_chain);
if (!stats_entry) {
stats_entry = g_slice_alloc0(sizeof(*stats_entry));
stats_entry->chain = strdup(handler->stats_chain);
g_hash_table_insert(rtpe_codec_stats, stats_entry->chain, stats_entry);
if (asprintf(&stats_entry->chain_brief, STR_FORMAT "_" STR_FORMAT,
STR_FMT(&handler->source_pt.encoding_with_params),
STR_FMT(&dest->encoding_with_params)) < 0)
stats_entry->chain_brief = "xxx";
}
handler->stats_entry = stats_entry;
mutex_unlock(&rtpe_codec_stats_lock);
g_atomic_int_inc(&stats_entry->num_transcoders);
}
check_output:;
// check if we have multiple decoders transcoding to the same output PT
struct codec_handler *output_handler = NULL;
@ -1903,6 +1933,13 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v
ch = new_ch;
}
struct codec_handler *h = ch->handler;
if (h->stats_entry) {
int idx = rtpe_now.tv_sec & 1;
atomic64_add(&h->stats_entry->pcm_samples[idx], frame->nb_samples);
atomic64_add(&h->stats_entry->pcm_samples[2], frame->nb_samples);
}
if (ch->skip_pts) {
if (frame->nb_samples <= 0)
;
@ -1923,7 +1960,7 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v
__dtmf_detect(ch, frame);
input_func(ch->encoder, frame, ch->handler->packet_encoded, ch, mp);
input_func(ch->encoder, frame, h->packet_encoded, ch, mp);
discard:
av_frame_free(&frame);
@ -1962,6 +1999,25 @@ static int handler_func_transcode(struct codec_handler *h, struct media_packet *
ntohl(mp->rtp->ssrc), mp->rtp->m_pt, ntohs(mp->rtp->seq_num),
ntohl(mp->rtp->timestamp), mp->payload.len);
if (h->stats_entry) {
unsigned int idx = rtpe_now.tv_sec & 1;
int last_tv_sec = g_atomic_int_get(&h->stats_entry->last_tv_sec[idx]);
if (last_tv_sec != (int) rtpe_now.tv_sec) {
if (g_atomic_int_compare_and_exchange(&h->stats_entry->last_tv_sec[idx],
last_tv_sec, rtpe_now.tv_sec))
{
// new second - zero out stats. slight race condition here
atomic64_set(&h->stats_entry->packets_input[idx], 0);
atomic64_set(&h->stats_entry->bytes_input[idx], 0);
atomic64_set(&h->stats_entry->pcm_samples[idx], 0);
}
}
atomic64_inc(&h->stats_entry->packets_input[idx]);
atomic64_add(&h->stats_entry->bytes_input[idx], mp->payload.len);
atomic64_inc(&h->stats_entry->packets_input[2]);
atomic64_add(&h->stats_entry->bytes_input[2], mp->payload.len);
}
struct transcode_packet *packet = g_slice_alloc0(sizeof(*packet));
packet->func = packet_decode;
packet->rtp = *mp->rtp;

@ -228,6 +228,31 @@ static int send_graphite_data(struct totalstats *sent_data) {
num_ports - g_atomic_int_get(&lif->spec->port_pool.free_ports));
}
mutex_lock(&rtpe_codec_stats_lock);
GList *chains = g_hash_table_get_keys(rtpe_codec_stats);
int last_tv_sec = rtpe_now.tv_sec - 1;
unsigned int idx = last_tv_sec & 1;
for (GList *l = chains; l; l = l->next) {
char *chain = l->data;
struct codec_stats *stats_entry = g_hash_table_lookup(rtpe_codec_stats, chain);
GPF("transcoder_%s %i", stats_entry->chain_brief,
g_atomic_int_get(&stats_entry->num_transcoders));
if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) != last_tv_sec)
continue;
GPF("transcoder_%s_packets %llu", stats_entry->chain_brief,
(unsigned long long) atomic64_get(&stats_entry->packets_input[idx]));
GPF("transcoder_%s_bytes %llu", stats_entry->chain_brief,
(unsigned long long) atomic64_get(&stats_entry->bytes_input[idx]));
GPF("transcoder_%s_samples %llu", stats_entry->chain_brief,
(unsigned long long) atomic64_get(&stats_entry->pcm_samples[idx]));
}
mutex_unlock(&rtpe_codec_stats_lock);
g_list_free(chains);
ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%llu.%06llu at time %llu\n",
(unsigned long long) ts->managed_sess_min,
(unsigned long long) ts->managed_sess_max,

@ -10,6 +10,10 @@ mutex_t rtpe_totalstats_lastinterval_lock;
struct totalstats rtpe_totalstats_lastinterval;
mutex_t rtpe_codec_stats_lock;
GHashTable *rtpe_codec_stats;
static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) {
struct timeval dp, oa;
@ -242,4 +246,7 @@ void statistics_init() {
mutex_init(&rtpe_totalstats_interval.offers_ps.lock);
mutex_init(&rtpe_totalstats_interval.answers_ps.lock);
mutex_init(&rtpe_totalstats_interval.deletes_ps.lock);
mutex_init(&rtpe_codec_stats_lock);
rtpe_codec_stats = g_hash_table_new(g_str_hash, g_str_equal);
}

@ -43,6 +43,10 @@ struct codec_handler {
// for media playback
struct codec_ssrc_handler *ssrc_handler;
// stats entry
char *stats_chain;
struct codec_stats *stats_entry;
};
struct codec_packet {

@ -80,6 +80,18 @@ struct rtp_stats {
atomic64 in_tos_tclass;
};
struct codec_stats {
char *chain;
char *chain_brief;
int num_transcoders;
// 3 entries: [0] and [1] for per-second stats, [2] for total count
// last_tv_sec keeps track of rollovers
int last_tv_sec[2];
atomic64 packets_input[3];
atomic64 bytes_input[3];
atomic64 pcm_samples[3];
};
struct call_stats {
time_t last_packet;
@ -91,6 +103,9 @@ extern struct totalstats rtpe_totalstats_interval;
extern mutex_t rtpe_totalstats_lastinterval_lock;
extern struct totalstats rtpe_totalstats_lastinterval;
extern mutex_t rtpe_codec_stats_lock;
extern GHashTable *rtpe_codec_stats;
void statistics_update_oneway(struct call *);
void statistics_update_foreignown_dec(struct call *);
void statistics_update_foreignown_inc(struct call* c);

@ -317,6 +317,7 @@ static void dtmf(const char *s) {
int main(void) {
codeclib_init(0);
srandom(time(NULL));
statistics_init();
// plain
start();

Loading…
Cancel
Save