|
|
|
|
@ -26,6 +26,9 @@ struct streambuf_callback {
|
|
|
|
|
struct obj *parent;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
TYPED_GHASHTABLE_IMPL(tcp_streams_ht, g_direct_hash, g_direct_equal, NULL, NULL)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void tcp_listener_incoming(int fd, void *p, uintptr_t x) {
|
|
|
|
|
struct tcp_listener_callback *cb = p;
|
|
|
|
|
int ret;
|
|
|
|
|
@ -117,7 +120,7 @@ static void streambuf_stream_closed(int fd, void *p, uintptr_t u) {
|
|
|
|
|
|
|
|
|
|
struct streambuf_listener *l = s->listener;
|
|
|
|
|
mutex_lock(&l->lock);
|
|
|
|
|
int ret = g_hash_table_remove(l->streams, s);
|
|
|
|
|
bool ret = t_hash_table_remove(l->streams, s);
|
|
|
|
|
mutex_unlock(&l->lock);
|
|
|
|
|
poller_del_item(rtpe_poller, s->sock.fd);
|
|
|
|
|
if (ret)
|
|
|
|
|
@ -180,7 +183,7 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a
|
|
|
|
|
obj_hold(s);
|
|
|
|
|
|
|
|
|
|
mutex_lock(&listener->lock);
|
|
|
|
|
g_hash_table_insert(listener->streams, s, s); // hand over ref
|
|
|
|
|
t_hash_table_insert(listener->streams, s, s); // hand over ref
|
|
|
|
|
mutex_unlock(&listener->lock);
|
|
|
|
|
|
|
|
|
|
if (poller_add_item(rtpe_poller, &i))
|
|
|
|
|
@ -192,7 +195,7 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a
|
|
|
|
|
|
|
|
|
|
fail:
|
|
|
|
|
mutex_lock(&listener->lock);
|
|
|
|
|
int ret = g_hash_table_remove(listener->streams, s);
|
|
|
|
|
bool ret = t_hash_table_remove(listener->streams, s);
|
|
|
|
|
mutex_unlock(&listener->lock);
|
|
|
|
|
|
|
|
|
|
if (ret)
|
|
|
|
|
@ -217,7 +220,7 @@ int streambuf_listener_init(struct streambuf_listener *listener, const endpoint_
|
|
|
|
|
ZERO(*listener);
|
|
|
|
|
|
|
|
|
|
mutex_init(&listener->lock);
|
|
|
|
|
listener->streams = g_hash_table_new(g_direct_hash, g_direct_equal);
|
|
|
|
|
listener->streams = tcp_streams_ht_new();
|
|
|
|
|
|
|
|
|
|
cb = obj_alloc("streambuf_callback", sizeof(*cb), __sb_free);
|
|
|
|
|
cb->newconn_func = newconn_func;
|
|
|
|
|
@ -241,8 +244,7 @@ void streambuf_listener_shutdown(struct streambuf_listener *listener) {
|
|
|
|
|
return;
|
|
|
|
|
poller_del_item(rtpe_poller, listener->listener.fd);
|
|
|
|
|
close_socket(&listener->listener);
|
|
|
|
|
if (listener->streams)
|
|
|
|
|
g_hash_table_destroy(listener->streams);
|
|
|
|
|
t_hash_table_destroy_ptr(&listener->streams);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streambuf_stream_close(struct streambuf_stream *s) {
|
|
|
|
|
|