Implemented delay measurement of RTP packets in kernel.

pull/101/head
Frederic-Philippe Metz 10 years ago
parent e0f9c59274
commit 6d33ef76b0

@ -63,7 +63,7 @@ endif
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c measuredelay.c
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c
OBJS= $(SRCS:.c=.o)

@ -32,7 +32,7 @@
#include "rtcp.h"
#include "rtp.h"
#include "call_interfaces.h"
#include "rtpengine_config.h"
@ -1344,10 +1344,13 @@ static void callmaster_timer(void *ptr) {
DS(bytes);
DS(errors);
#if (RE_HAS_MEASUREDELAY)
mutex_lock(&m->statspslock);
ps->stats.start = m->statsps.start = ke->stats.start;
ps->stats.end = m->statsps.end = ke->stats.end;
ps->stats.delay_min = m->statsps.delay_min = ke->stats.delay_min;
ps->stats.delay_avg = m->statsps.delay_avg = ke->stats.delay_avg;
ps->stats.delay_max = m->statsps.delay_max = ke->stats.delay_max;
mutex_unlock(&m->statspslock);
#endif
mutex_lock(&ps->in_lock);
@ -1358,8 +1361,11 @@ static void callmaster_timer(void *ptr) {
ps->kernel_stats.bytes = ke->stats.bytes;
ps->kernel_stats.errors = ke->stats.errors;
ps->kernel_stats.start = ke->stats.start;
ps->kernel_stats.end = ke->stats.end;
#if (RE_HAS_MEASUREDELAY)
ps->kernel_stats.delay_min = ke->stats.delay_min;
ps->kernel_stats.delay_avg = ke->stats.delay_avg;
ps->kernel_stats.delay_max = ke->stats.delay_max;
#endif
update = 0;
@ -2383,13 +2389,6 @@ static void unkernelize(struct packet_stream *p) {
PS_CLEAR(p, KERNELIZED);
}
void timespec_subtract (struct timespec *result, const struct timespec *a, const struct timespec *b) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec - (long long)b->tv_sec) * 1000000000 + ((long long)a->tv_nsec - (long long)b->tv_nsec);
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
void timeval_subtract (struct timeval *result, const struct timeval *a, const struct timeval *b) {
long microseconds=0;
microseconds = ((long)a->tv_sec - (long)b->tv_sec) * 1000000 + ((long)a->tv_usec - (long)b->tv_usec);
@ -2502,21 +2501,63 @@ void call_destroy(struct call *c) {
if (_log_facility_cdr) {
const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp";
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets=%llu, "
"ml%i_midx%u_%s_relayed_bytes=%llu, "
"ml%i_midx%u_%s_relayed_errors=%llu, "
"ml%i_midx%u_%s_last_packet=%llu, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet);
if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets=%llu, "
"ml%i_midx%u_%s_relayed_bytes=%llu, "
"ml%i_midx%u_%s_relayed_errors=%llu, "
"ml%i_midx%u_%s_last_packet=%llu, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet);
} else {
#if (RE_HAS_MEASUREDELAY)
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets=%llu, "
"ml%i_midx%u_%s_relayed_bytes=%llu, "
"ml%i_midx%u_%s_relayed_errors=%llu, "
"ml%i_midx%u_%s_last_packet=%llu, "
"ml%i_midx%u_%s_delay_min=%llu.%09llu, "
"ml%i_midx%u_%s_delay_avg=%llu.%09llu, "
"ml%i_midx%u_%s_delay_max=%llu.%09llu, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_min.tv_sec, (unsigned long long) ps->stats.delay_min.tv_nsec,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_avg.tv_sec, (unsigned long long) ps->stats.delay_avg.tv_nsec,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.delay_max.tv_sec, (unsigned long long) ps->stats.delay_max.tv_nsec);
#else
cdrbufcur += sprintf(cdrbufcur,
"ml%i_midx%u_%s_endpoint_ip=%s, "
"ml%i_midx%u_%s_endpoint_port=%u, "
"ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets=%llu, "
"ml%i_midx%u_%s_relayed_bytes=%llu, "
"ml%i_midx%u_%s_relayed_errors=%llu, "
"ml%i_midx%u_%s_last_packet=%llu, ",
cdrlinecnt, md->index, protocol, addr,
cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet);
#endif
}
}
ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, "

@ -190,8 +190,9 @@ struct stats {
u_int64_t packets;
u_int64_t bytes;
u_int64_t errors;
struct timespec start;
struct timespec end;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
};
struct totalstats {

@ -16,6 +16,7 @@
#include "call.h"
#include "cli.h"
#include "rtpengine_config.h"
static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer ...\n";
@ -148,10 +149,22 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
if (PS_ISSET(ps, FALLBACK_RTCP))
continue;
struct timespec result;
timespec_subtract(&result,&(ps->stats.end),&(ps->stats.start));
#if (RE_HAS_MEASUREDELAY)
if (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e, %llu last_packet\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
} else {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e, %llu last_packet, %llu.%09llu delay\n",
"%llu p, %llu b, %llu e, %llu last_packet, %llu.%09llu delay_min, %llu.%09llu delay_avg, %llu.%09llu delay_max\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
@ -160,8 +173,25 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
(unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet,
(unsigned long long) result.tv_sec,
(unsigned long long) result.tv_nsec);
(unsigned long long) ps->stats.delay_min.tv_sec,
(unsigned long long) ps->stats.delay_min.tv_nsec,
(unsigned long long) ps->stats.delay_avg.tv_sec,
(unsigned long long) ps->stats.delay_avg.tv_nsec,
(unsigned long long) ps->stats.delay_max.tv_sec,
(unsigned long long) ps->stats.delay_max.tv_nsec);
}
#else
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e, %llu last_packet\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
smart_ntop_p_buf(&ps->endpoint.ip46), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
#endif
ADJUSTLEN(printlen,outbufend,replybuffer);
}
}

@ -102,21 +102,6 @@ int kernel_del_stream(int fd, u_int16_t p) {
return -1;
}
int kernel_measure_delay(int fd) {
struct rtpengine_message msg;
int ret;
ZERO(msg);
msg.cmd = MMG_MEASUREDELAY;
ret = write(fd, &msg, sizeof(msg));
if (ret > 0)
return 0;
ilog(LOG_ERROR, "Failed to delete relay stream from kernel: %s", strerror(errno));
return -1;
}
GList *kernel_list(unsigned int id) {
char str[64];
int fd;

@ -27,7 +27,6 @@
#include "call_interfaces.h"
#include "cli.h"
#include "graphite.h"
#include "measuredelay.h"
@ -630,13 +629,6 @@ static void timer_loop(void *d) {
poller_timers_wait_run(p, 100);
}
static void measuredelay_loop(void *d) {
struct callmaster *cm = d;
while (!global_shutdown)
measuredelay_loop_run(cm,1); // time in seconds
}
static void graphite_loop(void *d) {
struct callmaster *cm = d;
@ -670,7 +662,7 @@ int main(int argc, char **argv) {
thread_create_detach(sighandler, NULL);
thread_create_detach(timer_loop, ctx.p);
thread_create_detach(measuredelay_loop, ctx.m);
if (graphite_ip)
thread_create_detach(graphite_loop, ctx.m);

@ -1,33 +0,0 @@
/*
* meauredelay.c
*
* Created on: Feb 13, 2015
* Author: fmetz
*/
#include "log.h"
#include "call.h"
#include "measuredelay.h"
#include "kernel.h"
static struct callmaster* cm=0;
static time_t g_now, next_run;
void measuredelay_loop_run(struct callmaster* callmaster, int seconds) {
int rc=0;
g_now = time(NULL);
if (g_now < next_run)
goto sleep;
next_run = g_now + seconds;
if (!cm)
cm = callmaster;
kernel_measure_delay(cm->conf.kernelfd);
sleep:
usleep(100000);
}

@ -1,14 +0,0 @@
/*
* measuredelay.h
*
* Created on: Feb 13, 2015
* Author: fmetz
*/
#ifndef MEASUREDELAY_H_
#define MEASUREDELAY_H_
void measuredelay_loop_run(struct callmaster* callmaster, int seconds);
#endif /* MEASUREDELAY_H_ */

@ -0,0 +1,15 @@
/*
* rtpengine_config.h
*
* Description: Config file with preprocessor config makros
*
* Created on: Mar 18, 2015
* Author: fmetz
*/
#ifndef RTPENGINE_CONFIG_H_
#define RTPENGINE_CONFIG_H_
#define RE_HAS_MEASUREDELAY 1
#endif /* RTPENGINE_CONFIG_H_ */

@ -27,6 +27,8 @@
#include "xt_RTPENGINE.h"
#endif
#include "rtpengine_config.h"
MODULE_LICENSE("GPL");
@ -1664,19 +1666,6 @@ static ssize_t proc_control_write(struct file *file, const char __user *buf, siz
goto err;
break;
case MMG_MEASUREDELAY:
port=0;
if (t==NULL)
break;
g = find_next_target(t, &port);
while (g != NULL) {
spin_lock_irqsave(&g->stats_lock, flags);
g->stats.measureactive=1;
spin_unlock_irqrestore(&g->stats_lock, flags);
g = find_next_target(t, &port);
}
break;
default:
printk(KERN_WARNING "xt_RTPENGINE unimplemented op %u\n", msg.cmd);
err = -EINVAL;
@ -2122,7 +2111,48 @@ static inline int is_dtls(struct sk_buff *skb) {
return 1;
}
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, struct timespec *starttime) {
static void re_timespec_subtract (struct timespec *result, const struct timespec *a, const struct timespec *b) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec - (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec - (long long)b->tv_nsec);
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_multiply(struct timespec *result, const struct timespec *a, const long long multiplier) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec * multiplier;
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_devide(struct timespec *result, const struct timespec *a, const long devisor) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec * (long long)1000000000) + (long long)a->tv_nsec / devisor;
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
static void re_timespec_add(struct timespec *result, const struct timespec *a, const struct timespec *b) {
long long nanoseconds=0;
nanoseconds = ((long)a->tv_sec + (long long)b->tv_sec) * (long long)1000000000 + ((long long)a->tv_nsec + (long long)b->tv_nsec);
result->tv_sec = nanoseconds/(long long)1000000000;
result->tv_nsec = nanoseconds%(long long)1000000000;
}
/* Return negative, zero, positive if A < B, A == B, A > B, respectively.
Assume the nanosecond components are in range, or close to it. */
static int re_timespec_cmp (struct timespec *a, struct timespec *b)
{
return (a->tv_sec < b->tv_sec ? -1
: a->tv_sec > b->tv_sec ? 1
: a->tv_nsec - b->tv_nsec);
}
#if (RE_HAS_MEASUREDELAY)
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src, struct timespec *starttime) {
#else
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) {
#endif
struct udphdr *uh;
struct rtpengine_target *g;
struct sk_buff *skb2;
@ -2133,6 +2163,10 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t,
struct rtp_parsed rtp;
u_int64_t pkt_idx = 0;
#if (RE_HAS_MEASUREDELAY)
struct timespec endtime, delay;
#endif
skb_reset_transport_header(skb);
uh = udp_hdr(skb);
skb_pull(skb, sizeof(*uh));
@ -2232,17 +2266,35 @@ not_rtp:
out:
spin_lock_irqsave(&g->stats_lock, flags);
if (g->stats.measureactive==1) {
g->stats.start = *starttime;
getnstimeofday(&g->stats.end);
g->stats.measureactive=0;
}
if (err)
g->stats.errors++;
else {
g->stats.packets++;
g->stats.bytes += skb->len;
#if (RE_HAS_MEASUREDELAY)
getnstimeofday(&endtime);
re_timespec_subtract(&delay,&endtime, starttime);
if (g->stats.packets==1) {
g->stats.delay_min=delay;
g->stats.delay_avg=delay;
g->stats.delay_max=delay;
} else {
if (re_timespec_cmp(&g->stats.delay_min,&delay)>0) {
g->stats.delay_min = delay;
}
if (re_timespec_cmp(&g->stats.delay_max,&delay)<0) {
g->stats.delay_max = delay;
}
re_timespec_multiply(&g->stats.delay_avg,&g->stats.delay_avg,g->stats.packets-1);
re_timespec_add(&g->stats.delay_avg,&g->stats.delay_avg,&delay);
re_timespec_devide(&g->stats.delay_avg,&g->stats.delay_avg,g->stats.packets);
}
#endif
}
spin_unlock_irqrestore(&g->stats_lock, flags);
@ -2278,8 +2330,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
struct iphdr *ih;
struct rtpengine_table *t;
struct re_address src;
#if (RE_HAS_MEASUREDELAY)
struct timespec starttime;
getnstimeofday(&starttime);
#endif
t = get_table(pinfo->id);
if (!t)
@ -2298,8 +2353,11 @@ static unsigned int rtpengine4(struct sk_buff *oskb, const struct xt_action_para
memset(&src, 0, sizeof(src));
src.family = AF_INET;
src.u.ipv4 = ih->saddr;
#if (RE_HAS_MEASUREDELAY)
return rtpengine46(skb, t, &src, &starttime);
#else
return rtpengine46(skb, t, &src);
#endif
skip2:
kfree_skb(skb);
skip3:
@ -2321,8 +2379,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
struct ipv6hdr *ih;
struct rtpengine_table *t;
struct re_address src;
#if (RE_HAS_MEASUREDELAY)
struct timespec starttime;
getnstimeofday(&starttime);
#endif
t = get_table(pinfo->id);
if (!t)
@ -2341,7 +2402,11 @@ static unsigned int rtpengine6(struct sk_buff *oskb, const struct xt_action_para
memset(&src, 0, sizeof(src));
src.family = AF_INET6;
memcpy(&src.u.ipv6, &ih->saddr, sizeof(src.u.ipv6));
#if (RE_HAS_MEASUREDELAY)
return rtpengine46(skb, t, &src, &starttime);
#else
return rtpengine46(skb, t, &src);
#endif
skip2:
kfree_skb(skb);

@ -9,9 +9,9 @@ struct rtpengine_stats {
u_int64_t packets;
u_int64_t bytes;
u_int64_t errors;
struct timespec start;
struct timespec end;
u_int32_t measureactive;
struct timespec delay_min;
struct timespec delay_avg;
struct timespec delay_max;
};
struct re_address {
@ -88,8 +88,7 @@ struct rtpengine_message {
MMG_NOOP = 1,
MMG_ADD,
MMG_DEL,
MMG_UPDATE,
MMG_MEASUREDELAY,
MMG_UPDATE
} cmd;
struct rtpengine_target_info target;

Loading…
Cancel
Save