Reworked port re-use logic and adapted test scripts to accomodate for the changes.

All tests seem to be OK so far, but this is EXPERIMENTAL code - do not use in production.
git.mgm/mediaproxy-ng/2.0
Richard Fuchs 15 years ago
parent 2ce1269d4f
commit 2b37826e46

@ -12,6 +12,7 @@
#include <hiredis.h>
#endif
#include <stdlib.h>
#include <time.h>
#include "call.h"
#include "poller.h"
@ -173,16 +174,20 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_
if (l < 2)
goto skip;
if (!pe->codec) {
cc = b[1];
cc &= 0x7f;
if (cc < G_N_ELEMENTS(rtp_codecs))
pe->codec = rtp_codecs[cc] ? : "unknown";
else
pe->codec = "unknown";
}
if (c->lookup_done && m->poller->now > (c->lookup_done + 3)) {
if (!pe->codec) {
cc = b[1];
cc &= 0x7f;
if (cc < G_N_ELEMENTS(rtp_codecs))
pe->codec = rtp_codecs[cc] ? : "unknown";
else
pe->codec = "unknown";
}
mylog(LOG_DEBUG, "[%s] Confirmed peer information for port %u - " DF, c->callid, r->localport, DP(*fsin));
mylog(LOG_DEBUG, "[%s] Confirmed peer information for port %u - " DF, c->callid, r->localport, DP(*fsin));
pe->confirmed = 1;
}
p->peer.ip = fsin->sin_addr.s_addr;
p->peer.port = ntohs(fsin->sin_port);
@ -191,10 +196,9 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_
p2->peer.ip = fsin->sin_addr.s_addr;
p2->peer.port = p->peer.port + ((int) (p2->idx * 2) - 1);
pe->confirmed = 1;
if (pe2->confirmed && pe2->filled)
if (pe->confirmed && pe2->confirmed && pe2->filled)
kernelize(cs);
#ifndef NO_REDIS
@ -243,8 +247,6 @@ skip:
return -1;
}
pe->used = 1;
drop:
r->stats.packets++;
r->stats.bytes += l;
@ -601,7 +603,7 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) {
a = &p->rtps[0];
b = &p->rtps[1];
if (a->peer.ip != s->ip || a->peer.port != b->peer.port) {
if (a->peer_advertised.ip != s->ip || a->peer_advertised.port != s->port) {
cs->peers[0].confirmed = 0;
unkernelize(&cs->peers[0]);
cs->peers[1].confirmed = 0;
@ -612,6 +614,8 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) {
a->peer.port = b->peer.port = s->port;
if (b->peer.port)
b->peer.port++;
a->peer_advertised = a->peer;
b->peer_advertised = b->peer;
strdupfree(&p->mediatype, s->mediatype);
strdupfree(&p->tag, tag);
@ -620,8 +624,8 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) {
return 0;
}
static void steal_peer(struct peer *p, struct streamrelay *r) {
struct peer *s = r->up;
static void steal_peer(struct peer *dest, struct peer *src) {
struct streamrelay *r;
int i;
struct poller_item pi;
struct streamrelay *sr, *srs;
@ -629,27 +633,29 @@ static void steal_peer(struct peer *p, struct streamrelay *r) {
struct poller *po;
ZERO(pi);
c = s->up->call;
r = &src->rtps[0];
c = src->up->call;
po = c->callmaster->poller;
mylog(LOG_DEBUG, "[%s] Re-using existing open RTP port %u", c->callid, r->localport);
p->confirmed = 0;
unkernelize(p);
s->confirmed = 0;
unkernelize(s);
dest->confirmed = 0;
unkernelize(dest);
src->confirmed = 0;
unkernelize(src);
p->filled = 1;
strmove(&p->mediatype, &s->mediatype);
strmove(&p->tag, &s->tag);
//p->kernelized = s->kernelized;
//s->kernelized = 0;
dest->filled = 1;
strmove(&dest->mediatype, &src->mediatype);
strmove(&dest->tag, &src->tag);
//dest->kernelized = src->kernelized;
//src->kernelized = 0;
for (i = 0; i < 2; i++) {
sr = &p->rtps[i];
srs = &s->rtps[i];
sr = &dest->rtps[i];
srs = &src->rtps[i];
if (sr->fd != -1) {
mylog(LOG_DEBUG, "[%s] Closing port %u in favor of re-use", c->callid, sr->localport);
close(sr->fd);
BIT_ARRAY_CLEAR(ports_used, sr->localport);
poller_del_item(po, sr->fd);
@ -659,6 +665,8 @@ static void steal_peer(struct peer *p, struct streamrelay *r) {
sr->peer.ip = srs->peer.ip;
sr->peer.port = srs->peer.port;
sr->peer_advertised.ip = srs->peer_advertised.ip;
sr->peer_advertised.port = srs->peer_advertised.port;
sr->localport = srs->localport;
@ -667,6 +675,8 @@ static void steal_peer(struct peer *p, struct streamrelay *r) {
srs->peer.ip = 0;
srs->peer.port = 0;
srs->localport = 0;
srs->peer_advertised.ip = 0;
srs->peer_advertised.port = 0;
pi.fd = sr->fd;
pi.ptr = sr;
@ -679,7 +689,7 @@ static void steal_peer(struct peer *p, struct streamrelay *r) {
static void callstream_init(struct callstream *s, struct call *ca, int port1, int port2) {
int i, j;
int i, j, tport;
struct peer *p;
struct streamrelay *r;
struct poller_item pi;
@ -709,17 +719,21 @@ static void callstream_init(struct callstream *s, struct call *ca, int port1, in
r->last = po->now;
}
get_port_pair(p, (i == 0) ? port1 : port2);
tport = (i == 0) ? port1 : port2;
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
if (tport >= 0) {
get_port_pair(p, tport);
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
pi.fd = r->fd;
pi.ptr = r;
pi.readable = stream_readable;
pi.closed = stream_closed;
pi.fd = r->fd;
pi.ptr = r;
pi.readable = stream_readable;
pi.closed = stream_closed;
poller_add_item(po, &pi);
poller_add_item(po, &pi);
}
}
}
}
@ -732,10 +746,9 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int
struct stream *t;
int x;
struct streamrelay *r;
struct callstream *cs;
struct callstream *cs, *cs_o;
struct peer *p;
unsigned int ret;
int no_reuse = 0;
q = g_queue_new(); /* new callstreams list */
@ -747,43 +760,22 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int
p = NULL;
if (!opmode) {
DBG("creating new callstream");
cs = g_slice_alloc(sizeof(*cs));
callstream_init(cs, c, 0, 0);
p = &cs->peers[0];
}
else {
l = c->callstreams->head;
if (!l) {
mylog(LOG_WARNING, "[%s] Got LOOKUP, but no callstreams found", c->callid);
break;
}
cs = l->data;
g_queue_delete_link(c->callstreams, l);
p = &cs->peers[1];
if (cs->peers[0].used)
no_reuse = 1;
}
if (no_reuse)
goto skip;
/* look for an existing call stream with identical parameters */
for (l = c->callstreams->head; l; l = l->next) {
cs = l->data;
cs_o = l->data;
for (x = 0; x < 2; x++) {
r = &cs->peers[x].rtps[0];
/*
if (r->up->used)
continue;
*/
if (r->peer.ip != t->ip)
r = &cs_o->peers[x].rtps[0];
if (r->peer_advertised.ip != t->ip)
continue;
if (r->peer.port != t->port)
if (r->peer_advertised.port != t->port)
continue;
if (strcmp(cs->peers[x].tag, tag))
if (strcmp(cs_o->peers[x].tag, tag))
continue;
DBG("found existing call stream to steal");
goto found;
@ -791,13 +783,79 @@ static unsigned int call_streams(struct call *c, GQueue *s, const char *tag, int
}
/* not found */
skip:
setup_peer(p, t, tag);
g_queue_push_tail(q, p->up);
continue;
r = NULL;
cs_o = NULL;
l = NULL;
found:
steal_peer(p, r);
if (!opmode) { /* request */
DBG("creating new callstream");
cs = g_slice_alloc(sizeof(*cs));
if (!r) {
/* nothing found to re-use, open new ports */
callstream_init(cs, c, 0, 0);
p = &cs->peers[0];
setup_peer(p, t, tag);
}
else {
/* re-use, so don't open new ports */
callstream_init(cs, c, -1, -1);
if (r->up->idx == 0) {
/* request/lookup came in the same order as before */
steal_peer(&cs->peers[0], &cs_o->peers[0]);
steal_peer(&cs->peers[1], &cs_o->peers[1]);
}
else {
/* reversed request/lookup */
steal_peer(&cs->peers[0], &cs_o->peers[1]);
steal_peer(&cs->peers[1], &cs_o->peers[0]);
}
}
g_queue_push_tail(q, cs);
ZERO(c->lookup_done);
continue;
}
/* lookup */
l = c->callstreams->head;
if (!l) {
mylog(LOG_WARNING, "[%s] Got LOOKUP, but no callstreams found", c->callid);
break;
}
cs = l->data;
g_queue_delete_link(c->callstreams, l);
p = &cs->peers[1];
if (r && p == r->up) {
/* best case, nothing to do */
;
}
else if (r && cs_o != cs) {
/* found something, but it's linked to a different stream */
steal_peer(p, r->up);
}
else if (!r && !p->filled) {
/* nothing found to steal, but this end is open */
setup_peer(p, t, tag);
}
else {
/* nothing found to steal and this end is used */
/* need a new call stream after all */
cs_o = cs;
cs = g_slice_alloc(sizeof(*cs));
callstream_init(cs, c, 0, 0);
steal_peer(&cs->peers[0], &cs_o->peers[0]);
p = &cs->peers[1];
setup_peer(p, t, tag);
g_queue_push_tail(c->callstreams, cs_o);
}
time(&c->lookup_done);
g_queue_push_tail(q, p->up);
}

@ -9,6 +9,7 @@
#ifndef NO_REDIS
#include <hiredis.h>
#endif
#include <time.h>
#include "ipt_MEDIAPROXY.h"
@ -37,6 +38,7 @@ struct stream {
struct streamrelay {
int fd;
struct stream peer;
struct stream peer_advertised;
u_int16_t localport;
unsigned char idx;
struct peer *up;
@ -54,7 +56,6 @@ struct peer {
int kernelized:1;
int filled:1;
int confirmed:1;
int used:1;
};
struct callstream {
struct peer peers[2];
@ -74,6 +75,7 @@ struct call {
char *calling_agent;
char *called_agent;
GHashTable *infohash;
time_t lookup_done;
};
struct callmaster {

@ -56,21 +56,25 @@ sub send_rcv {
my $pkt = join('',map(rand,1..10));
send($sendfd, $pkt, 0, sockaddr_in($sendtoport, inet_aton($sendtoip))) or die;
my $inc;
alarm(5);
recv($recvfd, $inc, length($pkt), 0);
alarm(0);
$inc eq $pkt or die;
{
local $SIG{ALRM} = sub {
print("timeout!\n");
};
alarm(1);
recv($recvfd, $inc, length($pkt), 0);
alarm(0);
}
$inc eq $pkt or print("NOT received packed ok\n"), return;
print("received packet ok\n");
}
sub send_rcv4 {
send_rcv(@_[0,1,2,3]);
sleep(1);
send_rcv(@_[3,4,5,0]);
sleep(1);
send_rcv(@_[0,1,2,3]);
sleep(1);
send_rcv(@_[3,4,5,0]);
for (1 .. 4) {
send_rcv(@_[0,1,2,3]);
sleep(1);
send_rcv(@_[3,4,5,0]);
sleep(1);
}
}
sub sim_req_lk {
@ -229,7 +233,10 @@ my ($mpip11, $mpport11) = sim_rq($callid1, $mpip10, $mpport10, $totag1, $fromtag
print("mediaproxy: tell A to send to $mpport11 instead of $mpip10:$mpport10\n");
brk();
send_rcv($client1, $mpip11, $mpport11, $client4); ###### <<<<<< error trigger
if (1) {
###### error trigger
send_rcv($client1, $mpip11, $mpport11, $client4);
}
print("A tells B: send RTP to $lp1\n");
my ($mpip12, $mpport12) = sim_lk($callid1, $local_ip, $lp1, $totag1, $fromtag1);

@ -74,14 +74,12 @@ sub send_rcv_brk {
}
sub send_rcv4 {
send_rcv(@_[0,1,2,3]);
sleep(1);
send_rcv(@_[3,4,5,0]);
sleep(1);
send_rcv(@_[0,1,2,3]);
sleep(1);
send_rcv(@_[3,4,5,0]);
sleep(1);
for (1 .. 4) {
send_rcv(@_[0,1,2,3]);
sleep(1);
send_rcv(@_[3,4,5,0]);
sleep(1);
}
}
sub sim_req_lk {
@ -202,8 +200,9 @@ for my $tuple (\@forward, \@backward) {
print("mediaproxy: tell $dst to send to $mpport1 instead of $p1\n");
brk();
send_rcv_brk($c2, $mpip1, $mpport1, $c1);
send_rcv_brk($c2, $mpip1, $mpport1, $c1);
for (1 .. 4) {
send_rcv_brk($c2, $mpip1, $mpport1, $c1);
}
print("$dst tells $src: send RTP to $p2\n");
brk();
@ -263,11 +262,13 @@ for my $tuple (\@forward, \@backward) {
print("mediaproxy: tell $dst to send to $mpport5 instead of $p1\n");
brk();
send_rcv_brk($c2, $mpip3, $mpport3, $c1);
send_rcv_brk($c2, $mpip3, $mpport3, $c1);
for (1 .. 4) {
send_rcv_brk($c2, $mpip3, $mpport3, $c1);
}
print("switching to new port...\n");
send_rcv_brk($c2, $mpip5, $mpport5, $c1);
send_rcv_brk($c2, $mpip5, $mpport5, $c1);
for (1 .. 4) {
send_rcv_brk($c2, $mpip5, $mpport5, $c1);
}
print("$dst tells $src: send RTP to $p2\n");
brk();
@ -276,8 +277,9 @@ for my $tuple (\@forward, \@backward) {
print("mediaproxy: tell $src to send to $mpport6 instead of $p2\n");
brk();
send_rcv_brk($c2, $mpip5, $mpport5, $c1);
send_rcv_brk($c2, $mpip5, $mpport5, $c1);
for (1 .. 4) {
send_rcv_brk($c2, $mpip5, $mpport5, $c1);
}
send_rcv4($c2, $mpip5, $mpport5, $c1, $mpip6, $mpport6);
@ -298,8 +300,9 @@ for my $tuple (\@forward, \@backward) {
print("mediaproxy: tell $dst to send to $mpport7 instead of $p1\n");
brk();
send_rcv_brk($c1, $mpip6, $mpport6, $c2);
send_rcv_brk($c1, $mpip6, $mpport6, $c2);
for (1 .. 4) {
send_rcv_brk($c1, $mpip6, $mpport6, $c2);
}
print("$dst tells $src: send RTP to $p2\n");
brk();
@ -308,11 +311,13 @@ for my $tuple (\@forward, \@backward) {
print("mediaproxy: tell $src to send to $mpport8 instead of $p2\n");
brk();
send_rcv_brk($c1, $mpip6, $mpport6, $c2);
send_rcv_brk($c1, $mpip6, $mpport6, $c2);
for (1 .. 4) {
send_rcv_brk($c1, $mpip6, $mpport6, $c2);
}
print("switching to new port...\n");
send_rcv_brk($c1, $mpip8, $mpport8, $c2);
send_rcv_brk($c1, $mpip8, $mpport8, $c2);
for (1 .. 4) {
send_rcv_brk($c1, $mpip8, $mpport8, $c2);
}
send_rcv4($c2, $mpip7, $mpport7, $c1, $mpip8, $mpport8);

Loading…
Cancel
Save