MT#55283 initial support for io_uring

Change-Id: I497db70a4ba6a9242b637a867a70fdfa0f361a44
pull/1826/head
Richard Fuchs 1 year ago
parent ad00134c61
commit ebaca8a4e0

1
daemon/.gitignore vendored

@ -28,3 +28,4 @@ mix_in_x64_avx512bw.S
mix_in_x64_sse2.S
poller.c
bufferpool.c
uring.c

@ -47,6 +47,7 @@ flags = [
'-DHAVE_BCG729',
'-DHAVE_MQTT',
'-DHAVE_CODEC_CHAIN',
'-DHAVE_LIBURING',
'-D__csh_lookup(x)=str_hash(x)',
'-DCSH_LOOKUP(x)=' + csh_lookup_str,
'-O2',

@ -102,6 +102,9 @@ ifeq ($(with_transcoding),yes)
LIBSRCS+= codeclib.strhash.c resample.c
LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S
endif
ifneq ($(have_liburing),yes)
LIBSRCS+= uring.c
endif
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) $(LIBASM:.S=.o)
MDS= rtpengine.ronn

@ -14,6 +14,8 @@
#include "main.h"
#include "bufferpool.h"
#include "media_socket.h"
#include "uring.h"
#include "poller.h"
#if 0
#define BSDB(x...) fprintf(stderr, x)
@ -196,6 +198,10 @@ static void thread_detach_cleanup(void *dtp) {
struct detach_thread *dt = dtp;
g_slice_free1(sizeof(*dt), dt);
bufferpool_destroy(media_bufferpool);
#ifdef HAVE_LIBURING
if (rtpe_config.common.io_uring)
uring_thread_cleanup();
#endif
thread_join_me();
}
@ -249,6 +255,10 @@ static void *thread_detach_func(void *d) {
}
media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536);
#ifdef HAVE_LIBURING
if (rtpe_config.common.io_uring)
uring_thread_init();
#endif
thread_cleanup_push(thread_detach_cleanup, dt);
dt->func(dt->data);
@ -294,6 +304,8 @@ static void thread_looper_helper(void *fp) {
enum thread_looper_action ret = lh.f();
uring_thread_loop();
struct timeval stop;
gettimeofday(&stop, NULL);
long long duration_us = timeval_diff(&stop, &rtpe_now);

@ -23,6 +23,7 @@
#endif
#include "kernel.h"
#include "bufferpool.h"
#include "uring.h"
#define DEFAULT_AVIO_BUFSIZE 4096
@ -239,6 +240,18 @@ static void send_timer_rtcp(struct send_timer *st, struct ssrc_ctx *ssrc_out) {
timeval_add_usec(&ssrc_out->next_rtcp, 5000000 + (ssl_random() % 2000000));
}
struct async_send_req {
struct uring_req req; // must be first
struct iovec iov;
struct msghdr msg;
struct sockaddr_storage sin;
void *buf;
};
static void async_send_req_free(struct uring_req *p, int32_t res, uint32_t flags) {
struct async_send_req *req = (__typeof__(req)) p;
bufferpool_unref(req->buf);
uring_req_free(p);
}
static bool __send_timer_send_1(struct rtp_header *rh, struct packet_stream *sink, struct codec_packet *cp) {
stream_fd *sink_fd = sink->selected_sfd;
@ -265,9 +278,19 @@ static bool __send_timer_send_1(struct rtp_header *rh, struct packet_stream *sin
if (cp->kernel_send_info.local.family)
kernel_send_rtcp(&cp->kernel_send_info, cp->s.s, cp->s.len);
else
socket_sendto(&sink_fd->socket,
cp->s.s, cp->s.len, &sink->endpoint);
else {
struct async_send_req *req = uring_alloc_req(sizeof(*req), async_send_req_free);
req->iov = (__typeof(req->iov)) {
.iov_base = cp->s.s,
.iov_len = cp->s.len,
};
req->msg = (__typeof(req->msg)) {
.msg_iov = &req->iov,
.msg_iovlen = 1,
};
req->buf = bufferpool_ref(cp->s.s);
uring_sendmsg(&sink_fd->socket, &req->msg, &sink->endpoint, &req->sin, &req->req);
}
if (sink->call->recording && rtpe_config.rec_egress) {
// fill in required members

@ -14,6 +14,7 @@
#include "log.h"
#include "ice.h"
#include "ssllib.h"
#include "uring.h"
#define STUN_CRC_XOR 0x5354554eUL
@ -664,15 +665,12 @@ ignore:
return -1;
}
int stun_binding_request(const endpoint_t *dst, uint32_t transaction[3], str *pwd,
str ufrags[2], int controlling, uint64_t tiebreaker, uint32_t priority,
socket_t *sock, int to_use)
{
struct async_stun_req {
struct uring_req req; // must be first
struct header hdr;
struct msghdr mh;
struct iovec iov[10]; /* hdr, username x2, ice_controlled/ing, priority, uc, fp, mi, sw x2 */
char username_buf[256];
int i;
struct generic un_attr;
struct controlled_ing cc;
struct priority prio;
@ -680,30 +678,39 @@ int stun_binding_request(const endpoint_t *dst, uint32_t transaction[3], str *pw
struct fingerprint fp;
struct msg_integrity mi;
struct software sw;
struct sockaddr_storage sin;
};
output_init(&mh, iov, &hdr, STUN_BINDING_REQUEST, transaction);
software(&mh, &sw);
int stun_binding_request(const endpoint_t *dst, uint32_t transaction[3], str *pwd,
str ufrags[2], int controlling, uint64_t tiebreaker, uint32_t priority,
socket_t *sock, int to_use)
{
struct async_stun_req *r = uring_alloc_buffer_req(sizeof(*r));
int i;
i = snprintf(username_buf, sizeof(username_buf), STR_FORMAT":"STR_FORMAT,
output_init(&r->mh, r->iov, &r->hdr, STUN_BINDING_REQUEST, transaction);
software(&r->mh, &r->sw);
i = snprintf(r->username_buf, sizeof(r->username_buf), STR_FORMAT":"STR_FORMAT,
STR_FMT(&ufrags[0]), STR_FMT(&ufrags[1]));
if (i <= 0 || i >= sizeof(username_buf))
if (i <= 0 || i >= sizeof(r->username_buf))
return -1;
output_add_data_wr(&mh, &un_attr, STUN_USERNAME, username_buf, i);
output_add_data_wr(&r->mh, &r->un_attr, STUN_USERNAME, r->username_buf, i);
cc.tiebreaker = htobe64(tiebreaker);
output_add(&mh, &cc, controlling ? STUN_ICE_CONTROLLING : STUN_ICE_CONTROLLED);
r->cc.tiebreaker = htobe64(tiebreaker);
output_add(&r->mh, &r->cc, controlling ? STUN_ICE_CONTROLLING : STUN_ICE_CONTROLLED);
prio.priority = htonl(priority);
output_add(&mh, &prio, STUN_PRIORITY);
r->prio.priority = htonl(priority);
output_add(&r->mh, &r->prio, STUN_PRIORITY);
if (to_use)
output_add(&mh, &uc, STUN_USE_CANDIDATE);
output_add(&r->mh, &r->uc, STUN_USE_CANDIDATE);
integrity(&mh, &mi, pwd);
fingerprint(&mh, &fp);
integrity(&r->mh, &r->mi, pwd);
fingerprint(&r->mh, &r->fp);
output_finish_src(&mh);
socket_sendmsg(sock, &mh, dst);
output_finish_src(&r->mh);
uring_sendmsg(sock, &r->mh, dst, &r->sin, &r->req);
return 0;
}

@ -2,6 +2,7 @@
#include "helpers.h"
#include "log_funcs.h"
#include "poller.h"
static int tt_obj_cmp(const void *a, const void *b) {
@ -93,6 +94,7 @@ static void timerthread_run(void *p) {
obj_put(tt_obj);
log_info_reset();
uring_thread_loop();
mutex_lock(&tt->lock);
continue;

1
debian/control vendored

@ -44,6 +44,7 @@ Build-Depends:
libswresample-dev (>= 6:10),
libsystemd-dev,
libtest2-suite-perl,
liburing-dev (>= 2.3) <!pkg.ngcp-rtpengine.nouring>,
libwebsockets-dev,
libxmlrpc-core-c3-dev (>= 1.16.07),
libxtables-dev (>= 1.4) | iptables-dev (>= 1.4),

@ -34,6 +34,7 @@ flags = [
'-DWITH_IPTABLES_OPTION',
'-DWITH_TRANSCODING',
'-DHAVE_BCG729',
'-DHAVE_LIBURING',
'-O2',
'-fstack-protector',
'--param=ssp-buffer-size=4',

@ -43,6 +43,7 @@ flags = [
'-DWITH_IPTABLES_OPTION',
'-DHAVE_BCG729',
'-DHAVE_CODEC_CHAIN',
'-DHAVE_LIBURING',
'-D__csh_lookup(x)=str_hash(x)',
'-DCSH_LOOKUP(x)=' + csh_lookup_str,
'-O2',

@ -199,6 +199,9 @@ void config_load(int *argc, char ***argv, GOptionEntry *app_entries, const char
{ "foreground", 'f', 0, G_OPTION_ARG_NONE, &rtpe_common_config_ptr->foreground, "Don't fork to background", NULL },
{ "thread-stack", 0,0, G_OPTION_ARG_INT, &rtpe_common_config_ptr->thread_stack, "Thread stack size in kB", "INT" },
{ "poller-size", 0,0, G_OPTION_ARG_INT, &rtpe_common_config_ptr->poller_size, "Max poller items per iteration", "INT" },
#ifdef HAVE_LIBURING
{ "io-uring", 0,0, G_OPTION_ARG_NONE, &rtpe_common_config_ptr->io_uring, "Use io_uring", NULL },
#endif
{ "evs-lib-path", 0,0, G_OPTION_ARG_FILENAME, &rtpe_common_config_ptr->evs_lib_path, "Location of .so for 3GPP EVS codec", "FILE" },
#ifdef HAVE_CODEC_CHAIN
{ "codec-chain-lib-path",0,0, G_OPTION_ARG_FILENAME, &rtpe_common_config_ptr->codec_chain_lib_path,"Location of libcodec-chain.so", "FILE" },

@ -34,6 +34,7 @@ struct rtpengine_common_config {
gboolean foreground;
int thread_stack;
int poller_size;
gboolean io_uring;
int max_log_line_length;
char *evs_lib_path;
char *codec_chain_lib_path;

@ -45,6 +45,16 @@ CFLAGS+= -DHAVE_LIBSYSTEMD
LDLIBS+= $(shell pkg-config --libs libsystemd)
endif
# look for liburing
ifeq ($(shell pkg-config --exists liburing && echo yes),yes)
have_liburing := yes
endif
ifeq ($(have_liburing),yes)
CFLAGS+= $(shell pkg-config --cflags liburing)
CFLAGS+= -DHAVE_LIBURING
LDLIBS+= $(shell pkg-config --libs liburing)
endif
ifeq ($(DBG),yes)
CFLAGS+= -D__DEBUG=1
endif

@ -17,7 +17,7 @@
#include "obj.h"
#include "log_funcs.h"
#include "auxlib.h"
#include "uring.h"
@ -292,6 +292,14 @@ void poller_error(struct poller *p, void *fdp) {
it->blocked = 1;
}
#ifdef HAVE_LIBURING
static unsigned int __uring_thread_loop_dummy(void) { return 0; }
__thread unsigned int (*uring_thread_loop)(void) = __uring_thread_loop_dummy;
#endif
void poller_loop(void *d) {
struct poller *p = d;
int poller_size = rtpe_common_config_ptr->poller_size;
@ -305,6 +313,7 @@ void poller_loop(void *d) {
int ret = poller_poll(p, thread_sleep_time, evs, poller_size);
if (ret < 0)
usleep(20 * 1000);
uring_thread_loop();
}
thread_cleanup_pop(true);

@ -39,4 +39,11 @@ void poller_error(struct poller *, void *);
void poller_loop(void *);
#ifdef HAVE_LIBURING
extern __thread unsigned int (*uring_thread_loop)(void);
#else
INLINE unsigned int uring_thread_loop(void) { return 0; }
#endif
#endif

@ -0,0 +1,74 @@
#include "uring.h"
#include <errno.h>
#include <string.h>
#include "log.h"
#include "loglib.h"
#include "socket.h"
#include "poller.h"
static ssize_t __socket_sendmsg(socket_t *s, struct msghdr *m, const endpoint_t *e,
struct sockaddr_storage *ss, struct uring_req *r)
{
ssize_t ret = socket_sendmsg(s, m, e);
r->handler(r, 0, 0);
return ret;
}
__thread __typeof(__socket_sendmsg) (*uring_sendmsg) = __socket_sendmsg;
#ifdef HAVE_LIBURING
#include <liburing.h>
static __thread struct io_uring rtpe_uring;
static ssize_t __uring_sendmsg(socket_t *s, struct msghdr *m, const endpoint_t *e,
struct sockaddr_storage *ss, struct uring_req *r)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);
assert(sqe != NULL);
s->family->endpoint2sockaddr(ss, e);
m->msg_name = ss;
m->msg_namelen = s->family->sockaddr_size;
io_uring_sqe_set_data(sqe, r);
io_uring_prep_sendmsg(sqe, s->fd, m, 0);
return 0;
}
static unsigned int __uring_thread_loop(void) {
io_uring_submit_and_get_events(&rtpe_uring);
struct io_uring_cqe *cqe;
unsigned int head, num = 0;
io_uring_for_each_cqe(&rtpe_uring, head, cqe) {
struct uring_req *req = io_uring_cqe_get_data(cqe);
req->handler(req, cqe->res, cqe->flags);
num++;
}
io_uring_cq_advance(&rtpe_uring, num);
return num;
}
void uring_thread_init(void) {
struct io_uring_params params = {0};
int ret = io_uring_queue_init_params(4096, &rtpe_uring, &params);
if (ret)
die("io_uring init failed (%s)", strerror(errno));
uring_sendmsg = __uring_sendmsg;
uring_thread_loop = __uring_thread_loop;
}
void uring_thread_cleanup(void) {
io_uring_queue_exit(&rtpe_uring);
}
#endif

@ -0,0 +1,43 @@
#ifndef _URING_H_
#define _URING_H_
#include "socket.h"
struct uring_req;
typedef void uring_req_handler_fn(struct uring_req *, int32_t res, uint32_t flags);
struct uring_req {
uring_req_handler_fn *handler;
};
extern __thread ssize_t (*uring_sendmsg)(socket_t *, struct msghdr *, const endpoint_t *,
struct sockaddr_storage *, struct uring_req *);
INLINE void uring_req_buffer_free(struct uring_req *r, int32_t res, uint32_t flags) {
g_free(r);
}
INLINE void uring_req_free(struct uring_req *r) {
g_free(r);
}
INLINE void *uring_alloc_req(size_t len, uring_req_handler_fn *fn) {
struct uring_req *ret = g_malloc0(len);
ret->handler = fn;
return ret;
}
INLINE void *uring_alloc_buffer_req(size_t len) {
return uring_alloc_req(len, uring_req_buffer_free);
}
#ifdef HAVE_LIBURING
#include "bufferpool.h"
void uring_thread_init(void);
void uring_thread_cleanup(void);
#endif
#endif

@ -17,3 +17,4 @@ dtmflib.c
poller.c
ssllib.c
bufferpool.c
uring.c

@ -13,6 +13,8 @@ cp -r debian ${DIST}
# No libbcg729-dev package
sed -i -e '/libbcg729-dev/d' ${DIST}/control
# No liburing-dev package
sed -i -e '/liburing-dev/d' ${DIST}/control
# Update for buster debhelper
sed -i -e 's/debhelper-compat.*/debhelper-compat (= 12),/' ${DIST}/control

@ -13,6 +13,8 @@ cp -r debian ${DIST}
# No libbcg729-dev package
sed -i -e '/libbcg729-dev/d' ${DIST}/control
# No liburing-dev package
sed -i -e '/liburing-dev/d' ${DIST}/control
# Update for focal debhelper
sed -i -e 's/debhelper-compat.*/debhelper-compat (= 12),/' ${DIST}/control

@ -23,3 +23,5 @@ mvr2s_x64_avx2.S
mix_in_x64_avx2.S
mix_in_x64_avx512bw.S
mix_in_x64_sse2.S
bufferpool.c
uring.c

@ -35,7 +35,7 @@ include ../lib/g729.Makefile
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \
decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c notify.c
LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \
dtmflib.c
dtmflib.c bufferpool.c
LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) $(LIBASM:.S=.o)

1
t/.gitignore vendored

@ -86,3 +86,4 @@ mix_in_x64_sse2.S
test-amr-decode
test-amr-encode
bufferpool.c
uring.c

@ -78,7 +78,7 @@ SRCS+= spandsp_recv_fax_pcm.c spandsp_recv_fax_t38.c spandsp_send_fax_pcm.c \
ifeq ($(RTPENGINE_EXTENDED_TESTS),1)
SRCS+= test-amr-decode.c test-amr-encode.c
endif
LIBSRCS+= codeclib.strhash.c resample.c socket.c streambuf.c dtmflib.c poller.c
LIBSRCS+= codeclib.strhash.c resample.c socket.c streambuf.c dtmflib.c poller.c bufferpool.c
DAEMONSRCS+= control_ng_flags_parser.c codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c \
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
@ -87,6 +87,9 @@ DAEMONSRCS+= control_ng_flags_parser.c codec.c call.c ice.c kernel.c media_socke
HASHSRCS+= call_interfaces.c control_ng.c sdp.c janus.c
LIBASM= mvr2s_x64_avx2.S mvr2s_x64_avx512.S mix_in_x64_avx2.S mix_in_x64_avx512bw.S mix_in_x64_sse2.S
endif
ifneq ($(have_liburing),yes)
LIBSRCS+= uring.c
endif
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) $(DAEMONSRCS:.c=.o) $(HASHSRCS:.c=.strhash.o) $(LIBASM:.S=.o)
@ -195,7 +198,7 @@ test-bitstr: test-bitstr.o
test-mix-buffer: test-mix-buffer.o $(COMMONOBJS) mix_buffer.o ssrc.o rtp.o crypto.o helpers.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o codeclib.strhash.o dtmflib.o \
mvr2s_x64_avx2.o mvr2s_x64_avx512.o resample.o bufferpool.o
mvr2s_x64_avx2.o mvr2s_x64_avx512.o resample.o bufferpool.o uring.o poller.o
spandsp_send_fax_pcm: spandsp_send_fax_pcm.o
@ -226,7 +229,7 @@ test-stats: test-stats.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssr
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o \
websocket.o cli.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o
test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssrc.o call.o ice.o helpers.o \
kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \
@ -235,13 +238,13 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o cod
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o websocket.o \
cli.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o uring.o
test-resample: test-resample.o $(COMMONOBJS) codeclib.strhash.o resample.o dtmflib.o mvr2s_x64_avx2.o \
mvr2s_x64_avx512.o
test-payload-tracker: test-payload-tracker.o $(COMMONOBJS) ssrc.o helpers.o auxlib.o rtp.o crypto.o codeclib.strhash.o \
resample.o dtmflib.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o bufferpool.o
resample.o dtmflib.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o bufferpool.o uring.o poller.o
test-kernel-module: test-kernel-module.o $(COMMONOBJS) kernel.o

Loading…
Cancel
Save