TT#156900 close sockets on branch deletion

Change-Id: I5aeabde1755d3144e3e74e5fb040e0c2b793e305
pull/1439/head
Richard Fuchs 4 years ago
parent d199199b84
commit 6f8ad8f936

@ -85,7 +85,7 @@ unsigned int call_socket_cpu_affinity = 0;
/* ********** */
static void __monologue_destroy(struct call_monologue *monologue, int recurse);
static void __monologue_destroy(struct call_monologue *monologue, bool recurse);
static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start,
struct timeval *interval_duration);
static void __call_free(void *p);
@ -874,19 +874,28 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con
}
static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigned int num_ports,
const struct endpoint *ep, const struct sdp_ng_flags *flags, bool always_resuse)
const struct endpoint *ep, const struct sdp_ng_flags *flags, bool always_reuse)
{
GList *l;
struct endpoint_map *em;
struct stream_fd *sfd;
GQueue intf_sockets = G_QUEUE_INIT;
struct intf_list *il, *em_il;
for (l = media->endpoint_maps.tail; l; l = l->prev) {
for (GList *l = media->endpoint_maps.tail; l; l = l->prev) {
em = l->data;
if (em->logical_intf != media->logical_intf)
continue;
if ((em->wildcard || always_resuse) && em->num_ports >= num_ports) {
// any of our sockets shut down?
for (GList *k = em->intf_sfds.head; k; k = k->next) {
struct intf_list *il = k->data;
for (GList *j = il->list.head; j; j = j->next) {
struct stream_fd *sfd = j->data;
if (sfd->socket.fd == -1)
goto make_new;
}
}
if ((em->wildcard || always_reuse) && em->num_ports >= num_ports) {
__C_DBG("found a wildcard endpoint map%s", ep ? " and filling it in" : "");
if (ep) {
em->endpoint = *ep;
@ -921,6 +930,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
goto alloc;
}
make_new:
__C_DBG("allocating new %sendpoint map", ep ? "" : "wildcard ");
em = uid_slice_alloc0(em, &media->call->endpoint_maps);
if (ep)
@ -940,11 +950,12 @@ alloc:
__C_DBG("allocating stream_fds for %u ports", num_ports);
struct intf_list *il;
while ((il = g_queue_pop_head(&intf_sockets))) {
if (il->list.length != num_ports)
goto next_il;
em_il = g_slice_alloc0(sizeof(*em_il));
struct intf_list *em_il = g_slice_alloc0(sizeof(*em_il));
em_il->local_intf = il->local_intf;
g_queue_push_tail(&em->intf_sfds, em_il);
@ -1963,6 +1974,8 @@ static void __set_all_tos(struct call *c) {
for (l = c->stream_fds.head; l; l = l->next) {
sfd = l->data;
if (sfd->socket.fd == -1)
continue;
set_tos(&sfd->socket, c->tos);
}
}
@ -3211,8 +3224,6 @@ static void __call_cleanup(struct call *c) {
while (c->stream_fds.head) {
struct stream_fd *sfd = g_queue_pop_head(&c->stream_fds);
if (sfd->poller)
poller_del_item(sfd->poller, sfd->socket.fd);
stream_fd_release(sfd);
obj_put(sfd);
}
@ -3754,7 +3765,7 @@ void call_media_unkernelize(struct call_media *media) {
}
/* must be called with call->master_lock held in W */
static void __monologue_destroy(struct call_monologue *monologue, int recurse) {
static void __monologue_destroy(struct call_monologue *monologue, bool recurse) {
struct call *call;
struct call_monologue *dialogue;
@ -3789,7 +3800,20 @@ static void __monologue_destroy(struct call_monologue *monologue, int recurse) {
g_hash_table_remove(dialogue->other_tags, &monologue->tag);
g_hash_table_remove(dialogue->branches, &monologue->viabranch);
if (recurse && !g_hash_table_size(dialogue->other_tags) && !g_hash_table_size(dialogue->branches))
__monologue_destroy(dialogue, 0);
__monologue_destroy(dialogue, false);
}
// close sockets
for (GList *l = monologue->medias.head; l; l = l->next) {
struct call_media *m = l->data;
for (GList *k = m->streams.head; k; k = k->next) {
struct packet_stream *ps = k->data;
ps->selected_sfd = NULL;
struct stream_fd *sfd;
while ((sfd = g_queue_pop_head(&ps->sfds)))
stream_fd_release(sfd);
}
}
monologue->deleted = 0;
@ -3799,7 +3823,7 @@ static void __monologue_destroy(struct call_monologue *monologue, int recurse) {
int monologue_destroy(struct call_monologue *ml) {
struct call *c = ml->call;
__monologue_destroy(ml, 1);
__monologue_destroy(ml, true);
if (g_hash_table_size(c->tags) < 2 && g_hash_table_size(c->viabranches) == 0) {
ilog(LOG_INFO, "Call branch '" STR_FORMAT_M "' (%s" STR_FORMAT "%svia-branch '" STR_FORMAT_M "') "

@ -2659,16 +2659,19 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo
pi.readable = stream_fd_readable;
pi.closed = stream_fd_closed;
if (rtpe_config.poller_per_thread)
p = poller_map_get(rtpe_poller_map);
if (p) {
if (poller_add_item(p, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
sfd->poller = p;
}
if (sfd->socket.fd != -1) {
if (rtpe_config.poller_per_thread)
p = poller_map_get(rtpe_poller_map);
if (p) {
if (poller_add_item(p, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
else
sfd->poller = p;
}
RWLOCK_W(&local_media_socket_endpoints_lock);
g_hash_table_replace(local_media_socket_endpoints, &sfd->socket.local, obj_get(sfd));
RWLOCK_W(&local_media_socket_endpoints_lock);
g_hash_table_replace(local_media_socket_endpoints, &sfd->socket.local, obj_get(sfd));
}
return sfd;
}
@ -2685,15 +2688,22 @@ struct stream_fd *stream_fd_lookup(const endpoint_t *ep) {
void stream_fd_release(struct stream_fd *sfd) {
if (!sfd)
return;
RWLOCK_W(&local_media_socket_endpoints_lock);
struct stream_fd *ent = g_hash_table_lookup(local_media_socket_endpoints, &sfd->socket.local);
if (!ent)
return;
if (ent != sfd) // should not happen
if (sfd->socket.fd == -1)
return;
g_hash_table_remove(local_media_socket_endpoints, &sfd->socket.local); // releases reference
{
RWLOCK_W(&local_media_socket_endpoints_lock);
struct stream_fd *ent = g_hash_table_lookup(local_media_socket_endpoints, &sfd->socket.local);
if (ent == sfd)
g_hash_table_remove(local_media_socket_endpoints,
&sfd->socket.local); // releases reference
}
if (sfd->poller)
poller_del_item(sfd->poller, sfd->socket.fd);
sfd->poller = NULL;
release_port(&sfd->socket, sfd->local_intf->spec);
}

@ -1311,12 +1311,14 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) {
unsigned int loc_uid;
struct stream_fd *sfd;
socket_t *sock;
int port;
int port, fd;
const char *err;
for (i = 0; i < sfds->len; i++) {
rh = &sfds->rh[i];
if (redis_hash_get_int(&fd, rh, "fd"))
fd = 0;
err = "'localport' key not present";
if (redis_hash_get_int(&port, rh, "localport"))
goto err;
@ -1343,14 +1345,20 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) {
if (!loc)
goto err;
err = "failed to open ports";
if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid))
goto err;
err = "no port returned";
sock = g_queue_pop_head(&q);
if (!sock)
goto err;
set_tos(sock, c->tos);
if (fd != -1) {
err = "failed to open ports";
if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid))
goto err;
err = "no port returned";
sock = g_queue_pop_head(&q);
if (!sock)
goto err;
set_tos(sock, c->tos);
}
else {
sock = g_slice_alloc(sizeof(*sock));
dummy_socket(sock, &loc->spec->local_address.addr);
}
sfd = stream_fd_new(sock, c, loc);
if (redis_hash_get_sdes_params1(&sfd->crypto.params, rh, "") == -1)
@ -2237,6 +2245,7 @@ char* redis_encode_json(struct call *c) {
{
JSON_SET_SIMPLE_CSTR("pref_family",sfd->local_intf->logical->preferred_family->rfc_name);
JSON_SET_SIMPLE("localport","%u",sfd->socket.local.port);
JSON_SET_SIMPLE("fd", "%i", sfd->socket.fd);
JSON_SET_SIMPLE_STR("logical_intf",&sfd->local_intf->logical->name);
JSON_SET_SIMPLE("local_intf_uid","%u",sfd->local_intf->unique_id);
JSON_SET_SIMPLE("stream","%u",sfd->stream->unique_id);

@ -719,6 +719,14 @@ fail:
return -1;
}
void dummy_socket(socket_t *r, const sockaddr_t *sa) {
ZERO(*r);
r->fd = -1;
r->family = sa->family;
r->local.address = *sa;
r->remote.address.family = sa->family;
}
int connect_socket(socket_t *r, int type, const endpoint_t *ep) {
sockfamily_t *fam;

@ -217,6 +217,7 @@ int connect_socket(socket_t *r, int type, const endpoint_t *ep);
int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress
int connect_socket_retry(socket_t *r); // retries connect() while in progress
int close_socket(socket_t *r);
void dummy_socket(socket_t *r, const sockaddr_t *);
sockfamily_t *get_socket_family_rfc(const str *s);
sockfamily_t *__get_socket_family_enum(enum socket_families);

Loading…
Cancel
Save