TT#14008 Squashed commit of the following:

commit b3d2d70d4cfb449fb80af188eee89b6dc343830d
Merge: 1e62cbbb eacd2634
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Thu Mar 4 11:36:26 2021 -0500

    Merge branch 'fix_packet_order' of https://github.com/enreached/rtpengine into enreached-fix_packet_order

    Change-Id: I30690cf4955c04238c359e4aca2d0fea36ebfd36

commit eacd2634e8
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Thu Mar 4 16:56:07 2021 +0100

    Update docs with --poller-per-thread option.

commit 7d02539673
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Thu Mar 4 16:47:55 2021 +0100

    Prevent mem leak.

commit 3510a5d021
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Thu Mar 4 13:25:39 2021 +0100

    Make poller per thread functionality configurable.

commit a2142b2a5e
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Thu Mar 4 11:57:18 2021 +0100

    Use pthread_self() instead of syscall(SYS_gettid).

commit 2e471cae40
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Thu Mar 4 11:46:44 2021 +0100

    Refactor the code a bit.

commit 91f72455c9
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Thu Mar 4 11:21:42 2021 +0100

    Use usleep() instead.

commit 08d3bdb4ed
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Thu Mar 4 11:03:39 2021 +0100

    Use ssl_random() instead.

commit 2d9995a864
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Tue Mar 2 15:50:13 2021 +0100

    Sleep for 20ms instead.

commit eded168a38
Author: Damir Nedžibović <damir.nedzibovic@enreach.com>
Date:   Tue Mar 2 15:11:43 2021 +0100

    Implement poller per thread in order to fix packet order.

Change-Id: If652c906be8b16115eb84e749f89a2729ba75120
pull/1218/head
Damir Nedžibović 4 years ago committed by Richard Fuchs
parent 1e62cbbb9a
commit f650f95111

@ -53,6 +53,7 @@
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
struct rtpengine_config initial_rtpe_config;
static struct control_tcp *rtpe_tcp;
@ -478,6 +479,7 @@ static void options(int *argc, char ***argv) {
{ "https-key", 0,0, G_OPTION_ARG_STRING, &rtpe_config.https_key, "Private key for HTTPS and WSS","FILE"},
{ "http-threads", 0,0, G_OPTION_ARG_INT, &rtpe_config.http_threads,"Number of worker threads for HTTP and WS","INT"},
{ "software-id", 0,0, G_OPTION_ARG_STRING, &rtpe_config.software_id,"Identification string of this software presented to external systems","STRING"},
{ "poller-per-thread", 0,0, G_OPTION_ARG_NONE, &rtpe_config.poller_per_thread, "Use poller per thread", NULL },
#ifdef WITH_TRANSCODING
{ "dtx-delay", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_delay, "Delay in milliseconds to trigger DTX handling","INT"},
{ "max-dtx", 0,0, G_OPTION_ARG_INT, &rtpe_config.max_dtx, "Maximum duration of DTX handling", "INT"},
@ -930,6 +932,10 @@ no_kernel:
if (!rtpe_poller)
die("poller creation failed");
rtpe_poller_map = poller_map_new();
if (!rtpe_poller_map)
die("poller map creation failed");
dtls_timer(rtpe_poller);
if (call_init())
@ -1075,9 +1081,15 @@ int main(int argc, char **argv) {
service_notify("READY=1\n");
for (idx = 0; idx < rtpe_config.num_threads; ++idx)
thread_create_detach_prio(poller_loop, rtpe_poller, rtpe_config.scheduling,
rtpe_config.priority, "poller");
for (idx = 0; idx < rtpe_config.num_threads; ++idx) {
if (!rtpe_config.poller_per_thread)
thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller");
else
thread_create_detach_prio(poller_loop, rtpe_poller_map, rtpe_config.scheduling, rtpe_config.priority, "poller");
}
if (!rtpe_config.poller_per_thread)
thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller");
if (rtpe_config.media_num_threads < 0)
rtpe_config.media_num_threads = rtpe_config.num_threads;
@ -1135,7 +1147,6 @@ int main(int argc, char **argv) {
codeclib_free();
statistics_free();
call_interfaces_free();
interfaces_free();
ice_free();
dtls_cert_free();
control_ng_cleanup();
@ -1157,6 +1168,8 @@ int main(int argc, char **argv) {
obj_release(rtpe_tcp);
obj_release(rtpe_control_ng);
poller_free(&rtpe_poller);
poller_map_free(&rtpe_poller_map);
interfaces_free();
return 0;
}

@ -2223,6 +2223,7 @@ static void stream_fd_free(void *p) {
struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif) {
struct stream_fd *sfd;
struct poller_item pi;
struct poller *p = rtpe_poller;
sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free);
sfd->unique_id = g_queue_get_length(&call->stream_fds);
@ -2240,8 +2241,12 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo
pi.readable = stream_fd_readable;
pi.closed = stream_fd_closed;
if (poller_add_item(rtpe_poller, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
if (rtpe_config.poller_per_thread)
p = poller_map_get(rtpe_poller_map);
if (p) {
if (poller_add_item(p, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
}
return sfd;
}

@ -50,9 +50,69 @@ struct poller {
GSList *timers_del;
};
struct poller_map {
mutex_t lock;
GHashTable *table;
};
struct poller_map *poller_map_new(void) {
struct poller_map *p;
p = malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
mutex_init(&p->lock);
p->table = g_hash_table_new(g_direct_hash, g_direct_equal);
return p;
}
void poller_map_add(struct poller_map *map) {
pthread_t tid = -1;
struct poller *p;
if (!map)
return;
tid = pthread_self();
mutex_lock(&map->lock);
p = poller_new();
g_hash_table_insert(map->table, (gpointer)tid, p);
mutex_unlock(&map->lock);
}
struct poller *poller_map_get(struct poller_map *map) {
if (!map)
return NULL;
struct poller *p = NULL;
pthread_t tid = pthread_self();
mutex_lock(&map->lock);
p = g_hash_table_lookup(map->table, (gpointer)tid);
if (!p) {
gpointer *arr = g_hash_table_get_keys_as_array(map->table, NULL);
p = g_hash_table_lookup(map->table, arr[ssl_random() % g_hash_table_size(map->table)]);
g_free(arr);
}
mutex_unlock(&map->lock);
return p;
}
static void poller_map_free_poller(gpointer k, gpointer v, gpointer d) {
struct poller *p = (struct poller *)v;
poller_free(&p);
}
void poller_map_free(struct poller_map **map) {
struct poller_map *m = *map;
if (!m)
return;
mutex_lock(&m->lock);
g_hash_table_foreach(m->table, poller_map_free_poller, NULL);
g_hash_table_destroy(m->table);
mutex_unlock(&m->lock);
mutex_destroy(&m->lock);
free(m);
*map = NULL;
}
struct poller *poller_new(void) {
struct poller *p;
@ -538,8 +598,19 @@ now:
}
void poller_loop(void *d) {
struct poller_map *map = d;
poller_map_add(map);
struct poller *p = poller_map_get(map);
poller_loop2(p);
}
void poller_loop2(void *d) {
struct poller *p = d;
while (!rtpe_shutdown)
poller_poll(p, 100);
while (!rtpe_shutdown) {
int ret = poller_poll(p, 100);
if (ret < 0)
usleep(20 * 1000);
}
}

@ -848,6 +848,14 @@ information.
Always sets the option B<reorder-codecs> in answer messages as described in the
F<README.md>.
=item B<--poller-per-thread>
Enable 'poller per thread' functionality: for every worker thread (see the
--num-threads option) a poller will be created. With this option on, it is
guaranteed that only a single thread will ever read from a particular socket,
thus maintaining the order of the packets. Might help when having issues with
DTMF packets (RFC 2833).
=back
=head1 INTERFACES

@ -116,12 +116,15 @@ struct rtpengine_config {
str cn_payload;
int reorder_codecs;
char *software_id;
int poller_per_thread;
};
struct poller;
extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer?
struct poller_map;
extern struct poller *rtpe_poller; // main global poller instance XXX convert to struct instead of pointer?
extern struct poller_map *rtpe_poller_map;
extern struct rtpengine_config rtpe_config;
extern struct rtpengine_config initial_rtpe_config;

@ -28,9 +28,12 @@ struct poller_item {
};
struct poller;
struct poller_map;
struct poller *poller_new(void);
struct poller_map *poller_map_new(void);
struct poller *poller_map_get(struct poller_map *);
void poller_map_free(struct poller_map **);
void poller_free(struct poller **);
int poller_add_item(struct poller *, struct poller_item *);
int poller_update_item(struct poller *, struct poller_item *);
@ -43,6 +46,7 @@ void poller_error(struct poller *, void *);
int poller_poll(struct poller *, int);
void poller_timer_loop(void *);
void poller_loop(void *);
void poller_loop2(void *);
int poller_add_timer(struct poller *, void (*)(void *), struct obj *);
int poller_del_timer(struct poller *, void (*)(void *), struct obj *);

@ -10,6 +10,7 @@ int _log_facility_cdr;
int _log_facility_dtmf;
struct rtpengine_config rtpe_config;
struct poller *rtpe_poller;
struct poller_map *rtpe_poller_map;
GString *dtmf_logs;
static str *sdup(char *s) {

Loading…
Cancel
Save