diff --git a/daemon/cli.c b/daemon/cli.c index 3bdd7cae9..3f1845a96 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -1082,13 +1082,19 @@ static void cli_stream_readable(struct streambuf_stream *s) { log_info_clear(); } +static void cli_free(void *p) { + struct cli *c = p; + streambuf_listener_shutdown(&c->listeners[0]); + streambuf_listener_shutdown(&c->listeners[1]); +} + struct cli *cli_new(struct poller *p, endpoint_t *ep) { struct cli *c; if (!p) return NULL; - c = obj_alloc0("cli", sizeof(*c), NULL); + c = obj_alloc0("cli", sizeof(*c), cli_free); if (streambuf_listener_init(&c->listeners[0], p, ep, cli_incoming, cli_stream_readable, diff --git a/daemon/control_ng.c b/daemon/control_ng.c index edac69cb8..99fba2cce 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -359,6 +359,26 @@ out: } +void control_ng_free(void *p) { + struct control_ng *c = p; + // XXX this should go elsewhere + if (rtpe_cngs_hash) { + GList *ll = g_hash_table_get_values(rtpe_cngs_hash); + for (GList *l = ll; l; l = l->next) { + struct control_ng_stats *s = l->data; + g_slice_free1(sizeof(*s), s); + } + g_list_free(ll); + g_hash_table_destroy(rtpe_cngs_hash); + rtpe_cngs_hash = NULL; + } + poller_del_item(c->poller, c->udp_listeners[0].fd); + poller_del_item(c->poller, c->udp_listeners[1].fd); + close_socket(&c->udp_listeners[0]); + close_socket(&c->udp_listeners[1]); + cookie_cache_cleanup(&c->cookie_cache); +} + struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) { struct control_ng *c; @@ -366,11 +386,12 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned cha if (!p) return NULL; - c = obj_alloc0("control_ng", sizeof(*c), NULL); + c = obj_alloc0("control_ng", sizeof(*c), control_ng_free); cookie_cache_init(&c->cookie_cache); c->udp_listeners[0].fd = -1; c->udp_listeners[1].fd = -1; + c->poller = p; if (udp_listener_init(&c->udp_listeners[0], p, ep, control_ng_incoming, &c->obj)) goto fail2; diff --git a/daemon/control_tcp.c b/daemon/control_tcp.c index 35af4ce45..24a38e4f2 100644 --- a/daemon/control_tcp.c +++ b/daemon/control_tcp.c @@ -154,6 +154,14 @@ static void control_incoming(struct streambuf_stream *s) { } +static void control_tcp_free(void *p) { + struct control_tcp *c = p; + streambuf_listener_shutdown(&c->listeners[0]); + streambuf_listener_shutdown(&c->listeners[1]); + pcre_free(c->parse_re); + pcre_free_study(c->parse_ree); +} + struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep) { struct control_tcp *c; const char *errptr; @@ -162,7 +170,7 @@ struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep) { if (!p) return NULL; - c = obj_alloc0("control", sizeof(*c), NULL); + c = obj_alloc0("control", sizeof(*c), control_tcp_free); if (streambuf_listener_init(&c->listeners[0], p, ep, control_incoming, control_stream_readable, diff --git a/daemon/control_udp.c b/daemon/control_udp.c index ee34c69f8..620034c1a 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -134,6 +134,16 @@ out: log_info_clear(); } +void control_udp_free(void *p) { + struct control_udp *u = p; + pcre_free_study(u->parse_ree); + pcre_free(u->parse_re); + pcre_free(u->fallback_re); + close_socket(&u->udp_listeners[0]); + close_socket(&u->udp_listeners[1]); + cookie_cache_cleanup(&u->cookie_cache); +} + struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep) { struct control_udp *c; const char *errptr; @@ -142,7 +152,7 @@ struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep) { if (!p) return NULL; - c = obj_alloc0("control_udp", sizeof(*c), NULL); + c = obj_alloc0("control_udp", sizeof(*c), control_udp_free); c->parse_re = pcre_compile( /* cookie cmd flags callid viabranch:5 */ diff --git a/daemon/cookie_cache.c b/daemon/cookie_cache.c index bbcf7c552..1a7aa4d22 100644 --- a/daemon/cookie_cache.c +++ b/daemon/cookie_cache.c @@ -76,3 +76,10 @@ void cookie_cache_remove(struct cookie_cache *c, const str *s) { cond_broadcast(&c->cond); mutex_unlock(&c->lock); } + +void cookie_cache_cleanup(struct cookie_cache *c) { + g_hash_table_destroy(c->current.cookies); + g_hash_table_destroy(c->old.cookies); + g_string_chunk_free(c->current.chunks); + g_string_chunk_free(c->old.chunks); +} diff --git a/daemon/main.c b/daemon/main.c index ae41287e9..00f4d23f2 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -51,6 +51,10 @@ struct poller *rtpe_poller; struct rtpengine_config initial_rtpe_config; +static struct control_tcp *rtpe_tcp; +static struct control_udp *rtpe_udp; +static struct cli *rtpe_cli; + struct rtpengine_config rtpe_config = { // non-zero defaults .kernel_table = -1, @@ -745,9 +749,6 @@ static void init_everything(void) { static void create_everything(void) { - struct control_tcp *ct; - struct control_udp *cu; - struct cli *cl; struct timeval tmp_tv; struct timeval redis_start, redis_stop; double redis_diff = 0; @@ -786,18 +787,18 @@ no_kernel: } } - ct = NULL; + rtpe_tcp = NULL; if (rtpe_config.tcp_listen_ep.port) { - ct = control_tcp_new(rtpe_poller, &rtpe_config.tcp_listen_ep); - if (!ct) + rtpe_tcp = control_tcp_new(rtpe_poller, &rtpe_config.tcp_listen_ep); + if (!rtpe_tcp) die("Failed to open TCP control connection port"); } - cu = NULL; + rtpe_udp = NULL; if (rtpe_config.udp_listen_ep.port) { interfaces_exclude_port(rtpe_config.udp_listen_ep.port); - cu = control_udp_new(rtpe_poller, &rtpe_config.udp_listen_ep); - if (!cu) + rtpe_udp = control_udp_new(rtpe_poller, &rtpe_config.udp_listen_ep); + if (!rtpe_udp) die("Failed to open UDP control connection port"); } @@ -809,11 +810,11 @@ no_kernel: die("Failed to open UDP control connection port"); } - cl = NULL; + rtpe_cli = NULL; if (rtpe_config.cli_listen_ep.port) { interfaces_exclude_port(rtpe_config.cli_listen_ep.port); - cl = cli_new(rtpe_poller, &rtpe_config.cli_listen_ep); - if (!cl) + rtpe_cli = cli_new(rtpe_poller, &rtpe_config.cli_listen_ep); + if (!rtpe_cli) die("Failed to open UDP CLI connection port"); } @@ -947,5 +948,11 @@ int main(int argc, char **argv) { call_free(); interfaces_free(); + obj_release(rtpe_cli); + obj_release(rtpe_udp); + obj_release(rtpe_tcp); + obj_release(rtpe_control_ng); + poller_free(&rtpe_poller); + return 0; } diff --git a/daemon/poller.c b/daemon/poller.c index f3335dc34..cb1107e6c 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -66,6 +66,31 @@ struct poller *poller_new(void) { return p; } +static void __ti_put(void *p) { + struct timer_item *ti = p; + obj_put(ti); +} +void poller_free(struct poller **pp) { + struct poller *p = *pp; + for (unsigned int i = 0; i < p->items_size; i++) { + struct poller_item_int *ip = p->items[i]; + if (!ip) + continue; + p->items[i] = NULL; + obj_put(ip); + } + g_slist_free_full(p->timers, __ti_put); + g_slist_free_full(p->timers_add, __ti_put); + g_slist_free_full(p->timers_del, __ti_put); + if (p->fd != -1) + close(p->fd); + p->fd = -1; + if (p->items) + free(p->items); + free(p); + *pp = NULL; +} + static int epoll_events(struct poller_item *it, struct poller_item_int *ii) { if (!it) diff --git a/daemon/tcp_listener.c b/daemon/tcp_listener.c index 96e09599c..1356f940f 100644 --- a/daemon/tcp_listener.c +++ b/daemon/tcp_listener.c @@ -55,13 +55,18 @@ static void tcp_listener_closed(int fd, void *p, uintptr_t u) { abort(); } +static void __tlc_free(void *p) { + struct tcp_listener_callback *cb = p; + obj_put_o(cb->p); +} + int tcp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep, tcp_listener_callback_t func, struct obj *obj) { struct poller_item i; struct tcp_listener_callback *cb; - cb = obj_alloc("tcp_listener_callback", sizeof(*cb), NULL); + cb = obj_alloc("tcp_listener_callback", sizeof(*cb), __tlc_free); cb->func = func; cb->p = obj_get_o(obj); cb->ul = sock; @@ -79,11 +84,11 @@ int tcp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep, if (poller_add_item(p, &i)) goto fail; + obj_put(cb); return 0; fail: close_socket(sock); - obj_put_o(obj); obj_put(cb); return -1; } @@ -199,6 +204,11 @@ fail: obj_put(s); } +static void __sb_free(void *p) { + struct streambuf_callback *cb = p; + obj_put_o(cb->parent); +} + int streambuf_listener_init(struct streambuf_listener *listener, struct poller *p, const endpoint_t *ep, streambuf_callback_t newconn_func, streambuf_callback_t newdata_func, @@ -214,7 +224,7 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller * mutex_init(&listener->lock); listener->streams = g_hash_table_new(g_direct_hash, g_direct_equal); - cb = obj_alloc("streambuf_callback", sizeof(*cb), NULL); + cb = obj_alloc("streambuf_callback", sizeof(*cb), __sb_free); cb->newconn_func = newconn_func; cb->newdata_func = newdata_func; cb->closed_func = closed_func; @@ -225,12 +235,20 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller * if (tcp_listener_init(&listener->listener, p, ep, streambuf_listener_newconn, &cb->obj)) goto fail; + obj_put(cb); return 0; fail: obj_put(cb); return -1; } +void streambuf_listener_shutdown(struct streambuf_listener *listener) { + if (!listener) + return; + poller_del_item(listener->poller, listener->listener.fd); + close_socket(&listener->listener); + g_hash_table_destroy(listener->streams); +} void streambuf_stream_close(struct streambuf_stream *s) { streambuf_stream_closed(s->sock.fd, s, 0); diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index 656df6e70..5529c46c0 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -55,13 +55,18 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) { } } +static void __ulc_free(void *p) { + struct udp_listener_callback *cb = p; + obj_put_o(cb->p); +} + int udp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep, udp_listener_callback_t func, struct obj *obj) { struct poller_item i; struct udp_listener_callback *cb; - cb = obj_alloc("udp_listener_callback", sizeof(*cb), NULL); + cb = obj_alloc("udp_listener_callback", sizeof(*cb), __ulc_free); cb->func = func; cb->p = obj_get_o(obj); cb->ul = sock; @@ -77,11 +82,11 @@ int udp_listener_init(socket_t *sock, struct poller *p, const endpoint_t *ep, if (poller_add_item(p, &i)) goto fail; + obj_put(cb); return 0; fail: close_socket(sock); - obj_put_o(obj); obj_put(cb); return -1; } diff --git a/include/control_ng.h b/include/control_ng.h index 21d9d2e0a..73336bc91 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -36,6 +36,7 @@ struct control_ng { struct obj obj; struct cookie_cache cookie_cache; socket_t udp_listeners[2]; + struct poller *poller; }; struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char); diff --git a/include/cookie_cache.h b/include/cookie_cache.h index 05e2c22ab..cc8d5d37d 100644 --- a/include/cookie_cache.h +++ b/include/cookie_cache.h @@ -22,5 +22,6 @@ void cookie_cache_init(struct cookie_cache *); str *cookie_cache_lookup(struct cookie_cache *, const str *); void cookie_cache_insert(struct cookie_cache *, const str *, const str *); void cookie_cache_remove(struct cookie_cache *, const str *); +void cookie_cache_cleanup(struct cookie_cache *); #endif diff --git a/include/obj.h b/include/obj.h index fc7cb010a..5404476f9 100644 --- a/include/obj.h +++ b/include/obj.h @@ -86,6 +86,9 @@ INLINE void __obj_put(struct obj *o); #endif +#define obj_release(op) do { if (op) obj_put_o((struct obj *) op); op = NULL; } while (0) + + #include "log.h" diff --git a/include/poller.h b/include/poller.h index 01f72a6ff..d708eb7e8 100644 --- a/include/poller.h +++ b/include/poller.h @@ -31,6 +31,7 @@ struct poller; struct poller *poller_new(void); +void poller_free(struct poller **); int poller_add_item(struct poller *, struct poller_item *); int poller_update_item(struct poller *, struct poller_item *); int poller_del_item(struct poller *, int); diff --git a/include/tcp_listener.h b/include/tcp_listener.h index 327e108a7..664592039 100644 --- a/include/tcp_listener.h +++ b/include/tcp_listener.h @@ -40,6 +40,7 @@ int streambuf_listener_init(struct streambuf_listener *listener, struct poller * streambuf_callback_t closed_func, streambuf_callback_t timer_func, struct obj *obj); +void streambuf_listener_shutdown(struct streambuf_listener *); void streambuf_stream_close(struct streambuf_stream *); void streambuf_stream_shutdown(struct streambuf_stream *);