diff --git a/daemon/helpers.c b/daemon/helpers.c index b77835f53..557bd9219 100644 --- a/daemon/helpers.c +++ b/daemon/helpers.c @@ -254,7 +254,7 @@ static void *thread_detach_func(void *d) { dt->priority, strerror(errno)); } - media_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free, 64 * 65536); + media_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free); #ifdef HAVE_LIBURING if (rtpe_config.common.io_uring) uring_thread_init(); diff --git a/daemon/kernel.c b/daemon/kernel.c index 30fe49f79..fdc834981 100644 --- a/daemon/kernel.c +++ b/daemon/kernel.c @@ -19,7 +19,6 @@ #include "xt_RTPENGINE.h" #define PREFIX "/proc/rtpengine" -#define MMAP_PAGE_SIZE (4096 * 16) struct kernel_interface kernel; @@ -58,13 +57,13 @@ static bool kernel_delete_table(unsigned int id) { return kernel_action_table("del", id); } -static void *kernel_alloc(size_t len) { - void *b = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, kernel.fd, 0); +static void *kernel_alloc(void) { + void *b = mmap(NULL, BUFFERPOOL_SHARD_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, kernel.fd, 0); assert(b != NULL && b != MAP_FAILED); return b; } -static void kernel_free(void *p, size_t len) { - munmap(p, len); +static void kernel_free(void *p) { + munmap(p, BUFFERPOOL_SHARD_SIZE); } static int kernel_open_table(unsigned int id) { @@ -144,7 +143,7 @@ bool kernel_setup_table(unsigned int id) { kernel.table = id; kernel.is_open = true; - shm_bufferpool = bufferpool_new2(kernel_alloc, kernel_free, MMAP_PAGE_SIZE); + shm_bufferpool = bufferpool_new(kernel_alloc, kernel_free); return true; } diff --git a/daemon/main.c b/daemon/main.c index 3cc0ba518..53164baed 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1458,7 +1458,7 @@ static void early_init(void) { #ifdef WITH_TRANSCODING static void clib_init(void) { - media_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free, 64 * 65536); + media_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free); #ifdef HAVE_LIBURING if (rtpe_config.common.io_uring) uring_thread_init(); @@ -1506,7 +1506,7 @@ static void kernel_setup(void) { return; fallback: - shm_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free, 4096); // fallback userspace bufferpool + shm_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free); // fallback userspace bufferpool } diff --git a/lib/bufferpool.c b/lib/bufferpool.c index f7ba81441..71fe4c05f 100644 --- a/lib/bufferpool.c +++ b/lib/bufferpool.c @@ -3,12 +3,12 @@ #include <stdbool.h> #include "obj.h" +static_assert((BUFFERPOOL_SHARD_SIZE & (BUFFERPOOL_SHARD_SIZE - 1)) == 0, + "BUFFERPOOL_SHARD_SIZE is not a power of two"); + struct bufferpool { - void *(*alloc)(size_t); + void *(*alloc)(void); void (*dealloc)(void *); - void (*dealloc2)(void *, size_t); - size_t shard_size; - size_t address_mask; mutex_t lock; GQueue empty_shards; GQueue full_shards; @@ -21,7 +21,6 @@ struct bpool_shard { void *buf; // actual head of buffer, given to free() void *empty; // head of usable buffer, head == empty if empty void *end; - size_t size; void *head; bool full; unsigned int (*recycle)(void *); @@ -32,30 +31,16 @@ struct bpool_shard { static rwlock_t bpool_shards_lock = RWLOCK_STATIC_INIT; static GPtrArray *bpool_shards; -static struct bufferpool *bufferpool_new_common(void *(*alloc)(size_t), size_t shard_size) { +struct bufferpool *bufferpool_new(void *(*alloc)(void), void (*dealloc)(void *)) { struct bufferpool *ret = g_new0(__typeof(*ret), 1); ret->alloc = alloc; - ret->shard_size = shard_size; - ret->address_mask = shard_size - 1; - assert((ret->address_mask & shard_size) == 0); // must be a power of two mutex_init(&ret->lock); g_queue_init(&ret->empty_shards); g_queue_init(&ret->full_shards); - return ret; -} - -struct bufferpool *bufferpool_new(void *(*alloc)(size_t), void (*dealloc)(void *), size_t shard_size) { - struct bufferpool *ret = bufferpool_new_common(alloc, shard_size); ret->dealloc = dealloc; return ret; } -struct bufferpool *bufferpool_new2(void *(*alloc)(size_t), void (*dealloc)(void *, size_t), size_t shard_size) { - struct bufferpool *ret = bufferpool_new_common(alloc, shard_size); - ret->dealloc2 = dealloc; - return ret; -} - // bufferpool is locked and shard is in "full" list but with zero refs static void bufferpool_recycle(struct bpool_shard *shard) { struct bufferpool *bp = shard->bp; @@ -74,13 +59,7 @@ static void bufferpool_recycle(struct bpool_shard *shard) { static void bufferpool_dealloc(struct bpool_shard *shard) { struct bufferpool *bp = shard->bp; - void *p = shard->buf; - size_t len = shard->size; - - if (bp->dealloc) - bp->dealloc(p); - else - bp->dealloc2(p, len); + bp->dealloc(shard->buf); } // bufferpool is locked @@ -101,20 +80,19 @@ static int bpool_shards_sort(const void *A, const void *B) { } static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) { - void *buf = bp->alloc(bp->shard_size); + void *buf = bp->alloc(); if (!buf) return NULL; // all bottom bits must be zero - assert(((size_t) buf & (bp->shard_size - 1)) == 0); + assert(((size_t) buf & BUFFERPOOL_BOTTOM_MASK) == 0); struct bpool_shard *ret = g_new0(__typeof(*ret), 1); ret->bp = bp; ret->buf = buf; - ret->size = bp->shard_size; ret->empty = buf; ret->head = buf; - ret->end = buf + bp->shard_size; + ret->end = buf + BUFFERPOOL_SHARD_SIZE; RWLOCK_W(&bpool_shards_lock); @@ -125,7 +103,7 @@ static struct bpool_shard *bufferpool_new_shard(struct bufferpool *bp) { } void *bufferpool_alloc(struct bufferpool *bp, size_t len) { - if (len > bp->shard_size) + if (len > BUFFERPOOL_SHARD_SIZE) return NULL; LOCK(&bp->lock); @@ -323,8 +301,8 @@ void bufferpool_cleanup(void) { g_ptr_array_free(bpool_shards, true); } -void *bufferpool_aligned_alloc(size_t len) { - void *m = aligned_alloc(len, len); +void *bufferpool_aligned_alloc(void) { + void *m = aligned_alloc(BUFFERPOOL_SHARD_SIZE, BUFFERPOOL_SHARD_SIZE); assert(m != NULL); return m; } diff --git a/lib/bufferpool.h b/lib/bufferpool.h index ad2070505..9123a5c4d 100644 --- a/lib/bufferpool.h +++ b/lib/bufferpool.h @@ -6,14 +6,18 @@ #define BUFFERPOOL_ALIGNMENT (sizeof(void *)) // bytes #define BUFFERPOOL_ALIGN(x) (((x + BUFFERPOOL_ALIGNMENT - 1) / BUFFERPOOL_ALIGNMENT) * BUFFERPOOL_ALIGNMENT) +#define BUFFERPOOL_SHARD_SIZE (1LL<<24) // 16 MB, must be a power of two +#define BUFFERPOOL_OVERHEAD (0) // storage space not available + +#define BUFFERPOOL_BOTTOM_MASK (BUFFERPOOL_SHARD_SIZE - 1) + struct bufferpool; struct bpool_shard; void bufferpool_init(void); void bufferpool_cleanup(void); -struct bufferpool *bufferpool_new(void *(*alloc)(size_t), void (*dealloc)(void *), size_t shard_size); -struct bufferpool *bufferpool_new2(void *(*alloc)(size_t), void (*dealloc)(void *, size_t), size_t shard_size); +struct bufferpool *bufferpool_new(void *(*alloc)(void), void (*dealloc)(void *)); void bufferpool_destroy(struct bufferpool *); void *bufferpool_alloc(struct bufferpool *bp, size_t len); @@ -30,7 +34,7 @@ INLINE void *bufferpool_alloc0(struct bufferpool *bp, size_t len) { return ret; } -void *bufferpool_aligned_alloc(size_t); +void *bufferpool_aligned_alloc(void); void bufferpool_aligned_free(void *); typedef char bp_char; diff --git a/lib/uring.c b/lib/uring.c index 61056d7ed..87fd895fd 100644 --- a/lib/uring.c +++ b/lib/uring.c @@ -144,7 +144,9 @@ struct poller *uring_poller_new(void) { ret->evs = g_ptr_array_new(); ret->blocked = g_array_new(false, true, sizeof(char)); - ret->bufferpool = bufferpool_new(g_malloc, g_free, BUFFER_SIZE * BUFFERS_COUNT); + static_assert(BUFFERPOOL_SHARD_SIZE - BUFFERPOOL_OVERHEAD >= BUFFER_SIZE * BUFFERS_COUNT, + "BUFFERPOOL_SHARD_SIZE too small"); + ret->bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free); for (int i = 0; i < BUFFER_POOLS; i++) { ret->buffers[i] = g_new0(__typeof(*ret->buffers[i]), 1); ret->buffers[i]->buf = bufferpool_reserve(ret->bufferpool, BUFFERS_COUNT, diff --git a/t/test-stats.c b/t/test-stats.c index f5778369d..fc792a775 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -68,7 +68,7 @@ static void __assert_metrics_eq(stats_metric_q *q, const char *b, unsigned int l int main(void) { rtpe_common_config_ptr = &rtpe_config.common; bufferpool_init(); - shm_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free, 4096); + shm_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free); endpoint_parse_any(&rtpe_config.graphite_ep, "1.2.3.4:4567"); diff --git a/t/test-transcode.c b/t/test-transcode.c index 11876828a..47dbadd38 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -418,8 +418,8 @@ static void dtmf(const char *s) { int main(void) { rtpe_common_config_ptr = &rtpe_config.common; bufferpool_init(); - media_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free, 4096); - shm_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free, 4096); + media_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free); + shm_bufferpool = bufferpool_new(bufferpool_aligned_alloc, bufferpool_aligned_free); unsigned long random_seed = 0;