From 2b37826e46c285ccdddc30be95bc9e705ff32e30 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 7 Sep 2011 21:17:31 +0000 Subject: [PATCH] Reworked port re-use logic and adapted test scripts to accomodate for the changes. All tests seem to be OK so far, but this is EXPERIMENTAL code - do not use in production. --- daemon/call.c | 204 ++++++++++++++++++++++------------ daemon/call.h | 4 +- tests/3-way-connect-simulator | 31 ++++-- tests/reinvite-simulator | 49 ++++---- 4 files changed, 180 insertions(+), 108 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 67f927399..f03e0201d 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -12,6 +12,7 @@ #include #endif #include +#include #include "call.h" #include "poller.h" @@ -173,16 +174,20 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_ if (l < 2) goto skip; - if (!pe->codec) { - cc = b[1]; - cc &= 0x7f; - if (cc < G_N_ELEMENTS(rtp_codecs)) - pe->codec = rtp_codecs[cc] ? : "unknown"; - else - pe->codec = "unknown"; - } + if (c->lookup_done && m->poller->now > (c->lookup_done + 3)) { + if (!pe->codec) { + cc = b[1]; + cc &= 0x7f; + if (cc < G_N_ELEMENTS(rtp_codecs)) + pe->codec = rtp_codecs[cc] ? : "unknown"; + else + pe->codec = "unknown"; + } - mylog(LOG_DEBUG, "[%s] Confirmed peer information for port %u - " DF, c->callid, r->localport, DP(*fsin)); + mylog(LOG_DEBUG, "[%s] Confirmed peer information for port %u - " DF, c->callid, r->localport, DP(*fsin)); + + pe->confirmed = 1; + } p->peer.ip = fsin->sin_addr.s_addr; p->peer.port = ntohs(fsin->sin_port); @@ -191,10 +196,9 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_ p2->peer.ip = fsin->sin_addr.s_addr; p2->peer.port = p->peer.port + ((int) (p2->idx * 2) - 1); - pe->confirmed = 1; - if (pe2->confirmed && pe2->filled) + if (pe->confirmed && pe2->confirmed && pe2->filled) kernelize(cs); #ifndef NO_REDIS @@ -243,8 +247,6 @@ skip: return -1; } - pe->used = 1; - drop: r->stats.packets++; r->stats.bytes += l; @@ -601,7 +603,7 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) { a = &p->rtps[0]; b = &p->rtps[1]; - if (a->peer.ip != s->ip || a->peer.port != b->peer.port) { + if (a->peer_advertised.ip != s->ip || a->peer_advertised.port != s->port) { cs->peers[0].confirmed = 0; unkernelize(&cs->peers[0]); cs->peers[1].confirmed = 0; @@ -612,6 +614,8 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) { a->peer.port = b->peer.port = s->port; if (b->peer.port) b->peer.port++; + a->peer_advertised = a->peer; + b->peer_advertised = b->peer; strdupfree(&p->mediatype, s->mediatype); strdupfree(&p->tag, tag); @@ -620,8 +624,8 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) { return 0; } -static void steal_peer(struct peer *p, struct streamrelay *r) { - struct peer *s = r->up; +static void steal_peer(struct peer *dest, struct peer *src) { + struct streamrelay *r; int i; struct poller_item pi; struct streamrelay *sr, *srs; @@ -629,27 +633,29 @@ static void steal_peer(struct peer *p, struct streamrelay *r) { struct poller *po; ZERO(pi); - c = s->up->call; + r = &src->rtps[0]; + c = src->up->call; po = c->callmaster->poller; mylog(LOG_DEBUG, "[%s] Re-using existing open RTP port %u", c->callid, r->localport); - p->confirmed = 0; - unkernelize(p); - s->confirmed = 0; - unkernelize(s); + dest->confirmed = 0; + unkernelize(dest); + src->confirmed = 0; + unkernelize(src); - p->filled = 1; - strmove(&p->mediatype, &s->mediatype); - strmove(&p->tag, &s->tag); - //p->kernelized = s->kernelized; - //s->kernelized = 0; + dest->filled = 1; + strmove(&dest->mediatype, &src->mediatype); + strmove(&dest->tag, &src->tag); + //dest->kernelized = src->kernelized; + //src->kernelized = 0; for (i = 0; i < 2; i++) { - sr = &p->rtps[i]; - srs = &s->rtps[i]; + sr = &dest->rtps[i]; + srs = &src->rtps[i]; if (sr->fd != -1) { + mylog(LOG_DEBUG, "[%s] Closing port %u in favor of re-use", c->callid, sr->localport); close(sr->fd); BIT_ARRAY_CLEAR(ports_used, sr->localport); poller_del_item(po, sr->fd); @@ -659,6 +665,8 @@ static void steal_peer(struct peer *p, struct streamrelay *r) { sr->peer.ip = srs->peer.ip; sr->peer.port = srs->peer.port; + sr->peer_advertised.ip = srs->peer_advertised.ip; + sr->peer_advertised.port = srs->peer_advertised.port; sr->localport = srs->localport; @@ -667,6 +675,8 @@ static void steal_peer(struct peer *p, struct streamrelay *r) { srs->peer.ip = 0; srs->peer.port = 0; srs->localport = 0; + srs->peer_advertised.ip = 0; + srs->peer_advertised.port = 0; pi.fd = sr->fd; pi.ptr = sr; @@ -679,7 +689,7 @@ static void steal_peer(struct peer *p, struct streamrelay *r) { static void callstream_init(struct callstream *s, struct call *ca, int port1, int port2) { - int i, j; + int i, j, tport; struct peer *p; struct streamrelay *r; struct poller_item pi; @@ -709,17 +719,21 @@ static void callstream_init(struct callstream *s, struct call *ca, int port1, in r->last = po->now; } - get_port_pair(p, (i == 0) ? port1 : port2); + tport = (i == 0) ? port1 : port2; - for (j = 0; j < 2; j++) { - r = &p->rtps[j]; + if (tport >= 0) { + get_port_pair(p, tport); + + for (j = 0; j < 2; j++) { + r = &p->rtps[j]; - pi.fd = r->fd; - pi.ptr = r; - pi.readable = stream_readable; - pi.closed = stream_closed; + pi.fd = r->fd; + pi.ptr = r; + pi.readable = stream_readable; + pi.closed = stream_closed; - poller_add_item(po, &pi); + poller_add_item(po, &pi); + } } } } @@ -732,10 +746,9 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int struct stream *t; int x; struct streamrelay *r; - struct callstream *cs; + struct callstream *cs, *cs_o; struct peer *p; unsigned int ret; - int no_reuse = 0; q = g_queue_new(); /* new callstreams list */ @@ -747,43 +760,22 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int p = NULL; - if (!opmode) { - DBG("creating new callstream"); - cs = g_slice_alloc(sizeof(*cs)); - callstream_init(cs, c, 0, 0); - p = &cs->peers[0]; - } - else { - l = c->callstreams->head; - if (!l) { - mylog(LOG_WARNING, "[%s] Got LOOKUP, but no callstreams found", c->callid); - break; - } - cs = l->data; - g_queue_delete_link(c->callstreams, l); - p = &cs->peers[1]; - if (cs->peers[0].used) - no_reuse = 1; - } - if (no_reuse) - goto skip; + + + /* look for an existing call stream with identical parameters */ for (l = c->callstreams->head; l; l = l->next) { - cs = l->data; + cs_o = l->data; for (x = 0; x < 2; x++) { - r = &cs->peers[x].rtps[0]; - /* - if (r->up->used) - continue; - */ - if (r->peer.ip != t->ip) + r = &cs_o->peers[x].rtps[0]; + if (r->peer_advertised.ip != t->ip) continue; - if (r->peer.port != t->port) + if (r->peer_advertised.port != t->port) continue; - if (strcmp(cs->peers[x].tag, tag)) + if (strcmp(cs_o->peers[x].tag, tag)) continue; DBG("found existing call stream to steal"); goto found; @@ -791,13 +783,79 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int } /* not found */ -skip: - setup_peer(p, t, tag); - g_queue_push_tail(q, p->up); - continue; + r = NULL; + cs_o = NULL; + l = NULL; found: - steal_peer(p, r); + + if (!opmode) { /* request */ + DBG("creating new callstream"); + + cs = g_slice_alloc(sizeof(*cs)); + + if (!r) { + /* nothing found to re-use, open new ports */ + callstream_init(cs, c, 0, 0); + p = &cs->peers[0]; + setup_peer(p, t, tag); + } + else { + /* re-use, so don't open new ports */ + callstream_init(cs, c, -1, -1); + if (r->up->idx == 0) { + /* request/lookup came in the same order as before */ + steal_peer(&cs->peers[0], &cs_o->peers[0]); + steal_peer(&cs->peers[1], &cs_o->peers[1]); + } + else { + /* reversed request/lookup */ + steal_peer(&cs->peers[0], &cs_o->peers[1]); + steal_peer(&cs->peers[1], &cs_o->peers[0]); + } + } + + g_queue_push_tail(q, cs); + ZERO(c->lookup_done); + continue; + } + + /* lookup */ + l = c->callstreams->head; + if (!l) { + mylog(LOG_WARNING, "[%s] Got LOOKUP, but no callstreams found", c->callid); + break; + } + cs = l->data; + g_queue_delete_link(c->callstreams, l); + p = &cs->peers[1]; + + if (r && p == r->up) { + /* best case, nothing to do */ + ; + } + else if (r && cs_o != cs) { + /* found something, but it's linked to a different stream */ + steal_peer(p, r->up); + } + else if (!r && !p->filled) { + /* nothing found to steal, but this end is open */ + setup_peer(p, t, tag); + } + else { + /* nothing found to steal and this end is used */ + /* need a new call stream after all */ + cs_o = cs; + cs = g_slice_alloc(sizeof(*cs)); + callstream_init(cs, c, 0, 0); + steal_peer(&cs->peers[0], &cs_o->peers[0]); + p = &cs->peers[1]; + setup_peer(p, t, tag); + g_queue_push_tail(c->callstreams, cs_o); + } + + time(&c->lookup_done); + g_queue_push_tail(q, p->up); } diff --git a/daemon/call.h b/daemon/call.h index 936e9b811..f55b0f47f 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -9,6 +9,7 @@ #ifndef NO_REDIS #include #endif +#include #include "ipt_MEDIAPROXY.h" @@ -37,6 +38,7 @@ struct stream { struct streamrelay { int fd; struct stream peer; + struct stream peer_advertised; u_int16_t localport; unsigned char idx; struct peer *up; @@ -54,7 +56,6 @@ struct peer { int kernelized:1; int filled:1; int confirmed:1; - int used:1; }; struct callstream { struct peer peers[2]; @@ -74,6 +75,7 @@ struct call { char *calling_agent; char *called_agent; GHashTable *infohash; + time_t lookup_done; }; struct callmaster { diff --git a/tests/3-way-connect-simulator b/tests/3-way-connect-simulator index be6e6e8ab..453de2e9b 100755 --- a/tests/3-way-connect-simulator +++ b/tests/3-way-connect-simulator @@ -56,21 +56,25 @@ sub send_rcv { my $pkt = join('',map(rand,1..10)); send($sendfd, $pkt, 0, sockaddr_in($sendtoport, inet_aton($sendtoip))) or die; my $inc; - alarm(5); - recv($recvfd, $inc, length($pkt), 0); - alarm(0); - $inc eq $pkt or die; + { + local $SIG{ALRM} = sub { + print("timeout!\n"); + }; + alarm(1); + recv($recvfd, $inc, length($pkt), 0); + alarm(0); + } + $inc eq $pkt or print("NOT received packed ok\n"), return; print("received packet ok\n"); } sub send_rcv4 { - send_rcv(@_[0,1,2,3]); - sleep(1); - send_rcv(@_[3,4,5,0]); - sleep(1); - send_rcv(@_[0,1,2,3]); - sleep(1); - send_rcv(@_[3,4,5,0]); + for (1 .. 4) { + send_rcv(@_[0,1,2,3]); + sleep(1); + send_rcv(@_[3,4,5,0]); + sleep(1); + } } sub sim_req_lk { @@ -229,7 +233,10 @@ my ($mpip11, $mpport11) = sim_rq($callid1, $mpip10, $mpport10, $totag1, $fromtag print("mediaproxy: tell A to send to $mpport11 instead of $mpip10:$mpport10\n"); brk(); -send_rcv($client1, $mpip11, $mpport11, $client4); ###### <<<<<< error trigger +if (1) { + ###### error trigger + send_rcv($client1, $mpip11, $mpport11, $client4); +} print("A tells B: send RTP to $lp1\n"); my ($mpip12, $mpport12) = sim_lk($callid1, $local_ip, $lp1, $totag1, $fromtag1); diff --git a/tests/reinvite-simulator b/tests/reinvite-simulator index c74a6c77e..0008a2a13 100755 --- a/tests/reinvite-simulator +++ b/tests/reinvite-simulator @@ -74,14 +74,12 @@ sub send_rcv_brk { } sub send_rcv4 { - send_rcv(@_[0,1,2,3]); - sleep(1); - send_rcv(@_[3,4,5,0]); - sleep(1); - send_rcv(@_[0,1,2,3]); - sleep(1); - send_rcv(@_[3,4,5,0]); - sleep(1); + for (1 .. 4) { + send_rcv(@_[0,1,2,3]); + sleep(1); + send_rcv(@_[3,4,5,0]); + sleep(1); + } } sub sim_req_lk { @@ -202,8 +200,9 @@ for my $tuple (\@forward, \@backward) { print("mediaproxy: tell $dst to send to $mpport1 instead of $p1\n"); brk(); - send_rcv_brk($c2, $mpip1, $mpport1, $c1); - send_rcv_brk($c2, $mpip1, $mpport1, $c1); + for (1 .. 4) { + send_rcv_brk($c2, $mpip1, $mpport1, $c1); + } print("$dst tells $src: send RTP to $p2\n"); brk(); @@ -263,11 +262,13 @@ for my $tuple (\@forward, \@backward) { print("mediaproxy: tell $dst to send to $mpport5 instead of $p1\n"); brk(); - send_rcv_brk($c2, $mpip3, $mpport3, $c1); - send_rcv_brk($c2, $mpip3, $mpport3, $c1); + for (1 .. 4) { + send_rcv_brk($c2, $mpip3, $mpport3, $c1); + } print("switching to new port...\n"); - send_rcv_brk($c2, $mpip5, $mpport5, $c1); - send_rcv_brk($c2, $mpip5, $mpport5, $c1); + for (1 .. 4) { + send_rcv_brk($c2, $mpip5, $mpport5, $c1); + } print("$dst tells $src: send RTP to $p2\n"); brk(); @@ -276,8 +277,9 @@ for my $tuple (\@forward, \@backward) { print("mediaproxy: tell $src to send to $mpport6 instead of $p2\n"); brk(); - send_rcv_brk($c2, $mpip5, $mpport5, $c1); - send_rcv_brk($c2, $mpip5, $mpport5, $c1); + for (1 .. 4) { + send_rcv_brk($c2, $mpip5, $mpport5, $c1); + } send_rcv4($c2, $mpip5, $mpport5, $c1, $mpip6, $mpport6); @@ -298,8 +300,9 @@ for my $tuple (\@forward, \@backward) { print("mediaproxy: tell $dst to send to $mpport7 instead of $p1\n"); brk(); - send_rcv_brk($c1, $mpip6, $mpport6, $c2); - send_rcv_brk($c1, $mpip6, $mpport6, $c2); + for (1 .. 4) { + send_rcv_brk($c1, $mpip6, $mpport6, $c2); + } print("$dst tells $src: send RTP to $p2\n"); brk(); @@ -308,11 +311,13 @@ for my $tuple (\@forward, \@backward) { print("mediaproxy: tell $src to send to $mpport8 instead of $p2\n"); brk(); - send_rcv_brk($c1, $mpip6, $mpport6, $c2); - send_rcv_brk($c1, $mpip6, $mpport6, $c2); + for (1 .. 4) { + send_rcv_brk($c1, $mpip6, $mpport6, $c2); + } print("switching to new port...\n"); - send_rcv_brk($c1, $mpip8, $mpport8, $c2); - send_rcv_brk($c1, $mpip8, $mpport8, $c2); + for (1 .. 4) { + send_rcv_brk($c1, $mpip8, $mpport8, $c2); + } send_rcv4($c2, $mpip7, $mpport7, $c1, $mpip8, $mpport8);