lets try with a lot of large, nested, naive locks

remotes/origin/2.0
Richard Fuchs 13 years ago
parent 28ac7b996c
commit c19b99903c

@ -105,8 +105,10 @@ static void stream_closed(int fd, void *p, uintptr_t u) {
int i;
socklen_t j;
mutex_lock(&cs->lock);
r = &cs->peers[u >> 1].rtps[u & 1];
assert(r->fd == fd);
mutex_unlock(&cs->lock);
c = cs->call;
j = sizeof(i);
@ -119,6 +121,7 @@ static void stream_closed(int fd, void *p, uintptr_t u) {
/* called with callstream->lock held */
void kernelize(struct callstream *c) {
int i, j;
struct peer *p, *pp;
@ -203,10 +206,13 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_
m = c->callmaster;
smart_ntop_p(addr, &fsin->sin6_addr, sizeof(addr));
mutex_lock(&cs->lock);
if (p->fd == -1) {
mylog(LOG_WARNING, LOG_PREFIX_C "RTP packet to port %u discarded from %s:%u",
LOG_PARAMS_C(c), r->localport, addr, ntohs(fsin->sin6_port));
r->stats.errors++;
mutex_unlock(&cs->lock);
m->statsps.errors++;
return 0;
}
@ -311,6 +317,7 @@ forward:
if (ret == -1) {
r->stats.errors++;
mutex_unlock(&cs->lock);
m->statsps.errors++;
return -1;
}
@ -318,9 +325,10 @@ forward:
drop:
r->stats.packets++;
r->stats.bytes += l;
r->last = poller_now;
mutex_unlock(&cs->lock);
m->statsps.packets++;
m->statsps.bytes += l;
r->last = poller_now;
return 0;
}
@ -469,6 +477,8 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
unsigned int check;
struct streamrelay *sr;
mutex_lock(&c->lock);
if (!c->callstreams->head)
goto drop;
@ -476,6 +486,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
for (it = c->callstreams->head; it; it = it->next) {
cs = it->data;
mutex_lock(&cs->lock);
for (i = 0; i < 2; i++) {
p = &cs->peers[i];
@ -493,16 +504,20 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
goto good;
}
}
mutex_unlock(&cs->lock);
}
mylog(LOG_INFO, LOG_PREFIX_C "Closing call branch due to timeout",
LOG_PARAMS_C(c));
drop:
hlp->del = g_list_prepend(hlp->del, c);
mutex_unlock(&c->lock);
hlp->del = g_list_prepend(hlp->del, obj_get(c));
return;
good:
mutex_unlock(&cs->lock);
mutex_unlock(&c->lock);
;
}
@ -532,18 +547,22 @@ static void xmlrpc_kill_calls(GList *list, const char *url) {
while (list) {
ca = list->data;
mutex_lock(&ca->lock);
for (csl = ca->callstreams->head; csl; csl = csl->next) {
cs = csl->data;
mutex_lock(&cs->lock);
if (!cs->peers[1].tag || !*cs->peers[1].tag)
continue;
goto next;
alarm(2);
xmlrpc_client_call2f(&e, c, url, "di", &r, "(ssss)",
"sbc", "postControlCmd", cs->peers[1].tag, "teardown");
xmlrpc_DECREF(r);
alarm(0);
next:
mutex_unlock(&cs->lock);
}
mutex_unlock(&ca->lock);
list = list->next;
}
@ -607,6 +626,7 @@ next:
for (i = hlp.del; i; i = n) {
n = i->next;
c = i->data;
obj_put(c);
call_destroy(c);
g_list_free_1(i);
}
@ -964,6 +984,7 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2
s->call = obj_get(ca);
DBG("setting new callstream num to %i", num);
s->num = num;
mutex_init(&s->lock);
for (i = 0; i < 2; i++) {
p = &s->peers[i];
@ -1380,6 +1401,7 @@ static struct call *call_create(const char *callid, struct callmaster *m) {
return c;
}
/* returns call with lock held */
struct call *call_get_or_create(const char *callid, const char *viabranch, struct callmaster *m) {
struct call *c;
@ -1398,10 +1420,12 @@ restart:
goto restart;
}
g_hash_table_insert(m->callhash, c->callid, obj_get(c));
mutex_lock(&c->lock);
rwlock_unlock_w(&m->hashlock);
}
else {
obj_hold(c);
mutex_lock(&c->lock);
rwlock_unlock_r(&m->hashlock);
}
@ -1480,12 +1504,14 @@ char *call_update_udp(const char **out, struct callmaster *m) {
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, 1, (num >= 0) ? 0 : 1, out[RE_UDP_COOKIE], 1);
mutex_unlock(&c->lock);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
log_info = NULL;
obj_put(c);
return ret;
fail:
mutex_unlock(&c->lock);
mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]);
asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]);
log_info = NULL;
@ -1510,6 +1536,7 @@ char *call_lookup_udp(const char **out, struct callmaster *m) {
return ret;
}
obj_hold(c);
mutex_lock(&c->lock);
rwlock_unlock_r(&m->hashlock);
log_info = out[RE_UDP_UL_CALLID];
@ -1527,12 +1554,14 @@ char *call_lookup_udp(const char **out, struct callmaster *m) {
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, 1, (num >= 0) ? 1 : 0, out[RE_UDP_COOKIE], 1);
mutex_unlock(&c->lock);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
log_info = NULL;
obj_put(c);
return ret;
fail:
mutex_unlock(&c->lock);
mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]);
asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]);
log_info = NULL;
@ -1558,6 +1587,7 @@ char *call_request(const char **out, struct callmaster *m) {
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 0 : 1, NULL, 0);
mutex_unlock(&c->lock);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
obj_put(c);
return ret;
@ -1577,6 +1607,7 @@ char *call_lookup(const char **out, struct callmaster *m) {
return NULL;
}
obj_hold(c);
mutex_lock(&c->lock);
rwlock_unlock_r(&m->hashlock);
strdupfree(&c->called_agent, out[RE_TCP_RL_AGENT] ? : "UNKNOWN");
@ -1589,6 +1620,7 @@ char *call_lookup(const char **out, struct callmaster *m) {
redis_update(c, m->conf.redis);
ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 1 : 0, NULL, 0);
mutex_unlock(&c->lock);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
obj_put(c);
return ret;
@ -1613,6 +1645,7 @@ char *call_delete_udp(const char **out, struct callmaster *m) {
goto err;
}
obj_hold(c);
mutex_lock(&c->lock);
rwlock_unlock_r(&m->hashlock);
log_info = out[RE_UDP_D_VIABRANCH];
@ -1652,21 +1685,25 @@ tag_match:
mylog(LOG_INFO, LOG_PREFIX_CI "Branch deleted", LOG_PARAMS_CI(c));
if (g_hash_table_size(c->branches))
goto success;
goto success_unlock;
else
DBG("no branches left, deleting full call");
}
mutex_unlock(&c->lock);
mylog(LOG_INFO, LOG_PREFIX_C "Deleting full call", c->callid);
call_destroy(c);
goto success;
success_unlock:
mutex_unlock(&c->lock);
success:
asprintf(&ret, "%s 0\n", out[RE_UDP_COOKIE]);
goto out;
err:
if (c)
mutex_unlock(&c->lock);
asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]);
goto out;
@ -1708,6 +1745,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
char addr1[64], addr2[64], addr3[64];
m = c->callmaster;
mutex_lock(&c->lock);
/* TODO: only called for tcp controller, so no linked list of calls? */
@ -1720,6 +1758,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
for (l = c->callstreams->head; l; l = l->next) {
cs = l->data;
mutex_lock(&cs->lock);
p = &cs->peers[0];
r1 = &p->rtps[0];
@ -1728,7 +1767,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
rx2 = &cs->peers[1].rtps[1];
if (r1->fd == -1 || r2->fd == -1)
continue;
goto next;
smart_ntop_p(addr1, &r1->peer.ip46, sizeof(addr1));
smart_ntop_p(addr2, &r2->peer.ip46, sizeof(addr2));
@ -1747,8 +1786,10 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
"active",
p->codec ? : "unknown",
p->mediatype, (int) (poller_now - r1->last));
next:
mutex_unlock(&cs->lock);
}
mutex_unlock(&c->lock);
}
void calls_status(struct callmaster *m, struct control_stream *s) {

@ -71,6 +71,7 @@ struct peer {
};
struct callstream {
struct obj obj;
mutex_t lock;
struct peer peers[2];
struct call *call;
int num;

@ -305,6 +305,8 @@ int main(int argc, char **argv) {
m = callmaster_new(p);
if (!m)
return -1;
ZERO(mc);
mc.kernelfd = kfd;
mc.kernelid = table;
mc.ipv4 = ipv4;

Loading…
Cancel
Save