implement a "relays cache"

git.mgm/mediaproxy-ng/2.2
Richard Fuchs 13 years ago
parent 061507585c
commit 7137b784e9

@ -21,6 +21,9 @@
#define OFFSET_OF(t,e) ((unsigned int) (unsigned long) &(((t *) 0)->e))
#define ZERO(x) memset(&(x), 0, sizeof(x))
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a)))
#endif
#define IPF "%u.%u.%u.%u"
#define IPP(x) ((unsigned char *) (&(x)))[0], ((unsigned char *) (&(x)))[1], ((unsigned char *) (&(x)))[2], ((unsigned char *) (&(x)))[3]

@ -112,6 +112,7 @@ static char *rtp_codecs[] = {
static void call_destroy(struct call *);
static void unkernelize(struct peer *);
static void relays_cache_port_used(struct relays_cache *c);
@ -800,10 +801,9 @@ fail:
static int get_port4(struct streamrelay *r, u_int16_t p) {
static int get_port4(struct streamrelay *r, u_int16_t p, struct callmaster *m) {
int fd;
struct sockaddr_in sin;
struct callmaster *m = r->up->up->call->callmaster;
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0)
@ -830,10 +830,9 @@ fail:
return -1;
}
static int get_port6(struct streamrelay *r, u_int16_t p) {
static int get_port6(struct streamrelay *r, u_int16_t p, struct callmaster *m) {
int fd;
struct sockaddr_in6 sin;
struct callmaster *m = r->up->up->call->callmaster;
int tos;
fd = socket(AF_INET6, SOCK_DGRAM, 0);
@ -867,9 +866,8 @@ fail:
return -1;
}
static int get_port(struct streamrelay *r, u_int16_t p) {
static int get_port(struct streamrelay *r, u_int16_t p, struct callmaster *m) {
int ret;
struct callmaster *m = r->up->up->call->callmaster;
assert(r->fd == -1);
@ -882,9 +880,9 @@ static int get_port(struct streamrelay *r, u_int16_t p) {
mutex_unlock(&m->portlock);
if (IN6_IS_ADDR_UNSPECIFIED(&m->conf.ipv6))
ret = get_port4(r, p);
ret = get_port4(r, p, m);
else
ret = get_port6(r, p);
ret = get_port6(r, p, m);
if (ret) {
mutex_lock(&m->portlock);
@ -898,9 +896,7 @@ static int get_port(struct streamrelay *r, u_int16_t p) {
return 0;
}
static void release_port(struct streamrelay *r) {
struct callmaster *m = r->up->up->call->callmaster;
static void release_port(struct streamrelay *r, struct callmaster *m) {
if (r->fd == -1 || !r->localport)
return;
mutex_lock(&m->portlock);
@ -911,7 +907,7 @@ static void release_port(struct streamrelay *r) {
r->localport = 0;
}
static void get_consecutive_ports(struct streamrelay *array, int array_len, int wanted_start_port, struct call *c) {
static int get_consecutive_ports(struct streamrelay *array, int array_len, int wanted_start_port, struct call *c) {
int i, j, cycle = 0;
struct streamrelay *it;
u_int16_t port;
@ -942,14 +938,14 @@ static void get_consecutive_ports(struct streamrelay *array, int array_len, int
goto release_restart;
}
if (get_port(it, port++))
if (get_port(it, port++, m))
goto release_restart;
}
break;
release_restart:
for (j = 0; j < i; j++)
release_port(&array[j]);
release_port(&array[j], m);
if (cycle >= 2 || wanted_start_port > 0)
goto fail;
@ -962,10 +958,11 @@ release_restart:
mylog(LOG_DEBUG, LOG_PREFIX_CI "Opened ports %u..%u for RTP",
LOG_PARAMS_CI(c), array[0].localport, array[array_len - 1].localport);
return;
return 0;
fail:
mylog(LOG_ERR, LOG_PREFIX_CI "Failed to get RTP port pair", LOG_PARAMS_CI(c));
return -1;
}
/* caller is responsible for appropriate locking */
@ -1054,7 +1051,7 @@ static void steal_peer(struct peer *dest, struct peer *src) {
mylog(LOG_DEBUG, LOG_PREFIX_CI "Closing port %u in favor of re-use",
LOG_PARAMS_CI(c), sr->localport);
poller_del_item(po, sr->fd);
release_port(sr);
release_port(sr, m);
}
sr->fd = srs->fd;
@ -1087,10 +1084,12 @@ static void steal_peer(struct peer *dest, struct peer *src) {
}
void callstream_init(struct callstream *s, int port1, int port2) {
int i, j, tport;
/* XXX no need for a full streamrelay struct, split it */
/* XXX return value? */
void callstream_init(struct callstream *s, struct relays_cache *rc) {
int i, j;
struct peer *p;
struct streamrelay *r;
struct streamrelay *r, *relay_AB;
struct poller_item pi;
struct call *c = s->call;
struct poller *po = c->callmaster->poller;
@ -1099,6 +1098,7 @@ void callstream_init(struct callstream *s, int port1, int port2) {
for (i = 0; i < 2; i++) {
p = &s->peers[i];
relay_AB = rc ? rc->array_ptrs[i] : NULL;
p->idx = i;
p->up = s;
@ -1111,15 +1111,11 @@ void callstream_init(struct callstream *s, int port1, int port2) {
r->idx = j;
r->up = p;
r->last = poller_now;
}
tport = (i == 0) ? port1 : port2;
if (tport >= 0) {
get_consecutive_ports(p->rtps, 2, tport, c);
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
if (relay_AB && relay_AB[j].fd != -1) {
r->fd = relay_AB[j].fd;
r->fd_family = relay_AB[j].fd_family;
r->localport = relay_AB[j].localport;
pi.fd = r->fd;
pi.obj = &s->obj;
@ -1128,15 +1124,21 @@ void callstream_init(struct callstream *s, int port1, int port2) {
pi.closed = stream_closed;
poller_add_item(po, &pi);
relay_AB[j].fd = -1;
}
}
}
if (rc)
relays_cache_port_used(rc);
}
static void callstream_free(void *ptr) {
struct callstream *s = ptr;
struct callmaster *m = s->call->callmaster;
int i, j;
struct peer *p;
struct streamrelay *r;
@ -1146,13 +1148,63 @@ static void callstream_free(void *ptr) {
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
release_port(r);
release_port(r, m);
}
}
mutex_destroy(&s->lock);
obj_put(s->call);
}
static void relays_cache_init(struct relays_cache *c) {
memset(c, -1, sizeof(*c));
c->relays_open = 0;
c->array_ptrs[0] = c->relays_A;
c->array_ptrs[1] = c->relays_B;
}
static int relays_cache_get_ports(struct relays_cache *c, int num, struct call *call) {
num *= 2;
if (c->relays_open >= num)
return 0;
if (c->relays_open + num > ARRAY_SIZE(c->relays_A))
return -1;
if (get_consecutive_ports(&c->relays_A[c->relays_open], num, 0, call))
return -1;
if (get_consecutive_ports(&c->relays_B[c->relays_open], num, 0, call))
return -1;
c->relays_open += num;
return 0;
}
static void relays_cache_port_used(struct relays_cache *c) {
if (c->relays_open < 2)
return;
c->relays_open -= 2;
if (c->relays_open) {
memmove(&c->relays_A[0], &c->relays_A[2], c->relays_open * sizeof(*c->relays_A));
memmove(&c->relays_B[0], &c->relays_B[2], c->relays_open * sizeof(*c->relays_B));
}
c->relays_A[c->relays_open].fd = -1;
c->relays_B[c->relays_open].fd = -1;
}
static void relays_cache_cleanup(struct relays_cache *c, struct callmaster *m) {
int i;
for (i = 0; i < ARRAY_SIZE(c->relays_A); i++) {
if (c->relays_A[i].fd == -1)
break;
release_port(&c->relays_A[i], m);
}
for (i = 0; i < ARRAY_SIZE(c->relays_B); i++) {
if (c->relays_B[i].fd == -1)
break;
release_port(&c->relays_B[i], m);
}
}
/* called with call->lock held */
static int call_streams(struct call *c, GQueue *s, const str *tag, enum call_opmode opmode) {
GQueue *q;
@ -1163,8 +1215,10 @@ static int call_streams(struct call *c, GQueue *s, const str *tag, enum call_opm
struct callstream *cs, *cs_o;
struct peer *p, *p2;
int ret = 1;
struct relays_cache relays_cache;
q = g_queue_new(); /* new callstreams list */
relays_cache_init(&relays_cache);
for (i = s->head; i; i = i->next) {
t = i->data;
@ -1208,14 +1262,15 @@ found:
mutex_lock(&cs->lock);
if (!matched_relay) {
/* nothing found to re-use, open new ports */
callstream_init(cs, 0, 0);
/* nothing found to re-use, use new ports */
relays_cache_get_ports(&relays_cache, 1, c);
callstream_init(cs, &relays_cache);
p = &cs->peers[0];
setup_peer(p, t, tag);
}
else {
/* re-use, so don't open new ports */
callstream_init(cs, -1, -1);
/* re-use, so don't use new ports */
callstream_init(cs, NULL);
if (matched_relay->up->idx == 0) {
/* request/lookup came in the same order as before */
steal_peer(&cs->peers[0], &cs_o->peers[0]);
@ -1299,7 +1354,8 @@ got_cs:
cs_o = cs;
cs = callstream_new(c, t->stream.num);
mutex_lock(&cs->lock);
callstream_init(cs, 0, 0);
relays_cache_get_ports(&relays_cache, 1, c);
callstream_init(cs, &relays_cache);
steal_peer(&cs->peers[0], &cs_o->peers[0]);
p = &cs->peers[1];
setup_peer(p, t, tag);
@ -1332,6 +1388,7 @@ skip:
c->callstreams = q;
}
relays_cache_cleanup(&relays_cache, c->callmaster);
return ret;
}

@ -72,6 +72,12 @@ struct streamrelay {
struct stats kstats;
time_t last;
};
struct relays_cache {
struct streamrelay relays_A[16];
struct streamrelay relays_B[16];
struct streamrelay *array_ptrs[2];
int relays_open;
};
struct peer {
struct streamrelay rtps[2];
str tag;
@ -153,7 +159,7 @@ void calls_dump_redis(struct callmaster *);
struct call *call_get_or_create(const str *callid, const str *viabranch, struct callmaster *m);
struct callstream *callstream_new(struct call *ca, int num);
void callstream_init(struct callstream *s, int port1, int port2);
void callstream_init(struct callstream *s, struct relays_cache *);
void kernelize(struct callstream *c);
int call_stream_address(GString *o, struct peer *p, enum stream_address_format);

Loading…
Cancel
Save