diff --git a/daemon/call.c b/daemon/call.c index 598f9f8..f701147 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -105,8 +105,10 @@ static void stream_closed(int fd, void *p, uintptr_t u) { int i; socklen_t j; + mutex_lock(&cs->lock); r = &cs->peers[u >> 1].rtps[u & 1]; assert(r->fd == fd); + mutex_unlock(&cs->lock); c = cs->call; j = sizeof(i); @@ -119,6 +121,7 @@ static void stream_closed(int fd, void *p, uintptr_t u) { +/* called with callstream->lock held */ void kernelize(struct callstream *c) { int i, j; struct peer *p, *pp; @@ -203,10 +206,13 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_ m = c->callmaster; smart_ntop_p(addr, &fsin->sin6_addr, sizeof(addr)); + mutex_lock(&cs->lock); + if (p->fd == -1) { mylog(LOG_WARNING, LOG_PREFIX_C "RTP packet to port %u discarded from %s:%u", LOG_PARAMS_C(c), r->localport, addr, ntohs(fsin->sin6_port)); r->stats.errors++; + mutex_unlock(&cs->lock); m->statsps.errors++; return 0; } @@ -311,6 +317,7 @@ forward: if (ret == -1) { r->stats.errors++; + mutex_unlock(&cs->lock); m->statsps.errors++; return -1; } @@ -318,9 +325,10 @@ forward: drop: r->stats.packets++; r->stats.bytes += l; + r->last = poller_now; + mutex_unlock(&cs->lock); m->statsps.packets++; m->statsps.bytes += l; - r->last = poller_now; return 0; } @@ -469,6 +477,8 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { unsigned int check; struct streamrelay *sr; + mutex_lock(&c->lock); + if (!c->callstreams->head) goto drop; @@ -476,6 +486,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { for (it = c->callstreams->head; it; it = it->next) { cs = it->data; + mutex_lock(&cs->lock); for (i = 0; i < 2; i++) { p = &cs->peers[i]; @@ -493,16 +504,20 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { goto good; } } + mutex_unlock(&cs->lock); } mylog(LOG_INFO, LOG_PREFIX_C "Closing call branch due to timeout", LOG_PARAMS_C(c)); drop: - hlp->del = g_list_prepend(hlp->del, c); + mutex_unlock(&c->lock); + hlp->del = g_list_prepend(hlp->del, obj_get(c)); return; good: + mutex_unlock(&cs->lock); + mutex_unlock(&c->lock); ; } @@ -532,18 +547,22 @@ static void xmlrpc_kill_calls(GList *list, const char *url) { while (list) { ca = list->data; + mutex_lock(&ca->lock); for (csl = ca->callstreams->head; csl; csl = csl->next) { cs = csl->data; + mutex_lock(&cs->lock); if (!cs->peers[1].tag || !*cs->peers[1].tag) - continue; + goto next; alarm(2); xmlrpc_client_call2f(&e, c, url, "di", &r, "(ssss)", "sbc", "postControlCmd", cs->peers[1].tag, "teardown"); xmlrpc_DECREF(r); alarm(0); +next: + mutex_unlock(&cs->lock); } - + mutex_unlock(&ca->lock); list = list->next; } @@ -607,6 +626,7 @@ next: for (i = hlp.del; i; i = n) { n = i->next; c = i->data; + obj_put(c); call_destroy(c); g_list_free_1(i); } @@ -964,6 +984,7 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 s->call = obj_get(ca); DBG("setting new callstream num to %i", num); s->num = num; + mutex_init(&s->lock); for (i = 0; i < 2; i++) { p = &s->peers[i]; @@ -1380,6 +1401,7 @@ static struct call *call_create(const char *callid, struct callmaster *m) { return c; } +/* returns call with lock held */ struct call *call_get_or_create(const char *callid, const char *viabranch, struct callmaster *m) { struct call *c; @@ -1398,10 +1420,12 @@ restart: goto restart; } g_hash_table_insert(m->callhash, c->callid, obj_get(c)); + mutex_lock(&c->lock); rwlock_unlock_w(&m->hashlock); } else { obj_hold(c); + mutex_lock(&c->lock); rwlock_unlock_r(&m->hashlock); } @@ -1480,12 +1504,14 @@ char *call_update_udp(const char **out, struct callmaster *m) { redis_update(c, m->conf.redis); ret = streams_print(c->callstreams, 1, (num >= 0) ? 0 : 1, out[RE_UDP_COOKIE], 1); + mutex_unlock(&c->lock); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); log_info = NULL; obj_put(c); return ret; fail: + mutex_unlock(&c->lock); mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]); asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]); log_info = NULL; @@ -1510,6 +1536,7 @@ char *call_lookup_udp(const char **out, struct callmaster *m) { return ret; } obj_hold(c); + mutex_lock(&c->lock); rwlock_unlock_r(&m->hashlock); log_info = out[RE_UDP_UL_CALLID]; @@ -1527,12 +1554,14 @@ char *call_lookup_udp(const char **out, struct callmaster *m) { redis_update(c, m->conf.redis); ret = streams_print(c->callstreams, 1, (num >= 0) ? 1 : 0, out[RE_UDP_COOKIE], 1); + mutex_unlock(&c->lock); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); log_info = NULL; obj_put(c); return ret; fail: + mutex_unlock(&c->lock); mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]); asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]); log_info = NULL; @@ -1558,6 +1587,7 @@ char *call_request(const char **out, struct callmaster *m) { redis_update(c, m->conf.redis); ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 0 : 1, NULL, 0); + mutex_unlock(&c->lock); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); obj_put(c); return ret; @@ -1577,6 +1607,7 @@ char *call_lookup(const char **out, struct callmaster *m) { return NULL; } obj_hold(c); + mutex_lock(&c->lock); rwlock_unlock_r(&m->hashlock); strdupfree(&c->called_agent, out[RE_TCP_RL_AGENT] ? : "UNKNOWN"); @@ -1589,6 +1620,7 @@ char *call_lookup(const char **out, struct callmaster *m) { redis_update(c, m->conf.redis); ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 1 : 0, NULL, 0); + mutex_unlock(&c->lock); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); obj_put(c); return ret; @@ -1613,6 +1645,7 @@ char *call_delete_udp(const char **out, struct callmaster *m) { goto err; } obj_hold(c); + mutex_lock(&c->lock); rwlock_unlock_r(&m->hashlock); log_info = out[RE_UDP_D_VIABRANCH]; @@ -1652,21 +1685,25 @@ tag_match: mylog(LOG_INFO, LOG_PREFIX_CI "Branch deleted", LOG_PARAMS_CI(c)); if (g_hash_table_size(c->branches)) - goto success; + goto success_unlock; else DBG("no branches left, deleting full call"); } + mutex_unlock(&c->lock); mylog(LOG_INFO, LOG_PREFIX_C "Deleting full call", c->callid); call_destroy(c); + goto success; - +success_unlock: + mutex_unlock(&c->lock); success: asprintf(&ret, "%s 0\n", out[RE_UDP_COOKIE]); goto out; err: - + if (c) + mutex_unlock(&c->lock); asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]); goto out; @@ -1708,6 +1745,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) { char addr1[64], addr2[64], addr3[64]; m = c->callmaster; + mutex_lock(&c->lock); /* TODO: only called for tcp controller, so no linked list of calls? */ @@ -1720,6 +1758,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) { for (l = c->callstreams->head; l; l = l->next) { cs = l->data; + mutex_lock(&cs->lock); p = &cs->peers[0]; r1 = &p->rtps[0]; @@ -1728,7 +1767,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) { rx2 = &cs->peers[1].rtps[1]; if (r1->fd == -1 || r2->fd == -1) - continue; + goto next; smart_ntop_p(addr1, &r1->peer.ip46, sizeof(addr1)); smart_ntop_p(addr2, &r2->peer.ip46, sizeof(addr2)); @@ -1747,8 +1786,10 @@ static void call_status_iterator(void *key, void *val, void *ptr) { "active", p->codec ? : "unknown", p->mediatype, (int) (poller_now - r1->last)); +next: + mutex_unlock(&cs->lock); } - + mutex_unlock(&c->lock); } void calls_status(struct callmaster *m, struct control_stream *s) { diff --git a/daemon/call.h b/daemon/call.h index 4eea459..25ad1aa 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -71,6 +71,7 @@ struct peer { }; struct callstream { struct obj obj; + mutex_t lock; struct peer peers[2]; struct call *call; int num; diff --git a/daemon/main.c b/daemon/main.c index 7532caf..288dd16 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -305,6 +305,8 @@ int main(int argc, char **argv) { m = callmaster_new(p); if (!m) return -1; + + ZERO(mc); mc.kernelfd = kfd; mc.kernelid = table; mc.ipv4 = ipv4;