From 38b17eb04d33746f74ef09089e15519f60433c17 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 6 Feb 2024 14:32:40 -0500 Subject: [PATCH] MT#55283 support kernel-based media player Change-Id: I5958015d819ca280273f80245dfa748531577b6a --- daemon/kernel.c | 102 +++ daemon/main.c | 10 +- daemon/media_player.c | 126 +++- docs/rtpengine.md | 22 + etc/rtpengine.conf | 8 + include/kernel.h | 7 + include/main.h | 2 + include/media_player.h | 1 + kernel-module/xt_RTPENGINE.c | 1218 +++++++++++++++++++++++++++++++++- kernel-module/xt_RTPENGINE.h | 62 ++ utils/.gitignore | 1 + utils/kplay-test.c | 320 +++++++++ utils/kplay-test2.c | 166 +++++ 13 files changed, 2003 insertions(+), 42 deletions(-) create mode 100644 utils/.gitignore create mode 100644 utils/kplay-test.c create mode 100644 utils/kplay-test2.c diff --git a/daemon/kernel.c b/daemon/kernel.c index cb48a47f2..d6dd89446 100644 --- a/daemon/kernel.c +++ b/daemon/kernel.c @@ -100,6 +100,12 @@ bool kernel_init_table(void) { [REMG_ADD_STREAM] = sizeof(struct rtpengine_command_add_stream), [REMG_DEL_STREAM] = sizeof(struct rtpengine_command_del_stream), [REMG_PACKET] = sizeof(struct rtpengine_command_packet), + [REMG_INIT_PLAY_STREAMS] = sizeof(struct rtpengine_command_init_play_streams), + [REMG_GET_PACKET_STREAM] = sizeof(struct rtpengine_command_get_packet_stream), + [REMG_PLAY_STREAM_PACKET] = sizeof(struct rtpengine_command_play_stream_packet), + [REMG_PLAY_STREAM] = sizeof(struct rtpengine_command_play_stream), + [REMG_STOP_STREAM] = sizeof(struct rtpengine_command_stop_stream), + [REMG_FREE_PACKET_STREAM] = sizeof(struct rtpengine_command_free_packet_stream), }, .rtpe_stats = rtpe_stats, }; @@ -252,3 +258,99 @@ unsigned int kernel_add_intercept_stream(unsigned int call_idx, const char *id) return UNINIT_IDX; return cmd.stream.idx.stream_idx; } + +bool kernel_init_player(int num_media, int num_sessions) { + if (num_media <= 0 || num_sessions <= 0) + return false; + if (!kernel.is_open) + return false; + + struct rtpengine_command_init_play_streams ips = { + .cmd = REMG_INIT_PLAY_STREAMS, + .num_packet_streams = num_media, + .num_play_streams = num_sessions, + }; + ssize_t ret = write(kernel.fd, &ips, sizeof(ips)); + if (ret != sizeof(ips)) + return false; + + kernel.use_player = true; + + return true; +} + +unsigned int kernel_get_packet_stream(void) { + if (!kernel.use_player) + return -1; + + struct rtpengine_command_get_packet_stream gps = { .cmd = REMG_GET_PACKET_STREAM }; + ssize_t ret = read(kernel.fd, &gps, sizeof(gps)); + if (ret != sizeof(gps)) + return -1; + return gps.packet_stream_idx; +} + +bool kernel_add_stream_packet(unsigned int idx, const char *buf, size_t len, unsigned long delay_ms, + uint32_t ts, uint32_t dur) +{ + if (!kernel.use_player) + return false; + + size_t total_len = len + sizeof(struct rtpengine_command_play_stream_packet); + struct rtpengine_command_play_stream_packet *cmd = alloca(total_len); + + cmd->cmd = REMG_PLAY_STREAM_PACKET; + cmd->play_stream_packet.packet_stream_idx = idx; + cmd->play_stream_packet.delay_ms = delay_ms; + cmd->play_stream_packet.delay_ts = ts; + cmd->play_stream_packet.duration_ts = dur; + + memcpy(&cmd->play_stream_packet.data, buf, len); + + ssize_t ret = write(kernel.fd, cmd, total_len); + if (ret != total_len) + return false; + return true; +} + +unsigned int kernel_start_stream_player(struct rtpengine_play_stream_info *info) { + if (!kernel.use_player) + return -1; + + struct rtpengine_command_play_stream ps = { + .cmd = REMG_PLAY_STREAM, + .info = *info, + }; + ssize_t ret = read(kernel.fd, &ps, sizeof(ps)); + if (ret == sizeof(ps)) + return ps.play_idx; + return -1; +} + +bool kernel_stop_stream_player(unsigned int idx) { + if (!kernel.use_player) + return false; + + struct rtpengine_command_stop_stream ss = { + .cmd = REMG_STOP_STREAM, + .play_idx = idx, + }; + ssize_t ret = write(kernel.fd, &ss, sizeof(ss)); + if (ret == sizeof(ss)) + return true; + return false; +} + +bool kernel_free_packet_stream(unsigned int idx) { + if (!kernel.use_player) + return false; + + struct rtpengine_command_free_packet_stream fps = { + .cmd = REMG_FREE_PACKET_STREAM, + .packet_stream_idx = idx, + }; + ssize_t ret = write(kernel.fd, &fps, sizeof(fps)); + if (ret == sizeof(fps)) + return true; + return false; +} diff --git a/daemon/main.c b/daemon/main.c index d0a56e851..ff254134b 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -118,6 +118,7 @@ struct rtpengine_config rtpe_config = { }, }, .max_recv_iters = MAX_RECV_ITERS, + .kernel_player_media = 128, }; static void sighandler(gpointer x) { @@ -633,6 +634,8 @@ static void options(int *argc, char ***argv) { { "silence-detect",0,0, G_OPTION_ARG_DOUBLE, &silence_detect, "Audio level threshold in percent for silence detection","FLOAT"}, { "cn-payload",0,0, G_OPTION_ARG_STRING_ARRAY,&cn_payload, "Comfort noise parameters to replace silence with","INT INT INT ..."}, { "player-cache",0,0, G_OPTION_ARG_NONE, &rtpe_config.player_cache,"Cache media files for playback in memory",NULL}, + { "kernel-player",0,0, G_OPTION_ARG_INT, &rtpe_config.kernel_player,"Max number of kernel media player streams","INT"}, + { "kernel-player-media",0,0,G_OPTION_ARG_INT, &rtpe_config.kernel_player_media,"Max number of kernel media files","INT"}, { "audio-buffer-length",0,0, G_OPTION_ARG_INT,&rtpe_config.audio_buffer_length,"Length in milliseconds of audio buffer","INT"}, { "audio-buffer-delay",0,0, G_OPTION_ARG_INT,&rtpe_config.audio_buffer_delay,"Initial delay in milliseconds for buffered audio","INT"}, { "audio-player",0,0, G_OPTION_ARG_STRING, &use_audio_player, "When to enable the internal audio player","on-demand|play-media|transcoding|always"}, @@ -1245,6 +1248,12 @@ static void kernel_setup(void) { #endif if (!kernel_setup_table(rtpe_config.kernel_table) && rtpe_config.no_fallback) die("Userspace fallback disallowed - exiting"); + + if (rtpe_config.player_cache && rtpe_config.kernel_player > 0 && rtpe_config.kernel_player_media > 0) { + if (!kernel_init_player(rtpe_config.kernel_player_media, rtpe_config.kernel_player)) + die("Failed to initialise kernel media player"); + } + return; fallback: @@ -1297,7 +1306,6 @@ static void init_everything(void) { die("Kernel module version mismatch or other fatal error"); } - static void create_everything(void) { struct timeval tmp_tv; diff --git a/daemon/media_player.c b/daemon/media_player.c index a9c4f3b27..195472b31 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -47,13 +47,17 @@ struct media_player_cache_index { struct media_player_content_index index; rtp_payload_type dst_pt; }; +TYPED_GHASHTABLE(media_player_ht, struct media_player, struct media_player, g_direct_hash, g_direct_equal, NULL, NULL) // XXX ref counting players struct media_player_cache_entry { - bool finished; + volatile bool finished; // "unfinished" elements, only used while decoding is active: mutex_t lock; cond_t cond; // to wait for more data to be decoded cache_packet_arr *packets; // read-only except for decoder thread, which uses finished flags and locks + unsigned long duration; // cumulative in ms, summed up while decoding + unsigned int kernel_idx; // -1 if not in use + media_player_ht wait_queue; // players waiting on decoder to finish struct codec_scheduler csch; struct media_player_coder coder; // de/encoder data @@ -64,7 +68,7 @@ struct media_player_cache_packet { char *buf; str s; long long pts; - long long duration; + long long duration; // us long long duration_ts; }; @@ -130,12 +134,22 @@ static void media_player_shutdown(struct media_player *mp) { mp->media = NULL; media_player_coder_shutdown(&mp->coder); + if (mp->kernel_idx != -1) + kernel_stop_stream_player(mp->kernel_idx); + else if (mp->cache_entry) { + mutex_lock(&mp->cache_entry->lock); + if (t_hash_table_is_set(mp->cache_entry->wait_queue)) + t_hash_table_remove(mp->cache_entry->wait_queue, mp); + mutex_unlock(&mp->cache_entry->lock); + } + mp->cache_index.type = MP_OTHER; if (mp->cache_index.file.s) g_free(mp->cache_index.file.s); mp->cache_index.file = STR_NULL;// coverity[missing_lock : FALSE] mp->cache_entry = NULL; // coverity[missing_lock : FALSE] mp->cache_read_idx = 0; + mp->kernel_idx = -1; } #endif @@ -185,6 +199,7 @@ void media_player_new(struct media_player **mpp, struct call_monologue *ml) { mp->tt_obj.tt = &media_player_thread; mutex_init(&mp->lock); + mp->kernel_idx = -1; mp->run_func = media_player_read_packet; // default mp->call = obj_get(ml->call); @@ -499,8 +514,70 @@ retry:; return false; } -static void media_player_cached_reader_start(struct media_player *mp, const rtp_payload_type *dst_pt) { +static void media_player_kernel_player_start_now(struct media_player *mp) { struct media_player_cache_entry *entry = mp->cache_entry; + if (!entry) + return; + const rtp_payload_type *dst_pt = &entry->coder.handler->dest_pt; + + ilog(LOG_DEBUG, "Starting kernel media player index %i (PT %i)", entry->kernel_idx, dst_pt->payload_type); + + struct rtpengine_play_stream_info info = { + .packet_stream_idx = entry->kernel_idx, + .pt = dst_pt->payload_type, + .seq = mp->seq, + .ts = mp->buffer_ts, + .ssrc = mp->ssrc_out->parent->h.ssrc, + .repeat = mp->opts.repeat, + .stats = mp->sink->stats_out, + .iface_stats = mp->sink->selected_sfd->local_intf->stats, + .ssrc_stats = mp->ssrc_out->stats, + }; + mp->sink->endpoint.address.family->endpoint2kernel(&info.dst_addr, &mp->sink->endpoint); // XXX unify with __re_address_translate_ep + mp->sink->selected_sfd->socket.local.address.family->endpoint2kernel(&info.src_addr, &mp->sink->selected_sfd->socket.local); // XXX unify with __re_address_translate_ep + mp->crypt_handler->out->kernel(&info.encrypt, mp->sink); + + unsigned int idx = kernel_start_stream_player(&info); + if (idx == -1) + ilog(LOG_ERR, "Failed to start kernel media player (index %i): %s", info.packet_stream_idx, strerror(errno)); + else + mp->kernel_idx = idx; +} + +static void media_player_kernel_player_start(struct media_player *mp) { + struct media_player_cache_entry *entry = mp->cache_entry; + if (!entry) + return; + + // decoder finished yet? try unlocked read first + bool finished = entry->finished; + + if (!finished) { + mutex_lock(&entry->lock); + // check flag again in case it happened just now + if (!entry->finished) { + // add us to wait list + ilog(LOG_DEBUG, "Decoder not finished yet, waiting to start kernel player index %i", + entry->kernel_idx); + t_hash_table_insert(entry->wait_queue, mp, mp); // XXX reference needed? + mutex_unlock(&entry->lock); + return; + } + // finished now, drop down below + mutex_unlock(&entry->lock); + } + + media_player_kernel_player_start_now(mp); +} + +static void media_player_cached_reader_start(struct media_player *mp) { + struct media_player_cache_entry *entry = mp->cache_entry; + const rtp_payload_type *dst_pt = &entry->coder.handler->dest_pt; + + if (entry->kernel_idx != -1) { + media_player_kernel_player_start(mp); + return; + } // create dummy codec handler and start timer @@ -551,7 +628,7 @@ static bool media_player_cache_get_entry(struct media_player *mp, bool ret = true; // entry exists, use cached data if (entry) { - media_player_cached_reader_start(mp, dst_pt); + media_player_cached_reader_start(mp); goto out; } @@ -563,10 +640,12 @@ static bool media_player_cache_get_entry(struct media_player *mp, *ins_key = lookup; str_init_dup_str(&ins_key->index.file, &lookup.index.file); codec_init_payload_type(&ins_key->dst_pt, MT_UNKNOWN); // duplicate contents + entry = mp->cache_entry = g_slice_alloc0(sizeof(*entry)); mutex_init(&entry->lock); cond_init(&entry->cond); entry->packets = cache_packet_arr_new_sized(64); + entry->wait_queue = media_player_ht_new(); switch (lookup.index.type) { case MP_DB: @@ -584,6 +663,15 @@ static bool media_player_cache_get_entry(struct media_player *mp, g_hash_table_insert(media_player_cache, ins_key, entry); + entry->kernel_idx = -1; + if (kernel.use_player) { + entry->kernel_idx = kernel_get_packet_stream(); + if (entry->kernel_idx == -1) + ilog(LOG_ERR, "Failed to get kernel packet stream entry (%s)", strerror(errno)); + else + ilog(LOG_DEBUG, "Using kernel packet stream index %i", entry->kernel_idx); + } + out: mutex_unlock(&media_player_cache_lock); @@ -631,12 +719,23 @@ static void media_player_cache_entry_decoder_thread(void *p) { av_packet_unref(entry->coder.pkt); } + ilog(LOG_DEBUG, "Decoder thread for %s finished", entry->info_str); + mutex_lock(&entry->lock); entry->finished = true; cond_broadcast(&entry->cond); - mutex_unlock(&entry->lock); - ilog(LOG_DEBUG, "Decoder thread for %s finished", entry->info_str); + media_player_ht_iter iter; + t_hash_table_iter_init(&iter, entry->wait_queue); + struct media_player *mp; + while (t_hash_table_iter_next(&iter, &mp, NULL)) { + if (mp->media) + media_player_kernel_player_start_now(mp); + } + t_hash_table_destroy(entry->wait_queue); // not needed any more + entry->wait_queue = media_player_ht_null(); + + mutex_unlock(&entry->lock); } static void packet_encoded_cache(AVPacket *pkt, struct codec_ssrc_handler *ch, struct media_packet *mp, @@ -658,6 +757,16 @@ static void packet_encoded_cache(AVPacket *pkt, struct codec_ssrc_handler *ch, s mutex_lock(&entry->lock); t_ptr_array_add(entry->packets, ep); + if (entry->kernel_idx != -1) { + ilog(LOG_DEBUG, "Adding media packet (length %zu, TS %" PRIu64 ", delay %lu ms) to kernel packet stream %i", + s->len, pkt->pts, entry->duration, entry->kernel_idx); + if (!kernel_add_stream_packet(entry->kernel_idx, s->s, s->len, entry->duration, pkt->pts, + pkt->duration)) + ilog(LOG_ERR, "Failed to add packet to kernel player (%s)", strerror(errno)); + } + + entry->duration += ep->duration / 1000; + cond_broadcast(&entry->cond); mutex_unlock(&entry->lock); } @@ -693,7 +802,7 @@ static bool media_player_cache_entry_init(struct media_player *mp, const rtp_pay // use low priority (10 nice) thread_create_detach_prio(media_player_cache_entry_decoder_thread, entry, NULL, 10, "mp decoder"); - media_player_cached_reader_start(mp, dst_pt); + media_player_cached_reader_start(mp); return true; } @@ -1382,8 +1491,11 @@ static void media_player_cache_entry_free(void *p) { t_ptr_array_free(e->packets, true); mutex_destroy(&e->lock); g_free(e->info_str); + if (t_hash_table_is_set(e->wait_queue)) + t_hash_table_destroy(e->wait_queue); // XXX release references? media_player_coder_shutdown(&e->coder); av_packet_free(&e->coder.pkt); + kernel_free_packet_stream(e->kernel_idx); g_slice_free1(sizeof(*e), e); } #endif diff --git a/docs/rtpengine.md b/docs/rtpengine.md index ada11326e..0390c0f5c 100644 --- a/docs/rtpengine.md +++ b/docs/rtpengine.md @@ -1081,6 +1081,28 @@ call to inject-DTMF won't be sent to __\-\-dtmf-log-dest=__ or __\-\-listen-tcp- RTP data is cached and retained in memory for the lifetime of the process. +- __\-\-kernel-player=__*INT* +- __\-\-kernel-player-media=__*INT* + + Enables and configures the kernel-based media player. Disabled by default + and only available if the kernel module is in use, and requires + __player-cache__ to also be enabled. + + When enabled, media playback will be handled by a set of kernel threads. + The option __kernel-player__ defaults to zero and needs to set to non-zero + to enable the feature. The number given to the option is the maximum number + of concurrent kernel media players that can be used. + + The option __kernel-player-media__ configures the maximum number of unique + media "files" that can be stored for playback in the kernel module. Media + files requested for playback are first decoded by the __player-cache__ + feature, and then given to the kernel module in a pre-encoded format for + quick playback. Defaults to 128. + + Both player slots and media slots are shared among all instances of + *rtpengine* (using different kernel table IDs) running on a system using + the same kernel module. Unused slots use minimal resources. + - __audio-buffer-length=__*INT* Set the buffer length used by the audio player (see below) in milliseconds. The diff --git a/etc/rtpengine.conf b/etc/rtpengine.conf index 25688ce03..d405b0b90 100644 --- a/etc/rtpengine.conf +++ b/etc/rtpengine.conf @@ -116,6 +116,14 @@ recording-method = proc # silence-detect = 0.05 # cn-payload = 60 +# player-cache = false +# kernel-player = 0 +# kernel-player-media = 128 + +# audio-buffer-length = 120 +# audio-buffer-delay = 10 +# audio-player = on-demand + # sip-source = false # dtls-passive = false diff --git a/include/kernel.h b/include/kernel.h index f798688af..435566d55 100644 --- a/include/kernel.h +++ b/include/kernel.h @@ -22,6 +22,7 @@ struct kernel_interface { int fd; bool is_open; bool is_wanted; + bool use_player; }; extern struct kernel_interface kernel; @@ -40,5 +41,11 @@ void kernel_del_call(unsigned int); unsigned int kernel_add_intercept_stream(unsigned int call_idx, const char *id); +bool kernel_init_player(int num_media, int num_sessions); +unsigned int kernel_get_packet_stream(void); +bool kernel_add_stream_packet(unsigned int, const char *, size_t, unsigned long ms, uint32_t ts, uint32_t dur); +unsigned int kernel_start_stream_player(struct rtpengine_play_stream_info *); +bool kernel_stop_stream_player(unsigned int idx); +bool kernel_free_packet_stream(unsigned int); #endif diff --git a/include/main.h b/include/main.h index 83deb0dea..4143c7e77 100644 --- a/include/main.h +++ b/include/main.h @@ -150,6 +150,8 @@ struct rtpengine_config { uint32_t silence_detect_int; str cn_payload; gboolean player_cache; + int kernel_player; + int kernel_player_media; int audio_buffer_length; int audio_buffer_delay; enum { diff --git a/include/media_player.h b/include/media_player.h index 99004b849..88f971864 100644 --- a/include/media_player.h +++ b/include/media_player.h @@ -72,6 +72,7 @@ struct media_player { struct media_player_content_index cache_index; struct media_player_cache_entry *cache_entry; unsigned int cache_read_idx; + unsigned int kernel_idx; struct ssrc_ctx *ssrc_out; unsigned long seq; diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index cb75a9162..47e1a60e8 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -32,6 +32,9 @@ #include #include #include +#include +#include +#include #include "xt_RTPENGINE.h" @@ -153,6 +156,89 @@ MODULE_ALIAS("ip6t_RTPENGINE"); #define _w_unlock(l, f) write_unlock_irqrestore(l, f) #endif +#if 0 +#define _spinlock_t spinlock_t +#define _spin_lock_init(x) spin_lock_init(x) +#define _spin_lock(x) spin_lock(x) +#define _spin_unlock(x) spin_unlock(x) +#define _rwlock_t rwlock_t +#define _rwlock_init(x) rwlock_init(x) +#define _read_lock(x) read_lock(x) +#define _read_unlock(x) read_unlock(x) +#define _write_lock(x) write_lock(x) +#define _write_unlock(x) write_unlock(x) +#else +typedef struct { + spinlock_t l; + int owner; + int line; +} _spinlock_t; +static inline void _spin_lock_init(_spinlock_t *s) { + spin_lock_init(&s->l); + s->owner = 0; +} +static inline void __spin_lock(_spinlock_t *s, int line) { + unsigned int cnt = 0; + while (true) { + if (spin_trylock(&s->l)) { + s->owner = current->pid; + s->line = line; + return; + } + cnt++; + if (cnt == 100000000) + printk(KERN_WARNING "%i stuck at %i, owned by %i at %i\n", current->pid, line, s->owner, s->line); + } +} +static inline void _spin_unlock(_spinlock_t *s) { + s->owner = 0; + spin_unlock(&s->l); +} +typedef struct { + rwlock_t l; + int owner; + int line; +} _rwlock_t; +static inline void _rwlock_init(_rwlock_t *s) { + rwlock_init(&s->l); + s->owner = 0; +} +static inline void __read_lock(_rwlock_t *s, int line) { + unsigned int cnt = 0; + while (true) { + if (read_trylock(&s->l)) + return; + cnt++; + if (cnt == 100000000) + printk(KERN_WARNING "%i stuck at %i, owned by %i at %i\n", current->pid, line, s->owner, s->line); + } +} +static inline void _read_unlock(_rwlock_t *s) { + read_unlock(&s->l); +} +static inline void __write_lock(_rwlock_t *s, int line) { + unsigned int cnt = 0; + while (true) { + if (write_trylock(&s->l)) { + s->owner = current->pid; + s->line = line; + return; + } + cnt++; + if (cnt == 100000000) + printk(KERN_WARNING "%i stuck at %i, owned by %i at %i\n", current->pid, line, s->owner, s->line); + } +} +static inline void _write_unlock(_rwlock_t *s) { + s->owner = 0; + write_unlock(&s->l); +} +#define _spin_lock(x) __spin_lock(x, __LINE__) +#define _read_lock(x) __read_lock(x, __LINE__) +#define _write_lock(x) __write_lock(x, __LINE__) +#endif + + #if defined(RHEL_RELEASE_CODE) && LINUX_VERSION_CODE >= KERNEL_VERSION(5,14,0) && \ @@ -250,7 +336,7 @@ static void table_put(struct rtpengine_table *); static struct rtpengine_target *get_target(struct rtpengine_table *, const struct re_address *); static int is_valid_address(const struct re_address *rea); -static int aes_f8_session_key_init(struct re_crypto_context *, struct rtpengine_srtp *); +static int aes_f8_session_key_init(struct re_crypto_context *, const struct rtpengine_srtp *); static int srtp_encrypt_aes_cm(struct re_crypto_context *, struct rtpengine_srtp *, struct rtp_parsed *, uint32_t *); static int srtcp_encrypt_aes_cm(struct re_crypto_context *, struct rtpengine_srtp *, @@ -272,6 +358,12 @@ static int send_proxy_packet_output(struct sk_buff *skb, struct rtpengine_target int rtp_pt_idx, struct rtpengine_output *o, struct rtp_parsed *rtp, int ssrc_idx, const struct xt_action_param *par); +static int send_proxy_packet(struct sk_buff *skb, struct re_address *src, struct re_address *dst, + unsigned char tos, const struct xt_action_param *par); +static uint32_t proxy_packet_srtp_encrypt(struct sk_buff *skb, struct re_crypto_context *ctx, + struct rtpengine_srtp *srtp, + struct rtp_parsed *rtp, int ssrc_idx, + struct ssrc_stats **ssrc_stats); static void call_put(struct re_call *call); static void del_stream(struct re_stream *stream, struct rtpengine_table *); @@ -283,8 +375,6 @@ static inline int bitfield_clear(unsigned long *bf, unsigned int i); - - // mirror global_stats_counter from userspace struct global_stats_counter { #define F(x) atomic64_t x; @@ -429,6 +519,12 @@ struct rtpengine_table { struct list_head shm_list; struct global_stats_counter *rtpe_stats; + + _spinlock_t player_lock; + struct list_head play_streams; + unsigned int num_play_streams; + struct list_head packet_streams; + unsigned int num_packet_streams; }; struct re_cipher { @@ -444,7 +540,7 @@ struct re_cipher { struct rtp_parsed *, uint32_t *); int (*encrypt_rtcp)(struct re_crypto_context *, struct rtpengine_srtp *, struct rtp_parsed *, uint32_t *); - int (*session_key_init)(struct re_crypto_context *, struct rtpengine_srtp *); + int (*session_key_init)(struct re_crypto_context *, const struct rtpengine_srtp *); }; struct re_hmac { @@ -493,8 +589,63 @@ struct rtp_parsed { int rtcp; }; +struct play_stream_packet { + struct list_head list; + ktime_t delay; + uint32_t ts; + uint32_t duration_ts; + uint16_t seq; + //struct sk_buff *skb; + char *data; + size_t len; +}; + +struct play_stream_packets { + atomic_t refcnt; + _rwlock_t lock; + struct list_head packets; + unsigned int len; + unsigned int table_id; + struct list_head table_entry; + unsigned int idx; +}; + +struct play_stream { + _spinlock_t lock; + atomic_t refcnt; + unsigned int idx; + struct rtpengine_play_stream_info info; + struct re_crypto_context encrypt; + struct play_stream_packets *packets; + ktime_t start_time; + struct play_stream_packet *position; + struct timer_thread *timer_thread; + uint64_t tree_index; + unsigned int table_id; + struct list_head table_entry; +}; + +struct timer_thread { + struct list_head list; + unsigned int idx; + struct task_struct *task; + + wait_queue_head_t queue; + atomic_t shutdown; + + _spinlock_t tree_lock; // XXX use mutex? + struct btree_head64 tree; // timer entries // XXX use rbtree? + bool tree_added; + struct play_stream *scheduled; + ktime_t scheduled_at; +}; +static void free_packet_stream(struct play_stream_packets *stream); +static void free_play_stream_packet(struct play_stream_packet *p); +static void free_play_stream(struct play_stream *s); +static void do_stop_stream(struct play_stream *stream); + static struct proc_dir_entry *my_proc_root; @@ -507,9 +658,18 @@ static rwlock_t table_lock; static struct re_auto_array calls; static struct re_auto_array streams; +static _rwlock_t media_player_lock; +static struct play_stream_packets **stream_packets; +static unsigned int num_stream_packets; +static atomic_t last_stream_packets_idx; +static struct play_stream **play_streams; +static unsigned int num_play_streams; +static atomic_t last_play_stream_idx; - +static struct timer_thread **timer_threads; +static unsigned int num_timer_threads; +static atomic_t last_timer_thread_idx; #if LINUX_VERSION_CODE < KERNEL_VERSION(5,6,0) # define PROC_OP_STRUCT file_operations @@ -756,7 +916,10 @@ static struct rtpengine_table *new_table(void) { INIT_LIST_HEAD(&t->calls); INIT_LIST_HEAD(&t->shm_list); spin_lock_init(&t->shm_lock); + INIT_LIST_HEAD(&t->packet_streams); + INIT_LIST_HEAD(&t->play_streams); t->id = -1; + _spin_lock_init(&t->player_lock); for (i = 0; i < ARRAY_SIZE(t->calls_hash); i++) { INIT_HLIST_HEAD(&t->calls_hash[i]); @@ -922,18 +1085,12 @@ static void target_put(struct rtpengine_target *t) { - - - static void target_get(struct rtpengine_target *t) { atomic_inc(&t->refcnt); } - - - static void clear_proc(struct proc_dir_entry **e) { struct proc_dir_entry *pde; @@ -947,6 +1104,29 @@ static void clear_proc(struct proc_dir_entry **e) { +static void __unref_play_stream(struct play_stream *s); +static void __unref_packet_stream(struct play_stream_packets *stream); +static void end_of_stream(struct play_stream *stream); + +#define unref_play_stream(s) do { \ + /* printk(KERN_WARNING "unref play stream %p (%i--) @ %s:%i\n", s, atomic_read(&(s)->refcnt), __FILE__, __LINE__); */ \ + __unref_play_stream(s); \ +} while (0) + +#define ref_play_stream(s) do { \ + /* printk(KERN_WARNING "ref play stream %p (%i++) @ %s:%i\n", s, atomic_read(&(s)->refcnt), __FILE__, __LINE__); */ \ + atomic_inc(&(s)->refcnt); \ +} while (0) + +#define ref_packet_stream(s) do { \ + /* printk(KERN_WARNING "ref packet stream %p (%i++) @ %s:%i\n", s, atomic_read(&(s)->refcnt), __FILE__, __LINE__); */ \ + atomic_inc(&(s)->refcnt); \ +} while (0) + +#define unref_packet_stream(s) do { \ + /* printk(KERN_WARNING "unref packet stream %p (%i--) @ %s:%i\n", s, atomic_read(&(s)->refcnt), __FILE__, __LINE__); */ \ + __unref_packet_stream(s); \ +} while (0) static void clear_table_proc_files(struct rtpengine_table *t) { clear_proc(&t->proc_status); @@ -956,6 +1136,41 @@ static void clear_table_proc_files(struct rtpengine_table *t) { clear_proc(&t->proc_root); } +static void clear_table_player(struct rtpengine_table *t) { + struct play_stream *stream, *ts; + struct play_stream_packets *packets, *tp; + unsigned int idx; + + list_for_each_entry_safe(stream, ts, &t->play_streams, table_entry) { + _spin_lock(&stream->lock); + stream->table_id = -1; + idx = stream->idx; + _spin_unlock(&stream->lock); + _write_lock(&media_player_lock); + if (play_streams[idx] == stream) { + play_streams[idx] = NULL; + unref_play_stream(stream); + } + _write_unlock(&media_player_lock); + do_stop_stream(stream); + unref_play_stream(stream); + } + + list_for_each_entry_safe(packets, tp, &t->packet_streams, table_entry) { + _write_lock(&packets->lock); + packets->table_id = -1; + idx = packets->idx; + _write_unlock(&packets->lock); + _write_lock(&media_player_lock); + if (stream_packets[idx] == packets) { + stream_packets[idx] = NULL; + unref_packet_stream(packets); + } + _write_unlock(&media_player_lock); + unref_packet_stream(packets); + } +} + static void table_put(struct rtpengine_table *t) { int i, j, k; struct re_dest_addr *rda; @@ -1004,6 +1219,7 @@ static void table_put(struct rtpengine_table *t) { } clear_table_proc_files(t); + clear_table_player(t); kfree(t); module_put(THIS_MODULE); @@ -1177,6 +1393,10 @@ static ssize_t proc_status(struct file *f, char __user *b, size_t l, loff_t *o) len += sprintf(buf + len, "Targets: %u\n", t->num_targets); read_unlock_irqrestore(&t->target_lock, flags); + // unlocked/unsafe read + len += sprintf(buf + len, "Players: %u\n", t->num_play_streams); + len += sprintf(buf + len, "PStreams: %u\n", t->num_packet_streams); + table_put(t); if (copy_to_user(b, buf, len)) @@ -1842,7 +2062,7 @@ static void *shm_map_resolve(void *p, size_t size) { -static int validate_srtp(struct rtpengine_srtp *s) { +static int validate_srtp(const struct rtpengine_srtp *s) { if (s->cipher <= REC_INVALID) return -1; if (s->cipher >= __REC_LAST) @@ -2006,7 +2226,7 @@ static int prf_n(unsigned char *out, int len, const unsigned char *key, unsigned return 0; } -static int gen_session_key(unsigned char *out, int len, struct rtpengine_srtp *s, unsigned char label, +static int gen_session_key(unsigned char *out, int len, const struct rtpengine_srtp *s, unsigned char label, unsigned int index_len) { unsigned char key_id[7]; @@ -2034,7 +2254,7 @@ static int gen_session_key(unsigned char *out, int len, struct rtpengine_srtp *s -static int aes_f8_session_key_init(struct re_crypto_context *c, struct rtpengine_srtp *s) { +static int aes_f8_session_key_init(struct re_crypto_context *c, const struct rtpengine_srtp *s) { unsigned char m[16]; int i, ret; @@ -2059,7 +2279,7 @@ error: return ret; } -static int gen_session_keys(struct re_crypto_context *c, struct rtpengine_srtp *s, unsigned int label_offset, +static int gen_session_keys(struct re_crypto_context *c, const struct rtpengine_srtp *s, unsigned int label_offset, unsigned int index_len) { int ret; @@ -2210,17 +2430,17 @@ error: printk(KERN_ERR "Failed to generate session keys: %s\n", err); return ret; } -static int gen_rtp_session_keys(struct re_crypto_context *c, struct rtpengine_srtp *s) { +static int gen_rtp_session_keys(struct re_crypto_context *c, const struct rtpengine_srtp *s) { return gen_session_keys(c, s, 0, 6); } -static int gen_rtcp_session_keys(struct re_crypto_context *c, struct rtpengine_srtp *s) { +static int gen_rtcp_session_keys(struct re_crypto_context *c, const struct rtpengine_srtp *s) { return gen_session_keys(c, s, 3, SRTCP_R_LENGTH); } -static void crypto_context_init(struct re_crypto_context *c, struct rtpengine_srtp *s) { +static void crypto_context_init(struct re_crypto_context *c, const struct rtpengine_srtp *s) { c->cipher = &re_ciphers[s->cipher]; c->hmac = &re_hmacs[s->hmac]; } @@ -2927,6 +3147,7 @@ static int table_new_call(struct rtpengine_table *table, struct rtpengine_call_i atomic_set(&call->refcnt, 1); call->table_id = table->id; INIT_LIST_HEAD(&call->streams); + INIT_LIST_HEAD(&call->table_entry); /* check for name collisions */ @@ -3103,6 +3324,7 @@ static int table_new_stream(struct rtpengine_table *table, struct rtpengine_stre atomic_set(&stream->refcnt, 1); INIT_LIST_HEAD(&stream->packet_list); + INIT_LIST_HEAD(&stream->call_entry); spin_lock_init(&stream->packet_list_lock); init_waitqueue_head(&stream->read_wq); init_waitqueue_head(&stream->close_wq); @@ -3231,8 +3453,8 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table) * they're closed. There can be no new open file references as the stream is set * to eof. */ DBG("del_stream() waiting for other refs\n"); - while (1) { - if (wait_event_interruptible(stream->close_wq, atomic_read(&stream->refcnt) == 2) == 0) + while (atomic_read(&stream->refcnt) != 2) { + if (wait_event_interruptible_timeout(stream->close_wq, atomic_read(&stream->refcnt) == 2, HZ / 10) == 0) break; } @@ -3308,7 +3530,7 @@ static ssize_t proc_stream_read(struct file *f, char __user *b, size_t l, loff_t goto out; DBG("going to sleep\n"); ret = -ERESTARTSYS; - if (wait_event_interruptible(stream->read_wq, !list_empty(&stream->packet_list) || stream->eof)) + if (wait_event_interruptible_timeout(stream->read_wq, !list_empty(&stream->packet_list) || stream->eof, HZ / 10)) goto out; DBG("awakened\n"); spin_lock_irqsave(&stream->packet_list_lock, flags); @@ -3585,6 +3807,870 @@ static void parse_rtcp(struct rtp_parsed *rtp, struct sk_buff *skb) { rtp->rtcp = 1; } +static void shut_threads(struct timer_thread **thr, unsigned int nt) { + unsigned int i; + + if (!thr) + return; + + for (i = 0; i < nt; i++) { + struct timer_thread *tt = thr[i]; + if (!tt) + continue; + //printk(KERN_WARNING "stopping %u\n", i); + atomic_set(&tt->shutdown, 1); + wake_up_interruptible(&tt->queue); + // thread frees itself + } + + kfree(thr); +} + +static void shut_all_threads(void) { + LIST_HEAD(list); + unsigned int nt; + struct timer_thread **thr; + + _write_lock(&media_player_lock); + + thr = timer_threads; + nt = num_timer_threads; + timer_threads = NULL; + num_timer_threads = 0; + _write_unlock(&media_player_lock); + + shut_threads(thr, nt); +} + +static void free_packet_stream(struct play_stream_packets *stream) { + struct play_stream_packet *packet, *tp; + struct rtpengine_table *t; + + //printk(KERN_WARNING "freeing packet stream %p\n", stream); + + list_for_each_entry_safe(packet, tp, &stream->packets, list) + free_play_stream_packet(packet); + + if (stream->table_id != -1 && !list_empty(&stream->table_entry)) { + t = get_table(stream->table_id); + if (t) { + _spin_lock(&t->player_lock); + list_del_init(&stream->table_entry); + t->num_packet_streams--; + _spin_unlock(&t->player_lock); + table_put(t); + } + } + kfree(stream); +} + +static void __unref_packet_stream(struct play_stream_packets *stream) { + if (atomic_dec_and_test(&stream->refcnt)) + free_packet_stream(stream); +} + + +// stream must be locked and started +static ktime_t play_stream_packet_time(struct play_stream *stream, struct play_stream_packet *packet) { + return ktime_add(stream->start_time, packet->delay); +} + +// stream must be locked, started, and non-empty +static void play_stream_next_packet(struct play_stream *stream) { + struct play_stream_packet *packet = stream->position; + struct play_stream_packets *packets = stream->packets; + _read_lock(&packets->lock); + stream->position = list_is_last(&packet->list, &packets->packets) ? NULL : list_next_entry(packet, list); + if (!stream->position) { + if (stream->info.repeat > 1) { + stream->info.repeat--; + stream->position = list_first_entry(&packets->packets, struct play_stream_packet, list); + stream->start_time = play_stream_packet_time(stream, packet); + stream->info.ts += packet->ts + packet->duration_ts; + stream->info.seq += packet->seq + 1; + } + } + _read_unlock(&packets->lock); +} + +// stream must be locked, started, and non-empty +// tt->tree_lock must be locked +static void play_stream_insert_packet_to_tree(struct play_stream *stream, struct timer_thread *tt, ktime_t scheduled) { + int64_t offset; + + // make sure key is unique + // negative as we only have btree_last(), no btree_first() + for (offset = 0; btree_lookup64(&tt->tree, -1 * ktime_to_ns(scheduled) + offset) != NULL; offset++) + { } + stream->tree_index = -1 * ktime_to_ns(scheduled) + offset; + btree_insert64(&tt->tree, stream->tree_index, stream, GFP_ATOMIC); +} + +// stream must be locked, started, and non-empty +// tree must not be locked +static void play_stream_schedule_packet_to_thread(struct play_stream *stream, struct timer_thread *tt, + bool reschedule) +{ + ktime_t scheduled; + struct play_stream_packet *packet; + + packet = stream->position; + scheduled = play_stream_packet_time(stream, packet); + + //if (sleeper) + //printk(KERN_WARNING "scheduling packet %u on thread %u\n", packet->seq, tt->idx); + //printk(KERN_WARNING "scheduling stream %p on thread %p (sleeper %i)\n", stream, tt, sleeper); + + _spin_lock(&tt->tree_lock); + + if (reschedule && !tt->scheduled && !tt->tree_added) { + // we know we are next. remember this + tt->scheduled = stream; + ref_play_stream(stream); + tt->scheduled_at = scheduled; + } + else { + // all other cases: add to tree, or put as next + if (tt->scheduled && ktime_before(scheduled, tt->scheduled_at)) { + // we are next. return previous entry to tree and put us as next + play_stream_insert_packet_to_tree(tt->scheduled, tt, tt->scheduled_at); + tt->scheduled = stream; + ref_play_stream(stream); + tt->scheduled_at = scheduled; + } + else { + // insert into tree + play_stream_insert_packet_to_tree(stream, tt, scheduled); + ref_play_stream(stream); + } + tt->tree_added = true; + } + + stream->timer_thread = tt; + + _spin_unlock(&tt->tree_lock); +} + +// stream must be locked, started, and non-empty +// threads->tree_lock must be unlocked (one will be locked) +// lock order: stream lock first, thread->tree_lock second +// num_timer_threads must be >0 +static void play_stream_schedule_packet(struct play_stream *stream) { + struct timer_thread *tt; + unsigned int idx; + + // XXX check if already scheduled + _read_lock(&media_player_lock); + idx = atomic_fetch_add(1, &last_timer_thread_idx) % num_timer_threads; + tt = timer_threads[idx]; + _read_unlock(&media_player_lock); + + play_stream_schedule_packet_to_thread(stream, tt, false); + + wake_up_interruptible(&tt->queue); // XXX need to refcount tt? for shutdown/free race? +} + +static void play_stream_send_packet(struct play_stream *stream, struct play_stream_packet *packet) { + struct sk_buff *skb; + struct rtp_parsed rtp; + + skb = alloc_skb(packet->len + MAX_HEADER + MAX_SKB_TAIL_ROOM, GFP_KERNEL); + if (!skb) + return; // XXX log/count error? + + // reserve head room (L2/L3 header) and copy data in + skb_reserve(skb, MAX_HEADER); + + // RTP header + rtp.header_len = sizeof(*rtp.rtp_header); + rtp.rtp_header = skb_put(skb, sizeof(*rtp.rtp_header)); + *rtp.rtp_header = (struct rtp_header) { + .v_p_x_cc = 0x80, + .m_pt = stream->info.pt, + .seq_num = htons(stream->info.seq + packet->seq), + .timestamp = htonl(stream->info.ts + packet->ts), + .ssrc = stream->info.ssrc, + }; + + // payload + rtp.payload = skb_put(skb, packet->len); + memcpy(rtp.payload, packet->data, packet->len); + rtp.payload_len = packet->len; + + rtp.ok = 1; + rtp.rtcp = 0; + + // XXX add TOS + proxy_packet_srtp_encrypt(skb, &stream->encrypt, &stream->info.encrypt, &rtp, 0, &stream->info.ssrc_stats); + send_proxy_packet(skb, &stream->info.src_addr, &stream->info.dst_addr, 0, NULL); + + atomic64_inc(&stream->info.stats->packets); + atomic64_add(packet->len, &stream->info.stats->bytes); + atomic64_inc(&stream->info.iface_stats->out.packets); + atomic64_add(packet->len, &stream->info.iface_stats->out.bytes); +} + +static void free_play_stream(struct play_stream *s) { + //printk(KERN_WARNING "freeing play stream %p\n", s); + free_crypto_context(&s->encrypt); + if (s->packets) + unref_packet_stream(s->packets); + kfree(s); +} + +static void __unref_play_stream(struct play_stream *s) { + if (atomic_dec_and_test(&s->refcnt)) + free_play_stream(s); +} + +static int timer_worker(void *p) { + struct timer_thread *tt = p; + + //printk(KERN_WARNING "cpu %u running\n", smp_processor_id()); + while (!atomic_read(&tt->shutdown)) { + int64_t timer_scheduled; + struct play_stream *stream; + ktime_t now, packet_scheduled; + int64_t sleeptime_ns; + struct play_stream_packet *packet; + struct play_stream_packets *packets; + + //printk(KERN_WARNING "cpu %u (%p) loop enter\n", smp_processor_id(), tt); + + _spin_lock(&tt->tree_lock); + // grab and remove next scheduled stream, either from predetermined entry or from tree + stream = tt->scheduled; + if (!stream) { + // XXX combine lookup and removal into one operation + stream = btree_last64(&tt->tree, &timer_scheduled); + if (stream) + btree_remove64(&tt->tree, timer_scheduled); + } + else { + tt->scheduled = NULL; + tt->scheduled_at = 0; + } + + tt->tree_added = false; // we're up to date before unlock + _spin_unlock(&tt->tree_lock); + + sleeptime_ns = 500000000LL; // 0.5 seconds + if (stream) { + //printk(KERN_WARNING "cpu %u got stream\n", smp_processor_id()); + + now = ktime_get(); + + _spin_lock(&stream->lock); + + if (stream->table_id == -1) { + // we've been descheduled + _spin_unlock(&stream->lock); + unref_play_stream(stream); + continue; + } + + stream->timer_thread = NULL; + packet = stream->position; + packet_scheduled = play_stream_packet_time(stream, packet); + //printk(KERN_WARNING "next packet %p at %li, time now %li\n", packet, + //(long int) ktime_to_ns(packet_scheduled), + //(long int) ktime_to_ns(now)); + + if (ktime_after(now, packet_scheduled)) { + //printk(KERN_WARNING "cpu %u sending packet %p from stream %p now\n", + //smp_processor_id(), packet, stream); + + _spin_unlock(&stream->lock); + + //printk(KERN_WARNING "cpu %u sending packet %u now\n", tt->idx, packet->seq); + play_stream_send_packet(stream, packet); + + _spin_lock(&stream->lock); + + if (stream->table_id != -1) + play_stream_next_packet(stream); + else + stream->position = NULL; + + packets = NULL; + + if (stream->position) { + play_stream_schedule_packet_to_thread(stream, tt, false); + sleeptime_ns = 0; // loop and get next packet from tree + _spin_unlock(&stream->lock); + unref_play_stream(stream); + stream = NULL; + } + else { + // end of stream + if (!stream->info.remove_at_end) + _spin_unlock(&stream->lock); + else { + // remove it + end_of_stream(stream); + _spin_unlock(&stream->lock); + _write_lock(&media_player_lock); + if (play_streams[stream->idx] == stream) { + play_streams[stream->idx] = NULL; + unref_play_stream(stream); + } + // else log error? + _write_unlock(&media_player_lock); + } + unref_play_stream(stream); + stream = NULL; + } + } + else { + // figure out sleep time + int64_t ns_diff = ktime_to_ns(ktime_sub(packet_scheduled, now)); + //printk(KERN_WARNING "stream time diff %li ns\n", (long int) ns_diff); + //if (diff == 0 && ns_diff > 0) + //printk(KERN_WARNING "stream time diff %li ns %li jiffies\n", + //(long int) ns_diff, (long int) diff); + //if (ns_diff > 0) + //printk(KERN_WARNING "sleep time %li ms for packet %u on cpu %u\n", + //(long int) (ns_diff / 1000000LL), packet->seq, + //tt->idx); + // return packet to tree + play_stream_schedule_packet_to_thread(stream, tt, true); + _spin_unlock(&stream->lock); + sleeptime_ns = min(sleeptime_ns, ns_diff); + unref_play_stream(stream); + stream = NULL; + } + } + + if (sleeptime_ns > 0) { + ktime_t a, b, c; + int64_t c_ns; + //printk(KERN_WARNING "cpu %u sleep %li ms, slack %li ns\n", tt->idx, + //(long int) (sleeptime_ns / 1000000LL), + //(long int) (current->timer_slack_ns / 1000000LL)); + a = ktime_get(); + wait_event_interruptible_hrtimeout(tt->queue, atomic_read(&tt->shutdown) || tt->tree_added, + ktime_set(0, sleeptime_ns)); + b = ktime_get(); + c = ktime_sub(b, a); + c_ns = ktime_to_ns(c); + //printk(KERN_WARNING "cpu %u wanted sleep %li ms, actual sleep %li ms\n", tt->idx, + //(long int) (sleeptime_ns / 1000000LL), (long int) (c_ns / 1000000LL)); + } + //printk(KERN_WARNING "cpu %u awoken\n", smp_processor_id()); + } + + //printk(KERN_WARNING "cpu %u exiting\n", smp_processor_id()); + btree_destroy64(&tt->tree); + kfree(tt); + return 0; +} + +static struct timer_thread *launch_thread(unsigned int cpu) { + struct timer_thread *tt; + int ret; + //printk(KERN_WARNING "try to launch %u\n", cpu); + tt = kzalloc(sizeof(*tt), GFP_KERNEL); + if (!tt) + return ERR_PTR(-ENOMEM); + init_waitqueue_head(&tt->queue); + atomic_set(&tt->shutdown, 0); + ret = btree_init64(&tt->tree); + if (ret) { + btree_destroy64(&tt->tree); + kfree(tt); + return ERR_PTR(ret); + } + _spin_lock_init(&tt->tree_lock); + tt->idx = cpu; + tt->task = kthread_create_on_node(timer_worker, tt, cpu_to_node(cpu), "rtpengine_%u", cpu); + if (IS_ERR(tt->task)) { + int ret = PTR_ERR(tt->task); + btree_destroy64(&tt->tree); + kfree(tt); + return ERR_PTR(ret); + } + kthread_bind(tt->task, cpu); + wake_up_process(tt->task); + //printk(KERN_WARNING "cpu %u ok\n", cpu); + return tt; +} + +static int init_play_streams(unsigned int n_play_streams, unsigned int n_stream_packets) { + int ret = 0; + struct timer_thread **threads_new = NULL; + unsigned int new_num_threads = 0; + bool need_threads; + struct play_stream **new_play_streams, **old_play_streams = NULL; + struct play_stream_packets **new_stream_packets, **old_stream_packets = NULL; + unsigned int cpu; + + _write_lock(&media_player_lock); + + if (num_play_streams >= n_play_streams && num_stream_packets >= n_stream_packets) + goto out; + + need_threads = timer_threads == NULL; + + _write_unlock(&media_player_lock); + + //printk(KERN_WARNING "allocating for %u/%u -> %u/%u streams\n", + //num_play_streams, n_play_streams, + //num_stream_packets, n_stream_packets); + + ret = -ENOMEM; + new_play_streams = kzalloc(sizeof(*new_play_streams) * n_play_streams, GFP_KERNEL); + if (!new_play_streams) + goto err; + new_stream_packets = kzalloc(sizeof(*new_stream_packets) * n_stream_packets, GFP_KERNEL); + if (!new_stream_packets) + goto err; + + if (need_threads) { + ret = -ENXIO; + new_num_threads = num_online_cpus(); + if (new_num_threads == 0) + goto err; + + threads_new = kzalloc(sizeof(*threads_new) * new_num_threads, GFP_KERNEL); + if (!threads_new) + goto err; + + for (cpu = 0; cpu < num_online_cpus(); cpu++) { + threads_new[cpu] = launch_thread(cpu); + if (IS_ERR(threads_new[cpu])) { + ret = PTR_ERR(threads_new[cpu]); + threads_new[cpu] = NULL; + goto err; + } + } + } + + _write_lock(&media_player_lock); + + // check again + ret = 0; + if (num_play_streams >= n_play_streams && num_stream_packets >= n_stream_packets) + goto out; + + memcpy(new_play_streams, play_streams, sizeof(*play_streams) * num_play_streams); + num_play_streams = n_play_streams; + old_play_streams = play_streams; + play_streams = new_play_streams; + + memcpy(new_stream_packets, stream_packets, sizeof(*stream_packets) * num_stream_packets); + num_stream_packets = n_stream_packets; + old_stream_packets = stream_packets; + stream_packets = new_stream_packets; + + if (!timer_threads) { + timer_threads = threads_new; + num_timer_threads = new_num_threads; + new_num_threads = 0; + threads_new = NULL; + } + +out: + _write_unlock(&media_player_lock); +err: + shut_threads(threads_new, new_num_threads); + kfree(old_play_streams); + kfree(old_stream_packets); + return ret; +} + +static int get_packet_stream(struct rtpengine_table *t, unsigned int *num) { + struct play_stream_packets *new_stream; + unsigned int idx = -1; + unsigned int i; + + new_stream = kzalloc(sizeof(*new_stream), GFP_KERNEL); + if (!new_stream) + return -ENOMEM; + + INIT_LIST_HEAD(&new_stream->packets); + INIT_LIST_HEAD(&new_stream->table_entry); + _rwlock_init(&new_stream->lock); + new_stream->table_id = t->id; + atomic_set(&new_stream->refcnt, 1); + + for (i = 0; i < num_stream_packets; i++) { + _write_lock(&media_player_lock); + idx = atomic_fetch_add(1, &last_stream_packets_idx) % num_stream_packets; + if (stream_packets[idx]) { + idx = -1; + _write_unlock(&media_player_lock); + continue; + } + stream_packets[idx] = new_stream; + new_stream->idx = idx; + ref_packet_stream(new_stream); + _write_unlock(&media_player_lock); + break; + } + + if (idx == -1) { + kfree(new_stream); + return -EBUSY; + } + + _spin_lock(&t->player_lock); + list_add(&new_stream->table_entry, &t->packet_streams); + // hand over ref + new_stream = NULL; + t->num_packet_streams++; + // XXX race between adding to list and stop/free? + _spin_unlock(&t->player_lock); + + *num = idx; + return 0; +} + +static void free_play_stream_packet(struct play_stream_packet *p) { + //printk(KERN_WARNING "freeing stream packet %u\n", p->seq); + kfree(p->data); + kfree(p); +} + +static int play_stream_packet(const struct rtpengine_play_stream_packet_info *info, size_t len) { + const char *data = info->data; + struct play_stream_packets *stream; + int ret = 0; + struct play_stream_packet *packet = NULL, *last; + + //printk(KERN_WARNING "size %zu\n", len); + + packet = kzalloc(sizeof(*packet), GFP_KERNEL); + if (!packet) + return -ENOMEM; + + packet->len = len; + packet->data = kmalloc(len, GFP_KERNEL); + if (!packet) + goto out; + + memcpy(packet->data, data, len); + packet->delay = ms_to_ktime(info->delay_ms); + packet->ts = info->delay_ts; + packet->duration_ts = info->duration_ts; + //printk(KERN_WARNING "new packet %p, delay %ld us\n", packet, (long int) ktime_to_us(packet->delay)); + // XXX alloc skb + + _read_lock(&media_player_lock); + + ret = -ERANGE; + if (info->packet_stream_idx >= num_stream_packets) + goto out; + + stream = stream_packets[info->packet_stream_idx]; + ret = -ENOENT; + if (!stream) + goto out; + + _write_lock(&stream->lock); + + if (!list_empty(&stream->packets)) { + last = list_last_entry(&stream->packets, struct play_stream_packet, list); + if (ktime_after(last->delay, packet->delay)) { + _write_unlock(&stream->lock); + ret = -ELOOP; + goto out; + } + } + list_add_tail(&packet->list, &stream->packets); + packet->seq = stream->len; + stream->len++; + + _write_unlock(&stream->lock); + + packet = NULL; + ret = 0; +out: + _read_unlock(&media_player_lock); + if (packet) + free_play_stream_packet(packet); + return ret; +} + +static int play_stream(struct rtpengine_table *t, const struct rtpengine_play_stream_info *info, unsigned int *num) { + struct play_stream *play_stream; + struct play_stream_packets *packets = NULL; + int ret; + unsigned int idx = -1; + unsigned int i; + struct interface_stats_block *iface_stats; + struct stream_stats *stats; + struct ssrc_stats *ssrc_stats; + + if (!is_valid_address(&info->src_addr)) + return -EINVAL; + if (!is_valid_address(&info->dst_addr)) + return -EINVAL; + if (info->dst_addr.family != info->src_addr.family) + return -EINVAL; + if (validate_srtp(&info->encrypt)) + return -EINVAL; + + iface_stats = shm_map_resolve(info->iface_stats, sizeof(*iface_stats)); + if (!iface_stats) + return -EFAULT; + stats = shm_map_resolve(info->stats, sizeof(*stats)); + if (!stats) + return -EFAULT; + ssrc_stats = shm_map_resolve(info->ssrc_stats, sizeof(*ssrc_stats)); + if (!ssrc_stats) + return -EFAULT; + + ret = -ENOMEM; + play_stream = kzalloc(sizeof(*play_stream), GFP_KERNEL); + if (!play_stream) + goto out; + + INIT_LIST_HEAD(&play_stream->table_entry); + play_stream->info = *info; + play_stream->table_id = t->id; + atomic_set(&play_stream->refcnt, 1); + _spin_lock_init(&play_stream->lock); + play_stream->info.stats = stats; + play_stream->info.iface_stats = iface_stats; + play_stream->info.ssrc_stats = ssrc_stats; + + ret = 0; + + _read_lock(&media_player_lock); + + if (info->packet_stream_idx >= num_stream_packets) + ret = -ERANGE; + else { + packets = stream_packets[info->packet_stream_idx]; + if (!packets) + ret = -ENOENT; + else + ref_packet_stream(packets); + } + + _read_unlock(&media_player_lock); + + if (ret) + goto out; + + _read_lock(&packets->lock); + + ret = -ENXIO; + if (list_empty(&packets->packets)) { + _read_unlock(&packets->lock); + goto out; + } + + play_stream->packets = packets; + play_stream->position = list_first_entry(&packets->packets, struct play_stream_packet, list); + + _read_unlock(&packets->lock); + + packets = NULL; // ref handed over + + for (i = 0; i < num_play_streams; i++) { + _write_lock(&media_player_lock); + idx = atomic_fetch_add(1, &last_play_stream_idx) % num_play_streams; + if (play_streams[idx]) { + _write_unlock(&media_player_lock); + idx = -1; + continue; + } + play_streams[idx] = play_stream; + ref_play_stream(play_stream); + play_stream->idx = idx; + _write_unlock(&media_player_lock); + break; + } + + ret = -EBUSY; + if (idx == -1) + goto out; + + _spin_lock(&t->player_lock); + list_add(&play_stream->table_entry, &t->play_streams); + ref_play_stream(play_stream); + t->num_play_streams++; + // XXX race between adding to list and stop/free? + _spin_unlock(&t->player_lock); + + _spin_lock(&play_stream->lock); + + play_stream->start_time = ktime_get(); + crypto_context_init(&play_stream->encrypt, &info->encrypt); + ret = gen_rtp_session_keys(&play_stream->encrypt, &info->encrypt); + if (ret) { + _spin_unlock(&play_stream->lock); + goto out; + } + //printk(KERN_WARNING "start time %ld us\n", (long int) ktime_to_us(play_stream->start_time)); + + play_stream_schedule_packet(play_stream); + + _spin_unlock(&play_stream->lock); + + *num = idx; + ret = 0; + +out: + if (play_stream) + unref_play_stream(play_stream); + if (packets) + unref_packet_stream(packets); + return ret; +} + +// stream must be locked, reference must be held +static void end_of_stream(struct play_stream *stream) { + struct rtpengine_table *t; + + if (stream->table_id != -1 && !list_empty(&stream->table_entry)) { + t = get_table(stream->table_id); + if (t) { + //printk(KERN_WARNING "removing stream %p from table\n", stream); + _spin_lock(&t->player_lock); + list_del_init(&stream->table_entry); + t->num_play_streams--; + _spin_unlock(&t->player_lock); + table_put(t); + unref_play_stream(stream); + } + } + stream->table_id = -1; +} + +// stream lock is not held, reference must be held +static void do_stop_stream(struct play_stream *stream) { + struct timer_thread *tt; + struct play_stream *old_stream; + + //printk(KERN_WARNING "stop stream %p\n", stream); + + _spin_lock(&stream->lock); + + end_of_stream(stream); + + tt = stream->timer_thread; + stream->timer_thread = NULL; + + if (tt) { + _spin_lock(&tt->tree_lock); + + if (tt->scheduled == stream) { + //printk(KERN_WARNING "stream %p was scheduled\n", stream); + tt->scheduled = NULL; + unref_play_stream(stream); + } + else { + old_stream = btree_lookup64(&tt->tree, stream->tree_index); + if (old_stream == stream) { + //printk(KERN_WARNING "stream %p was in tree\n", stream); + btree_remove64(&tt->tree, stream->tree_index); + unref_play_stream(stream); + } + else { + //printk(KERN_ERR "stream %p not scheduled!\n", stream); + } + } + + _spin_unlock(&tt->tree_lock); + } + + _spin_unlock(&stream->lock); +} + +static int stop_stream(struct rtpengine_table *t, unsigned int num) { + struct play_stream *stream; + int ret; + + ret = 0; + + _write_lock(&media_player_lock); + + if (num >= num_play_streams) + ret = -ERANGE; + else { + stream = play_streams[num]; + if (!stream) + ret = -ENOENT; + else + play_streams[num] = NULL;; + } + + _write_unlock(&media_player_lock); + + if (ret) + return ret; + + do_stop_stream(stream); + + // check if stream was released, wait if it wasn't + _spin_lock(&stream->lock); + while (stream->timer_thread) { + _spin_unlock(&stream->lock); + cpu_relax(); + schedule(); + _spin_lock(&stream->lock); + } + _spin_unlock(&stream->lock); + + unref_play_stream(stream); + + return 0; +} + +static int cmd_free_packet_stream(struct rtpengine_table *t, unsigned int idx) { + struct play_stream_packets *stream = NULL; + int ret; + + _write_lock(&media_player_lock); + + ret = -ERANGE; + if (idx >= num_stream_packets) + goto out; + + stream = stream_packets[idx]; + ret = -ENOENT; + if (!stream) + goto out; + + // steal reference + stream_packets[idx] = NULL; + + ret = 0; + +out: + _write_unlock(&media_player_lock); + + if (!stream) + return ret; + + _write_lock(&stream->lock); + idx = stream->idx; + stream->table_id = -1; + _write_unlock(&stream->lock); + + if (idx != -1) { + _write_lock(&media_player_lock); + if (stream_packets[idx] == stream) { + stream_packets[idx] = NULL; + unref_packet_stream(stream); + } + _write_unlock(&media_player_lock); + } + + if (!list_empty(&stream->table_entry)) { + _spin_lock(&t->player_lock); + list_del_init(&stream->table_entry); + t->num_packet_streams--; + _spin_unlock(&t->player_lock); + unref_packet_stream(stream); + } + + unref_packet_stream(stream); + + return 0; +} static const size_t min_req_sizes[__REMG_LAST] = { @@ -3597,6 +4683,12 @@ static const size_t min_req_sizes[__REMG_LAST] = { [REMG_ADD_STREAM] = sizeof(struct rtpengine_command_add_stream), [REMG_DEL_STREAM] = sizeof(struct rtpengine_command_del_stream), [REMG_PACKET] = sizeof(struct rtpengine_command_packet), + [REMG_INIT_PLAY_STREAMS]= sizeof(struct rtpengine_command_init_play_streams), + [REMG_GET_PACKET_STREAM]= sizeof(struct rtpengine_command_get_packet_stream), + [REMG_PLAY_STREAM_PACKET]= sizeof(struct rtpengine_command_play_stream_packet), + [REMG_PLAY_STREAM] = sizeof(struct rtpengine_command_play_stream), + [REMG_STOP_STREAM] = sizeof(struct rtpengine_command_stop_stream), + [REMG_FREE_PACKET_STREAM]= sizeof(struct rtpengine_command_free_packet_stream), }; static const size_t max_req_sizes[__REMG_LAST] = { @@ -3609,6 +4701,12 @@ static const size_t max_req_sizes[__REMG_LAST] = { [REMG_ADD_STREAM] = sizeof(struct rtpengine_command_add_stream), [REMG_DEL_STREAM] = sizeof(struct rtpengine_command_del_stream), [REMG_PACKET] = sizeof(struct rtpengine_command_packet) + 65535, + [REMG_INIT_PLAY_STREAMS]= sizeof(struct rtpengine_command_init_play_streams), + [REMG_GET_PACKET_STREAM]= sizeof(struct rtpengine_command_get_packet_stream), + [REMG_PLAY_STREAM_PACKET]= sizeof(struct rtpengine_command_play_stream_packet) + 65535, + [REMG_PLAY_STREAM] = sizeof(struct rtpengine_command_play_stream), + [REMG_STOP_STREAM] = sizeof(struct rtpengine_command_stop_stream), + [REMG_FREE_PACKET_STREAM]= sizeof(struct rtpengine_command_free_packet_stream), }; static int rtpengine_init_table(struct rtpengine_table *t, struct rtpengine_init_info *init) { @@ -3647,6 +4745,12 @@ static inline ssize_t proc_control_read_write(struct file *file, char __user *ub struct rtpengine_command_add_stream *add_stream; struct rtpengine_command_del_stream *del_stream; struct rtpengine_command_packet *packet; + struct rtpengine_command_init_play_streams *init_play_streams; + struct rtpengine_command_get_packet_stream *get_packet_stream; + struct rtpengine_command_play_stream_packet *play_stream_packet; + struct rtpengine_command_play_stream *play_stream; + struct rtpengine_command_stop_stream *stop_stream; + struct rtpengine_command_free_packet_stream *free_packet_stream; char *storage; } msg; @@ -3737,6 +4841,36 @@ static inline ssize_t proc_control_read_write(struct file *file, char __user *ub err = stream_packet(t, &msg.packet->packet, buflen - sizeof(*msg.packet)); break; + case REMG_INIT_PLAY_STREAMS: + err = init_play_streams(msg.init_play_streams->num_play_streams, + msg.init_play_streams->num_packet_streams); + break; + + case REMG_GET_PACKET_STREAM: + err = -EINVAL; + if (writeable) + err = get_packet_stream(t, &msg.get_packet_stream->packet_stream_idx); + break; + + case REMG_PLAY_STREAM_PACKET: + err = play_stream_packet(&msg.play_stream_packet->play_stream_packet, + buflen - sizeof(*msg.play_stream_packet)); + break; + + case REMG_PLAY_STREAM: + err = -EINVAL; + if (writeable) + err = play_stream(t, &msg.play_stream->info, &msg.play_stream->play_idx); + break; + + case REMG_STOP_STREAM: + err = stop_stream(t, msg.stop_stream->play_idx); + break; + + case REMG_FREE_PACKET_STREAM: + err = cmd_free_packet_stream(t, msg.free_packet_stream->packet_stream_idx); + break; + default: printk(KERN_WARNING "xt_RTPENGINE unimplemented op %u\n", cmd); err = -EINVAL; @@ -4031,7 +5165,7 @@ error: static uint32_t rtp_packet_index(struct re_crypto_context *c, struct rtpengine_srtp *s, struct rtp_header *rtp, int ssrc_idx, - struct ssrc_stats *ssrc_stats[RTPE_NUM_SSRC_TRACKING]) + struct ssrc_stats **ssrc_stats) { uint16_t seq; uint32_t index; @@ -4889,13 +6023,29 @@ static void proxy_packet_output_rtcp(struct sk_buff *skb, struct rtpengine_outpu atomic_set(&o->output.ssrc_stats[ssrc_idx]->rtcp_seq, tmp_idx); } +static uint32_t proxy_packet_srtp_encrypt(struct sk_buff *skb, struct re_crypto_context *ctx, + struct rtpengine_srtp *srtp, + struct rtp_parsed *rtp, int ssrc_idx, + struct ssrc_stats **ssrc_stats) +{ + uint32_t pkt_idx; + unsigned int pllen; + + pkt_idx = rtp_packet_index(ctx, srtp, rtp->rtp_header, ssrc_idx, ssrc_stats); + pllen = rtp->payload_len; + srtp_encrypt(ctx, srtp, rtp, pkt_idx); + srtp_authenticate(ctx, srtp, rtp, pkt_idx); + skb_put(skb, rtp->payload_len - pllen); + + return pkt_idx; +} + static bool proxy_packet_output_rtXp(struct sk_buff *skb, struct rtpengine_output *o, int rtp_pt_idx, struct rtp_parsed *rtp, int ssrc_idx) { - unsigned int pllen; - uint32_t pkt_idx; int i; + uint32_t pkt_idx; if (!rtp->ok) { proxy_packet_output_rtcp(skb, o, rtp, ssrc_idx); @@ -4930,13 +6080,8 @@ static bool proxy_packet_output_rtXp(struct sk_buff *skb, struct rtpengine_outpu rtp->rtp_header->ssrc = o->output.ssrc_out[ssrc_idx]; } - // SRTP - pkt_idx = rtp_packet_index(&o->encrypt_rtp, &o->output.encrypt, rtp->rtp_header, ssrc_idx, - o->output.ssrc_stats); - pllen = rtp->payload_len; - srtp_encrypt(&o->encrypt_rtp, &o->output.encrypt, rtp, pkt_idx); - srtp_authenticate(&o->encrypt_rtp, &o->output.encrypt, rtp, pkt_idx); - skb_put(skb, rtp->payload_len - pllen); + pkt_idx = proxy_packet_srtp_encrypt(skb, &o->encrypt_rtp, &o->output.encrypt, + rtp, ssrc_idx, o->output.ssrc_stats); if (ssrc_idx >= 0 && o->output.ssrc_stats[ssrc_idx]) { atomic64_inc(&o->output.ssrc_stats[ssrc_idx]->packets); @@ -5442,9 +6587,6 @@ skip: } - - - static int check(const struct xt_tgchk_param *par) { const struct xt_rtpengine_info *pinfo = par->targinfo; @@ -5524,9 +6666,12 @@ static int __init init(void) { if (ret) goto fail; + _rwlock_init(&media_player_lock); + return 0; fail: + shut_all_threads(); clear_proc(&proc_control); clear_proc(&proc_list); clear_proc(&my_proc_root); @@ -5540,12 +6685,17 @@ static void __exit fini(void) { printk(KERN_NOTICE "Unregistering xt_RTPENGINE module\n"); xt_unregister_targets(xt_rtpengine_regs, ARRAY_SIZE(xt_rtpengine_regs)); + shut_all_threads(); clear_proc(&proc_control); clear_proc(&proc_list); clear_proc(&my_proc_root); auto_array_free(&streams); auto_array_free(&calls); + + // these should be empty + kfree(play_streams); + kfree(stream_packets); } module_init(init); diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index 22a602de8..d5c9e45df 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -169,6 +169,12 @@ enum rtpengine_command { REMG_DEL_STREAM, REMG_PACKET, REMG_DEL_TARGET, + REMG_INIT_PLAY_STREAMS, + REMG_GET_PACKET_STREAM, + REMG_PLAY_STREAM_PACKET, + REMG_PLAY_STREAM, + REMG_STOP_STREAM, + REMG_FREE_PACKET_STREAM, __REMG_LAST }; @@ -184,6 +190,30 @@ struct rtpengine_command_init { struct rtpengine_init_info init; }; +struct rtpengine_play_stream_info { + struct re_address src_addr; + struct re_address dst_addr; + unsigned char pt; + uint32_t ssrc; + uint32_t ts; // start TS + uint16_t seq; // start seq + struct rtpengine_srtp encrypt; + unsigned int packet_stream_idx; + struct interface_stats_block *iface_stats; // for egress stats + struct stream_stats *stats; // for egress stats + struct ssrc_stats *ssrc_stats; + int repeat; + int remove_at_end; +}; + +struct rtpengine_play_stream_packet_info { + unsigned int packet_stream_idx; + unsigned long delay_ms; // first packet = 0 + uint32_t delay_ts; // first packet = 0 + uint32_t duration_ts; + unsigned char data[]; +}; + struct rtpengine_command_add_target { enum rtpengine_command cmd; struct rtpengine_target_info target; @@ -224,5 +254,37 @@ struct rtpengine_command_packet { struct rtpengine_packet_info packet; }; +struct rtpengine_command_init_play_streams { + enum rtpengine_command cmd; + unsigned int num_packet_streams; + unsigned int num_play_streams; +}; + +struct rtpengine_command_get_packet_stream { + enum rtpengine_command cmd; + unsigned int packet_stream_idx; // output +}; + +struct rtpengine_command_play_stream_packet { + enum rtpengine_command cmd; + struct rtpengine_play_stream_packet_info play_stream_packet; +}; + +struct rtpengine_command_play_stream { + enum rtpengine_command cmd; + struct rtpengine_play_stream_info info; // input + unsigned int play_idx; // output +}; + +struct rtpengine_command_stop_stream { + enum rtpengine_command cmd; + unsigned int play_idx; +}; + +struct rtpengine_command_free_packet_stream { + enum rtpengine_command cmd; + unsigned int packet_stream_idx; +}; + #endif diff --git a/utils/.gitignore b/utils/.gitignore new file mode 100644 index 000000000..e0a9911f1 --- /dev/null +++ b/utils/.gitignore @@ -0,0 +1 @@ +kplay-test diff --git a/utils/kplay-test.c b/utils/kplay-test.c new file mode 100644 index 000000000..31a7a64af --- /dev/null +++ b/utils/kplay-test.c @@ -0,0 +1,320 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define atomic64 uint64_t +#include "../kernel-module/xt_RTPENGINE.h" + +int main() { + int fd = open("/proc/rtpengine/control", O_WRONLY); + assert(fd >= 0); + ssize_t ret = write(fd, "add 0\n", 6); + assert(ret == 6 || (ret == -1 && errno == EEXIST)); + close(fd); + + fd = open("/proc/rtpengine/0/control", O_RDWR); + assert(fd >= 0); + + struct rtpengine_command_init init = { .cmd = REMG_INIT }; + + init.init = (struct rtpengine_init_info) { + .last_cmd = __REMG_LAST, + .msg_size = { + [REMG_INIT] = sizeof(struct rtpengine_command_init), + [REMG_ADD_TARGET] = sizeof(struct rtpengine_command_add_target), + [REMG_ADD_DESTINATION] = sizeof(struct rtpengine_command_destination), + [REMG_ADD_CALL] = sizeof(struct rtpengine_command_add_call), + [REMG_DEL_CALL] = sizeof(struct rtpengine_command_del_call), + [REMG_ADD_STREAM] = sizeof(struct rtpengine_command_add_stream), + [REMG_DEL_STREAM] = sizeof(struct rtpengine_command_del_stream), + [REMG_PACKET] = sizeof(struct rtpengine_command_packet), + [REMG_INIT_PLAY_STREAMS] = sizeof(struct rtpengine_command_init_play_streams), + [REMG_GET_PACKET_STREAM] = sizeof(struct rtpengine_command_get_packet_stream), + [REMG_PLAY_STREAM_PACKET] = sizeof(struct rtpengine_command_play_stream_packet), + [REMG_PLAY_STREAM] = sizeof(struct rtpengine_command_play_stream), + [REMG_STOP_STREAM] = sizeof(struct rtpengine_command_stop_stream), + [REMG_FREE_PACKET_STREAM] = sizeof(struct rtpengine_command_free_packet_stream), + }, + }; + + ret = write(fd, &init, sizeof(init)); + assert(ret == sizeof(init)); + + struct rtpengine_command_init_play_streams ips = { + .cmd = REMG_INIT_PLAY_STREAMS, + .num_packet_streams = 100, + .num_play_streams = 1000, + }; + ret = write(fd, &ips, sizeof(ips)); + assert(ret == sizeof(ips)); + + struct rtpengine_command_get_packet_stream gps = { .cmd = REMG_GET_PACKET_STREAM }; + ret = read(fd, &gps, sizeof(gps)); + assert(ret == sizeof(gps)); + printf("packet stream idx %u\n", gps.packet_stream_idx); + + struct { + struct rtpengine_command_play_stream_packet psp; + char buf[160]; + } psp = { + .psp = { + .cmd = REMG_PLAY_STREAM_PACKET, + .play_stream_packet = { + .packet_stream_idx = gps.packet_stream_idx, + }, + }, + }; + + for (unsigned int i = 0; i < 256; i++) { + psp.psp.play_stream_packet.delay_ms = i * 20; + psp.psp.play_stream_packet.delay_ts = i * 160; + memset(psp.psp.play_stream_packet.data, i, sizeof(psp.buf)); + ret = write(fd, &psp, sizeof(psp)); + assert(ret == sizeof(psp)); + } + printf("packets ok\n"); + + struct rtpengine_command_play_stream ps = { + .cmd = REMG_PLAY_STREAM, + .info = { + .src_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.102"), + }, + .port = 6666, + }, + .dst_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.66"), + }, + .port = 9999, + }, + .pt = 8, + .ssrc = 0x12345678, + .ts = 76543210, + .seq = 5432, + .encrypt = { + .cipher = REC_NULL, + .hmac = REH_NULL, + }, + .packet_stream_idx = 999999, + }, + }; + ret = read(fd, &ps, sizeof(ps)); + assert(ret == -1 && errno == ERANGE); + + ps = (__typeof(ps)) { + .cmd = REMG_PLAY_STREAM, + .info = { + .src_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.102"), + }, + .port = 6666, + }, + .dst_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.66"), + }, + .port = 9999, + }, + .pt = 8, + .ssrc = 0x12345678, + .ts = 76543210, + .seq = 5432, + .encrypt = { + .cipher = REC_NULL, + .hmac = REH_NULL, + }, + .packet_stream_idx = gps.packet_stream_idx + 1, + }, + }; + ret = read(fd, &ps, sizeof(ps)); + assert(ret == -1 && errno == ENOENT); + + ps = (__typeof(ps)) { + .cmd = REMG_PLAY_STREAM, + .info = { + .src_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.102"), + }, + .port = 6666, + }, + .dst_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.66"), + }, + .port = 9999, + }, + .pt = 8, + .ssrc = 0x12345678, + .ts = 76543210, + .seq = 5432, + .encrypt = { + .cipher = REC_NULL, + .hmac = REH_NULL, + }, + .packet_stream_idx = gps.packet_stream_idx, + .repeat = 3, + .remove_at_end = true, + }, + }; + ret = read(fd, &ps, sizeof(ps)); + assert(ret == sizeof(ps)); + printf("play stream idx %u\n", ps.play_idx); + + struct rtpengine_command_free_packet_stream fps = { + .cmd = REMG_FREE_PACKET_STREAM, + .packet_stream_idx = 9999999, + }; + ret = write(fd, &fps, sizeof(fps)); + assert(ret == -1 && errno == ERANGE); + printf("ok\n"); + + fps = (__typeof(fps)) { + .cmd = REMG_FREE_PACKET_STREAM, + .packet_stream_idx = gps.packet_stream_idx + 1, + }; + ret = write(fd, &fps, sizeof(fps)); + assert(ret == -1 && errno == ENOENT); + printf("ok\n"); + +// test: remove while in use +// fps = (__typeof(fps)) { +// .cmd = REMG_FREE_PACKET_STREAM, +// .packet_stream_idx = gps.packet_stream_idx, +// }; +// ret = write(fd, &fps, sizeof(fps)); +// assert(ret == sizeof(fps)); +// printf("ok\n"); + + printf("sleep\n"); + sleep(20); + + struct rtpengine_command_stop_stream ss = { + .cmd = REMG_STOP_STREAM, + .play_idx = ps.play_idx, + }; + ret = read(fd, &ss, sizeof(ss)); + assert(ret == -1 && errno == ENOENT); + + ps = (__typeof(ps)) { + .cmd = REMG_PLAY_STREAM, + .info = { + .src_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.102"), + }, + .port = 6666, + }, + .dst_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.66"), + }, + .port = 9999, + }, + .pt = 8, + .ssrc = 0x12345678, + .ts = 76543210, + .seq = 5432, + .encrypt = { + .cipher = REC_NULL, + .hmac = REH_NULL, + }, + .packet_stream_idx = gps.packet_stream_idx, + }, + }; + ret = read(fd, &ps, sizeof(ps)); + assert(ret == sizeof(ps)); + printf("play stream idx %u\n", ps.play_idx); + + printf("sleep\n"); + sleep(2); + +// test: remove while in use +// fps = (__typeof(fps)) { +// .cmd = REMG_FREE_PACKET_STREAM, +// .packet_stream_idx = gps.packet_stream_idx, +// }; +// ret = write(fd, &fps, sizeof(fps)); +// assert(ret == -1 && errno == EBUSY); + + ss = (__typeof(ss)) { + .cmd = REMG_STOP_STREAM, + .play_idx = 999999, + }; + ret = read(fd, &ss, sizeof(ss)); + assert(ret == -1 && errno == ERANGE); + + ss = (__typeof(ss)) { + .cmd = REMG_STOP_STREAM, + .play_idx = ps.play_idx + 1, + }; + ret = read(fd, &ss, sizeof(ss)); + assert(ret == -1 && errno == ENOENT); + + ss = (__typeof(ss)) { + .cmd = REMG_STOP_STREAM, + .play_idx = ps.play_idx, + }; + ret = read(fd, &ss, sizeof(ss)); + assert(ret == sizeof(ss)); + printf("stop ok\n"); + + ss = (__typeof(ss)) { + .cmd = REMG_STOP_STREAM, + .play_idx = ps.play_idx, + }; + ret = read(fd, &ss, sizeof(ss)); + assert(ret == -1 && errno == ENOENT); + + fps = (__typeof(fps)) { + .cmd = REMG_FREE_PACKET_STREAM, + .packet_stream_idx = gps.packet_stream_idx, + }; + ret = write(fd, &fps, sizeof(fps)); + printf("%zi %s\n", ret, strerror(errno)); + assert(ret == sizeof(fps)); + printf("free ok\n"); + + sleep(3); + + fps = (__typeof(fps)) { + .cmd = REMG_FREE_PACKET_STREAM, + .packet_stream_idx = gps.packet_stream_idx, + }; + ret = write(fd, &fps, sizeof(fps)); + assert(ret == -1 && errno == ENOENT); + + sleep(3); + + printf("closing fd\n"); + close(fd); + + sleep(3); + + fd = open("/proc/rtpengine/control", O_WRONLY); + assert(fd >= 0); + ret = write(fd, "del 0\n", 6); + assert(ret == 6); + close(fd); + + return 0; +} diff --git a/utils/kplay-test2.c b/utils/kplay-test2.c new file mode 100644 index 000000000..3d96c0aa2 --- /dev/null +++ b/utils/kplay-test2.c @@ -0,0 +1,166 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define atomic64 uint64_t +#include "../kernel-module/xt_RTPENGINE.h" + +#define PAGE_SIZE 4096 + +int main() { + int fd = open("/proc/rtpengine/control", O_WRONLY); + assert(fd >= 0); + ssize_t ret = write(fd, "add 0\n", 6); + assert(ret == 6 || (ret == -1 && errno == EEXIST)); + close(fd); + + fd = open("/proc/rtpengine/0/control", O_RDWR); + assert(fd >= 0); + + struct global_stats_counter *rtpe_stats = mmap(NULL, PAGE_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + assert(rtpe_stats != NULL && rtpe_stats != MAP_FAILED); + + struct rtpengine_command_init init = { .cmd = REMG_INIT }; + + init.init = (struct rtpengine_init_info) { + .last_cmd = __REMG_LAST, + .msg_size = { + [REMG_INIT] = sizeof(struct rtpengine_command_init), + [REMG_ADD_TARGET] = sizeof(struct rtpengine_command_add_target), + [REMG_DEL_TARGET] = sizeof(struct rtpengine_command_del_target), + [REMG_ADD_DESTINATION] = sizeof(struct rtpengine_command_destination), + [REMG_ADD_CALL] = sizeof(struct rtpengine_command_add_call), + [REMG_DEL_CALL] = sizeof(struct rtpengine_command_del_call), + [REMG_ADD_STREAM] = sizeof(struct rtpengine_command_add_stream), + [REMG_DEL_STREAM] = sizeof(struct rtpengine_command_del_stream), + [REMG_PACKET] = sizeof(struct rtpengine_command_packet), + [REMG_INIT_PLAY_STREAMS] = sizeof(struct rtpengine_command_init_play_streams), + [REMG_GET_PACKET_STREAM] = sizeof(struct rtpengine_command_get_packet_stream), + [REMG_PLAY_STREAM_PACKET] = sizeof(struct rtpengine_command_play_stream_packet), + [REMG_PLAY_STREAM] = sizeof(struct rtpengine_command_play_stream), + [REMG_STOP_STREAM] = sizeof(struct rtpengine_command_stop_stream), + [REMG_FREE_PACKET_STREAM] = sizeof(struct rtpengine_command_free_packet_stream), + }, + .rtpe_stats = rtpe_stats, + }; + + ret = write(fd, &init, sizeof(init)); + assert(ret == sizeof(init)); + + struct rtpengine_command_init_play_streams ips = { + .cmd = REMG_INIT_PLAY_STREAMS, + .num_packet_streams = 100, + .num_play_streams = 40960, + }; + ret = write(fd, &ips, sizeof(ips)); + assert(ret == sizeof(ips)); + + struct rtpengine_command_get_packet_stream gps = { .cmd = REMG_GET_PACKET_STREAM }; + ret = read(fd, &gps, sizeof(gps)); + assert(ret == sizeof(gps)); + printf("packet stream idx %u\n", gps.packet_stream_idx); + + struct { + struct rtpengine_command_play_stream_packet psp; + char buf[160]; + } psp = { + .psp = { + .cmd = REMG_PLAY_STREAM_PACKET, + .play_stream_packet = { + .packet_stream_idx = gps.packet_stream_idx, + }, + }, + }; + + for (unsigned int i = 0; i < 256; i++) { + psp.psp.play_stream_packet.delay_ms = i * 20; + psp.psp.play_stream_packet.delay_ts = i * 160; + memset(psp.psp.play_stream_packet.data, i, sizeof(psp.buf)); + ret = write(fd, &psp, sizeof(psp)); + assert(ret == sizeof(psp)); + } + printf("packets ok\n"); + + unsigned play_idx[4096]; + const unsigned int num_plays = sizeof(play_idx)/sizeof(*play_idx); + + struct { + struct interface_stats_block iface_stats[num_plays]; + struct stream_stats stream_stats[num_plays]; + struct ssrc_stats ssrc_stats[num_plays]; + } *all_stats; + const unsigned int map_size = PAGE_SIZE * 512; + assert(sizeof(*all_stats) <= map_size); + + all_stats = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + assert(all_stats != NULL && all_stats != MAP_FAILED); + + for (unsigned int i = 0; i < num_plays; i++) { + struct rtpengine_command_play_stream ps = { + .cmd = REMG_PLAY_STREAM, + .info = { + .src_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.102"), + }, + .port = 6666 + i, + }, + .dst_addr = { + .family = AF_INET, + .u = { + .ipv4 = inet_addr("192.168.1.66"), + }, + .port = 9999, + }, + .pt = 8, + .ssrc = 0x12345678 + i, + .ts = 76543210 + i, + .seq = 5432 + i, + .encrypt = { + .cipher = REC_NULL, + .hmac = REH_NULL, + }, + .packet_stream_idx = gps.packet_stream_idx, + .repeat = 50, + .remove_at_end = false, + .iface_stats = &all_stats->iface_stats[i], + .stats = &all_stats->stream_stats[i], + .ssrc_stats = &all_stats->ssrc_stats[i], + }, + }; + ret = read(fd, &ps, sizeof(ps)); + assert(ret == sizeof(ps)); + printf("play stream idx %u\n", ps.play_idx); + play_idx[i] = ps.play_idx; + + usleep(50000); + } + + printf("sleep\n"); + sleep(350); + + printf("close fd, sleep\n"); + sleep(10); + close(fd); + munmap(rtpe_stats, PAGE_SIZE); + munmap(all_stats, map_size); + + printf("del table\n"); + fd = open("/proc/rtpengine/control", O_WRONLY); + assert(fd >= 0); + ret = write(fd, "del 0\n", 6); + assert(ret == 6); + close(fd); + + return 0; +}