add proper locking to callstream setup procedure

remotes/origin/2.0
Richard Fuchs 13 years ago
parent 5f40e7d173
commit c11d0e2882

@ -925,8 +925,7 @@ fail:
release_port(b); release_port(b);
} }
/* caller is responsible for appropriate locking */
static int setup_peer(struct peer *p, struct stream *s, const char *tag) { static int setup_peer(struct peer *p, struct stream *s, const char *tag) {
struct streamrelay *a, *b; struct streamrelay *a, *b;
struct callstream *cs; struct callstream *cs;
@ -971,6 +970,7 @@ static int setup_peer(struct peer *p, struct stream *s, const char *tag) {
return 0; return 0;
} }
/* caller is responsible for appropriate locking */
static void steal_peer(struct peer *dest, struct peer *src) { static void steal_peer(struct peer *dest, struct peer *src) {
struct streamrelay *r; struct streamrelay *r;
int i; int i;
@ -1118,6 +1118,7 @@ static void callstream_free(void *ptr) {
obj_put(s->call); obj_put(s->call);
} }
/* called with call->lock held */
static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) { static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) {
GQueue *q; GQueue *q;
GList *i, *l; GList *i, *l;
@ -1141,6 +1142,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode)
/* look for an existing call stream with identical parameters */ /* look for an existing call stream with identical parameters */
for (l = c->callstreams->head; l; l = l->next) { for (l = c->callstreams->head; l; l = l->next) {
cs_o = l->data; cs_o = l->data;
mutex_lock(&cs_o->lock);
for (x = 0; x < 2; x++) { for (x = 0; x < 2; x++) {
r = &cs_o->peers[x].rtps[0]; r = &cs_o->peers[x].rtps[0];
DBG("comparing new ["IP6F"]:%u/%s to old ["IP6F"]:%u/%s", DBG("comparing new ["IP6F"]:%u/%s to old ["IP6F"]:%u/%s",
@ -1156,6 +1158,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode)
DBG("found existing call stream to steal"); DBG("found existing call stream to steal");
goto found; goto found;
} }
mutex_unlock(&cs_o->lock);
} }
/* not found */ /* not found */
@ -1164,7 +1167,7 @@ static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode)
l = NULL; l = NULL;
found: found:
/* cs_o remains locked if set */
if (!opmode) { /* request */ if (!opmode) { /* request */
DBG("creating new callstream"); DBG("creating new callstream");
@ -1189,9 +1192,10 @@ found:
steal_peer(&cs->peers[0], &cs_o->peers[1]); steal_peer(&cs->peers[0], &cs_o->peers[1]);
steal_peer(&cs->peers[1], &cs_o->peers[0]); steal_peer(&cs->peers[1], &cs_o->peers[0]);
} }
mutex_unlock(&cs_o->lock);
} }
g_queue_push_tail(q, cs); /* hand over the ref */ g_queue_push_tail(q, cs); /* hand over the ref of new cs */
ZERO(c->lookup_done); ZERO(c->lookup_done);
continue; continue;
} }
@ -1199,18 +1203,25 @@ found:
/* lookup */ /* lookup */
for (l = c->callstreams->head; l; l = l->next) { for (l = c->callstreams->head; l; l = l->next) {
cs = l->data; cs = l->data;
if (cs != cs_o)
mutex_lock(&cs->lock);
DBG("hunting for callstream, %i <> %i", cs->num, t->num); DBG("hunting for callstream, %i <> %i", cs->num, t->num);
if (cs->num != t->num) if (cs->num == t->num)
continue; goto got_cs;
goto got_cs; if (cs != cs_o)
mutex_unlock(&cs->lock);
} }
mylog(LOG_WARNING, LOG_PREFIX_CI "Got LOOKUP, but no usable callstreams found", mylog(LOG_WARNING, LOG_PREFIX_CI "Got LOOKUP, but no usable callstreams found",
LOG_PARAMS_CI(c)); LOG_PARAMS_CI(c));
if (cs_o)
mutex_unlock(&cs_o->lock);
break; break;
got_cs: got_cs:
g_queue_delete_link(c->callstreams, l); /* cs and cs_o remain locked, and maybe cs == cs_o */
/* r == peer[x].rtp[0] of cs_o */
g_queue_delete_link(c->callstreams, l); /* steal cs ref */
p = &cs->peers[1]; p = &cs->peers[1];
p2 = &cs->peers[0]; p2 = &cs->peers[0];
@ -1248,19 +1259,25 @@ got_cs:
/* nothing found to steal and this end is used */ /* nothing found to steal and this end is used */
/* need a new call stream after all */ /* need a new call stream after all */
DBG("case 4"); DBG("case 4");
if (cs_o)
mutex_unlock(&cs_o->lock);
cs_o = cs; cs_o = cs;
cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); cs = obj_alloc0("callstream", sizeof(*cs), callstream_free);
callstream_init(cs, c, 0, 0, t->num); callstream_init(cs, c, 0, 0, t->num);
mutex_lock(&cs->lock);
steal_peer(&cs->peers[0], &cs_o->peers[0]); steal_peer(&cs->peers[0], &cs_o->peers[0]);
p = &cs->peers[1]; p = &cs->peers[1];
setup_peer(p, t, tag); setup_peer(p, t, tag);
g_queue_push_tail(c->callstreams, cs_o); /* hand over ref XXX? */ g_queue_push_tail(c->callstreams, cs_o); /* hand over ref to original cs */
} }
time(&c->lookup_done); time(&c->lookup_done);
skip: skip:
g_queue_push_tail(q, p->up); g_queue_push_tail(q, p->up); /* hand over ref to cs */
mutex_unlock(&cs->lock);
if (cs_o && cs_o != cs)
mutex_unlock(&cs_o->lock);
} }
ret = ret * q->length; ret = ret * q->length;
@ -1304,7 +1321,7 @@ static void unkernelize(struct peer *p) {
} }
/* called with callstream->lock held */
static void kill_callstream(struct callstream *s) { static void kill_callstream(struct callstream *s) {
int i, j; int i, j;
struct peer *p; struct peer *p;
@ -1342,6 +1359,7 @@ static void call_destroy(struct call *c) {
redis_delete(c, m->conf.redis); redis_delete(c, m->conf.redis);
mutex_lock(&c->lock); mutex_lock(&c->lock);
/* at this point, no more callstreams can be added */
mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid); mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid);
while (c->callstreams->head) { while (c->callstreams->head) {
s = g_queue_pop_head(c->callstreams); s = g_queue_pop_head(c->callstreams);

Loading…
Cancel
Save