diff --git a/daemon/aux.h b/daemon/aux.h index 31938b71b..e61f52e9f 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -21,6 +21,9 @@ #define OFFSET_OF(t,e) ((unsigned int) (unsigned long) &(((t *) 0)->e)) #define ZERO(x) memset(&(x), 0, sizeof(x)) +#ifndef ARRAY_SIZE +#define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a))) +#endif #define IPF "%u.%u.%u.%u" #define IPP(x) ((unsigned char *) (&(x)))[0], ((unsigned char *) (&(x)))[1], ((unsigned char *) (&(x)))[2], ((unsigned char *) (&(x)))[3] diff --git a/daemon/call.c b/daemon/call.c index 19abd4211..1d7e1ce21 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -112,6 +112,7 @@ static char *rtp_codecs[] = { static void call_destroy(struct call *); static void unkernelize(struct peer *); +static void relays_cache_port_used(struct relays_cache *c); @@ -800,10 +801,9 @@ fail: -static int get_port4(struct streamrelay *r, u_int16_t p) { +static int get_port4(struct streamrelay *r, u_int16_t p, struct callmaster *m) { int fd; struct sockaddr_in sin; - struct callmaster *m = r->up->up->call->callmaster; fd = socket(AF_INET, SOCK_DGRAM, 0); if (fd < 0) @@ -830,10 +830,9 @@ fail: return -1; } -static int get_port6(struct streamrelay *r, u_int16_t p) { +static int get_port6(struct streamrelay *r, u_int16_t p, struct callmaster *m) { int fd; struct sockaddr_in6 sin; - struct callmaster *m = r->up->up->call->callmaster; int tos; fd = socket(AF_INET6, SOCK_DGRAM, 0); @@ -867,9 +866,8 @@ fail: return -1; } -static int get_port(struct streamrelay *r, u_int16_t p) { +static int get_port(struct streamrelay *r, u_int16_t p, struct callmaster *m) { int ret; - struct callmaster *m = r->up->up->call->callmaster; assert(r->fd == -1); @@ -882,9 +880,9 @@ static int get_port(struct streamrelay *r, u_int16_t p) { mutex_unlock(&m->portlock); if (IN6_IS_ADDR_UNSPECIFIED(&m->conf.ipv6)) - ret = get_port4(r, p); + ret = get_port4(r, p, m); else - ret = get_port6(r, p); + ret = get_port6(r, p, m); if (ret) { mutex_lock(&m->portlock); @@ -898,9 +896,7 @@ static int get_port(struct streamrelay *r, u_int16_t p) { return 0; } -static void release_port(struct streamrelay *r) { - struct callmaster *m = r->up->up->call->callmaster; - +static void release_port(struct streamrelay *r, struct callmaster *m) { if (r->fd == -1 || !r->localport) return; mutex_lock(&m->portlock); @@ -911,7 +907,7 @@ static void release_port(struct streamrelay *r) { r->localport = 0; } -static void get_consecutive_ports(struct streamrelay *array, int array_len, int wanted_start_port, struct call *c) { +static int get_consecutive_ports(struct streamrelay *array, int array_len, int wanted_start_port, struct call *c) { int i, j, cycle = 0; struct streamrelay *it; u_int16_t port; @@ -942,14 +938,14 @@ static void get_consecutive_ports(struct streamrelay *array, int array_len, int goto release_restart; } - if (get_port(it, port++)) + if (get_port(it, port++, m)) goto release_restart; } break; release_restart: for (j = 0; j < i; j++) - release_port(&array[j]); + release_port(&array[j], m); if (cycle >= 2 || wanted_start_port > 0) goto fail; @@ -962,10 +958,11 @@ release_restart: mylog(LOG_DEBUG, LOG_PREFIX_CI "Opened ports %u..%u for RTP", LOG_PARAMS_CI(c), array[0].localport, array[array_len - 1].localport); - return; + return 0; fail: mylog(LOG_ERR, LOG_PREFIX_CI "Failed to get RTP port pair", LOG_PARAMS_CI(c)); + return -1; } /* caller is responsible for appropriate locking */ @@ -1054,7 +1051,7 @@ static void steal_peer(struct peer *dest, struct peer *src) { mylog(LOG_DEBUG, LOG_PREFIX_CI "Closing port %u in favor of re-use", LOG_PARAMS_CI(c), sr->localport); poller_del_item(po, sr->fd); - release_port(sr); + release_port(sr, m); } sr->fd = srs->fd; @@ -1087,10 +1084,12 @@ static void steal_peer(struct peer *dest, struct peer *src) { } -void callstream_init(struct callstream *s, int port1, int port2) { - int i, j, tport; +/* XXX no need for a full streamrelay struct, split it */ +/* XXX return value? */ +void callstream_init(struct callstream *s, struct relays_cache *rc) { + int i, j; struct peer *p; - struct streamrelay *r; + struct streamrelay *r, *relay_AB; struct poller_item pi; struct call *c = s->call; struct poller *po = c->callmaster->poller; @@ -1099,6 +1098,7 @@ void callstream_init(struct callstream *s, int port1, int port2) { for (i = 0; i < 2; i++) { p = &s->peers[i]; + relay_AB = rc ? rc->array_ptrs[i] : NULL; p->idx = i; p->up = s; @@ -1111,15 +1111,11 @@ void callstream_init(struct callstream *s, int port1, int port2) { r->idx = j; r->up = p; r->last = poller_now; - } - - tport = (i == 0) ? port1 : port2; - - if (tport >= 0) { - get_consecutive_ports(p->rtps, 2, tport, c); - for (j = 0; j < 2; j++) { - r = &p->rtps[j]; + if (relay_AB && relay_AB[j].fd != -1) { + r->fd = relay_AB[j].fd; + r->fd_family = relay_AB[j].fd_family; + r->localport = relay_AB[j].localport; pi.fd = r->fd; pi.obj = &s->obj; @@ -1128,15 +1124,21 @@ void callstream_init(struct callstream *s, int port1, int port2) { pi.closed = stream_closed; poller_add_item(po, &pi); + + relay_AB[j].fd = -1; } } } + + if (rc) + relays_cache_port_used(rc); } static void callstream_free(void *ptr) { struct callstream *s = ptr; + struct callmaster *m = s->call->callmaster; int i, j; struct peer *p; struct streamrelay *r; @@ -1146,13 +1148,63 @@ static void callstream_free(void *ptr) { for (j = 0; j < 2; j++) { r = &p->rtps[j]; - release_port(r); + release_port(r, m); } } mutex_destroy(&s->lock); obj_put(s->call); } +static void relays_cache_init(struct relays_cache *c) { + memset(c, -1, sizeof(*c)); + c->relays_open = 0; + c->array_ptrs[0] = c->relays_A; + c->array_ptrs[1] = c->relays_B; +} + +static int relays_cache_get_ports(struct relays_cache *c, int num, struct call *call) { + num *= 2; + if (c->relays_open >= num) + return 0; + + if (c->relays_open + num > ARRAY_SIZE(c->relays_A)) + return -1; + if (get_consecutive_ports(&c->relays_A[c->relays_open], num, 0, call)) + return -1; + if (get_consecutive_ports(&c->relays_B[c->relays_open], num, 0, call)) + return -1; + c->relays_open += num; + return 0; +} + +static void relays_cache_port_used(struct relays_cache *c) { + if (c->relays_open < 2) + return; + + c->relays_open -= 2; + if (c->relays_open) { + memmove(&c->relays_A[0], &c->relays_A[2], c->relays_open * sizeof(*c->relays_A)); + memmove(&c->relays_B[0], &c->relays_B[2], c->relays_open * sizeof(*c->relays_B)); + } + c->relays_A[c->relays_open].fd = -1; + c->relays_B[c->relays_open].fd = -1; +} + +static void relays_cache_cleanup(struct relays_cache *c, struct callmaster *m) { + int i; + + for (i = 0; i < ARRAY_SIZE(c->relays_A); i++) { + if (c->relays_A[i].fd == -1) + break; + release_port(&c->relays_A[i], m); + } + for (i = 0; i < ARRAY_SIZE(c->relays_B); i++) { + if (c->relays_B[i].fd == -1) + break; + release_port(&c->relays_B[i], m); + } +} + /* called with call->lock held */ static int call_streams(struct call *c, GQueue *s, const str *tag, enum call_opmode opmode) { GQueue *q; @@ -1163,8 +1215,10 @@ static int call_streams(struct call *c, GQueue *s, const str *tag, enum call_opm struct callstream *cs, *cs_o; struct peer *p, *p2; int ret = 1; + struct relays_cache relays_cache; q = g_queue_new(); /* new callstreams list */ + relays_cache_init(&relays_cache); for (i = s->head; i; i = i->next) { t = i->data; @@ -1208,14 +1262,15 @@ found: mutex_lock(&cs->lock); if (!matched_relay) { - /* nothing found to re-use, open new ports */ - callstream_init(cs, 0, 0); + /* nothing found to re-use, use new ports */ + relays_cache_get_ports(&relays_cache, 1, c); + callstream_init(cs, &relays_cache); p = &cs->peers[0]; setup_peer(p, t, tag); } else { - /* re-use, so don't open new ports */ - callstream_init(cs, -1, -1); + /* re-use, so don't use new ports */ + callstream_init(cs, NULL); if (matched_relay->up->idx == 0) { /* request/lookup came in the same order as before */ steal_peer(&cs->peers[0], &cs_o->peers[0]); @@ -1299,7 +1354,8 @@ got_cs: cs_o = cs; cs = callstream_new(c, t->stream.num); mutex_lock(&cs->lock); - callstream_init(cs, 0, 0); + relays_cache_get_ports(&relays_cache, 1, c); + callstream_init(cs, &relays_cache); steal_peer(&cs->peers[0], &cs_o->peers[0]); p = &cs->peers[1]; setup_peer(p, t, tag); @@ -1332,6 +1388,7 @@ skip: c->callstreams = q; } + relays_cache_cleanup(&relays_cache, c->callmaster); return ret; } diff --git a/daemon/call.h b/daemon/call.h index 21fad231a..8cbec4299 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -72,6 +72,12 @@ struct streamrelay { struct stats kstats; time_t last; }; +struct relays_cache { + struct streamrelay relays_A[16]; + struct streamrelay relays_B[16]; + struct streamrelay *array_ptrs[2]; + int relays_open; +}; struct peer { struct streamrelay rtps[2]; str tag; @@ -153,7 +159,7 @@ void calls_dump_redis(struct callmaster *); struct call *call_get_or_create(const str *callid, const str *viabranch, struct callmaster *m); struct callstream *callstream_new(struct call *ca, int num); -void callstream_init(struct callstream *s, int port1, int port2); +void callstream_init(struct callstream *s, struct relays_cache *); void kernelize(struct callstream *c); int call_stream_address(GString *o, struct peer *p, enum stream_address_format);