MT#55283 virtualise poller methods

This allows us to use other poller implementations

Change-Id: Iac7c194c86ecaac550aae4baa047cab205c507b9
pull/1826/head
Richard Fuchs 1 year ago
parent 6aca3e88ad
commit e553660e23

@ -647,7 +647,7 @@ void control_ng_free(void *p) {
g_hash_table_destroy(rtpe_cngs_hash);
rtpe_cngs_hash = NULL;
}
poller_del_item(rtpe_control_poller, c->udp_listener.fd);
rtpe_poller_del_item(rtpe_control_poller, c->udp_listener.fd);
reset_socket(&c->udp_listener);
streambuf_listener_shutdown(&c->tcp_listener);
if (tcp_connections_hash)

@ -67,6 +67,12 @@ static unsigned int num_poller_threads;
unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter;
bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item;
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
struct rtpengine_config initial_rtpe_config;
static GQueue rtpe_tcp = G_QUEUE_INIT;

@ -949,7 +949,7 @@ static void release_port_poller(socket_t *r, struct intf_spec *spec, struct poll
release_port_push(lpr);
else {
__C_DBG("Adding late-release callback for port '%u'", lpr->socket.local.port);
poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr);
rtpe_poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr);
}
}
static void release_port(socket_t *r, struct intf_spec *spec) {
@ -3283,7 +3283,7 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) {
if (sfd->socket.fd != -1) {
struct poller *p = call->poller;
if (!poller_add_item(p, &pi))
if (!rtpe_poller_add_item(p, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
else
sfd->poller = p;

@ -87,7 +87,7 @@ static int tcp_listener_init(socket_t *sock, const endpoint_t *ep,
i.closed = tcp_listener_closed;
i.readable = tcp_listener_incoming;
i.obj = &cb->obj;
if (!poller_add_item(rtpe_control_poller, &i))
if (!rtpe_poller_add_item(rtpe_control_poller, &i))
goto fail;
obj_put(cb);
@ -121,7 +121,7 @@ static void streambuf_stream_closed(int fd, void *p) {
mutex_lock(&l->lock);
bool ret = t_hash_table_remove(l->streams, s);
mutex_unlock(&l->lock);
poller_del_item(rtpe_control_poller, s->sock.fd);
rtpe_poller_del_item(rtpe_control_poller, s->sock.fd);
reset_socket(&s->sock);
if (ret)
obj_put(s);
@ -186,7 +186,7 @@ static void streambuf_listener_newconn(struct obj *p, socket_t *newsock, char *a
t_hash_table_insert(listener->streams, s, s); // hand over ref
mutex_unlock(&listener->lock);
if (!poller_add_item(rtpe_control_poller, &i))
if (!rtpe_poller_add_item(rtpe_control_poller, &i))
goto fail;
obj_put(s);
@ -242,7 +242,7 @@ fail:
void streambuf_listener_shutdown(struct streambuf_listener *listener) {
if (!listener)
return;
poller_del_item(rtpe_control_poller, listener->listener.fd);
rtpe_poller_del_item(rtpe_control_poller, listener->listener.fd);
reset_socket(&listener->listener);
t_hash_table_destroy_ptr(&listener->streams);
}

@ -95,7 +95,7 @@ int udp_listener_init(socket_t *sock, const endpoint_t *ep,
i.closed = udp_listener_closed;
i.readable = udp_listener_incoming;
i.obj = &cb->obj;
if (!poller_add_item(rtpe_control_poller, &i))
if (!rtpe_poller_add_item(rtpe_control_poller, &i))
goto fail;
obj_put(cb);

@ -213,6 +213,14 @@ extern struct poller *rtpe_control_poller; // poller for control sockets (maybe
extern unsigned int num_media_pollers; // for media sockets, >= 1
extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread
struct poller_item;
extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *);
extern bool (*rtpe_poller_del_item)(struct poller *, int);
extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *);
extern void (*rtpe_poller_blocked)(struct poller *, void *);
extern void (*rtpe_poller_error)(struct poller *, void *);
INLINE struct poller *rtpe_get_poller(void) {
// XXX optimise this for num_media_pollers == 1 ?
return rtpe_pollers[g_atomic_int_add(&rtpe_poller_rr_iter, 1) % num_media_pollers];

@ -19,6 +19,11 @@ struct poller **rtpe_pollers = (struct poller *[]) {NULL};
struct poller *rtpe_control_poller;
unsigned int num_media_pollers = 1;
unsigned int rtpe_poller_rr_iter;
bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item;
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
GString *dtmf_logs;
GQueue rtpe_control_ng = G_QUEUE_INIT;

@ -15,6 +15,11 @@ struct poller **rtpe_pollers;
struct poller *rtpe_control_poller;
unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter;
bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item;
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
GString *dtmf_logs;
GQueue rtpe_control_ng = G_QUEUE_INIT;

Loading…
Cancel
Save