From 440a1f8a4e423cd1d1d35d8619ef1214d222252f Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 13 Sep 2017 11:01:03 -0400 Subject: [PATCH] delay proc stream/call file deletion until all references are cleared fixes #384 Change-Id: I1c94027644a18a280077ba00d023c28070d2c9d7 --- kernel-module/xt_RTPENGINE.c | 113 ++++++++++++++++++++++++++--------- recording-daemon/stream.c | 2 + 2 files changed, 87 insertions(+), 28 deletions(-) diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index c2e09ff54..d79477d49 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -316,6 +316,7 @@ struct re_call { struct rtpengine_call_info info; unsigned int table_id; u32 hash_bucket; + int deleted; /* protected by calls.lock */ struct proc_dir_entry *root; @@ -338,6 +339,7 @@ struct re_stream { u32 hash_bucket; struct proc_dir_entry *file; + struct re_call *call; /* holds a reference */ struct list_head call_entry; /* protected by streams.lock */ struct hlist_node streams_hash_entry; @@ -345,8 +347,9 @@ struct re_stream { spinlock_t packet_list_lock; struct list_head packet_list; unsigned int list_count; - wait_queue_head_t wq; - int eof; + wait_queue_head_t read_wq; + wait_queue_head_t close_wq; + int eof; /* protected by packet_list_lock */ }; #define RE_HASH_BITS 8 /* make configurable? */ @@ -653,7 +656,11 @@ static struct rtpengine_table *new_table(void) { -#define ref_get(o) atomic_inc(&(o)->refcnt) +static inline void __ref_get(void *p, atomic_t *refcnt) { + DBG("ref_get(%p) - refcnt is %u\n", p, atomic_read(refcnt)); + atomic_inc(refcnt); +} +#define ref_get(o) __ref_get(o, &(o)->refcnt) @@ -917,23 +924,34 @@ static void clear_stream_packets(struct re_stream *stream) { } } static void stream_put(struct re_stream *stream) { - DBG("stream_put()\n"); + DBG("stream_put(%p) - refcnt is %u\n", + stream, + stream ? atomic_read(&stream->refcnt) : (unsigned) -1); if (!stream) return; - if (!atomic_dec_and_test(&stream->refcnt)) + if (!atomic_dec_and_test(&stream->refcnt)) { + /* if this is an open file being closed and there's a del_stream() + * waiting for us, we need to wake up the sleeping del_stream() */ + wake_up_interruptible(&stream->close_wq); return; + } DBG("Freeing stream object\n"); clear_stream_packets(stream); clear_proc(&stream->file); + if (stream->call) + call_put(stream->call); + kfree(stream); } static void call_put(struct re_call *call) { - DBG("call_put()\n"); + DBG("call_put(%p) - refcnt is %u\n", + call, + call ? atomic_read(&call->refcnt) : (unsigned) -1); if (!call) return; @@ -946,6 +964,9 @@ static void call_put(struct re_call *call) { if (!list_empty(&call->streams)) panic("BUG! streams list not empty in call"); + DBG("clearing call proc files\n"); + clear_proc(&call->root); + kfree(call); } @@ -2400,6 +2421,8 @@ static struct re_call *get_call(struct rtpengine_table *table, unsigned int idx) return NULL; if (table && ret->table_id != table->id) return NULL; + if (ret->deleted) + return NULL; return ret; } /* handles the locking (read) and reffing */ @@ -2586,15 +2609,14 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) { _w_lock(&calls.lock, flags); - if (!list_empty(&call->table_entry)) { - list_del_init(&call->table_entry); + if (call->deleted) { + /* already doing this */ + _w_unlock(&calls.lock, flags); call_put(call); + return; } - if (calls.array[call->info.call_idx] == call) { - auto_array_clear_index(&calls, call->info.call_idx); - call_put(call); - } + call->deleted = 1; _w_unlock(&calls.lock, flags); @@ -2610,9 +2632,6 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) { } _w_unlock(&streams.lock, flags); - DBG("clearing call proc files\n"); - clear_proc(&call->root); - DBG("locking table's call hash\n"); spin_lock_irqsave(&table->calls_hash_lock[call->hash_bucket], flags); if (!hlist_unhashed(&call->calls_hash_entry)) { @@ -2621,6 +2640,20 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) { } spin_unlock_irqrestore(&table->calls_hash_lock[call->hash_bucket], flags); + _w_lock(&calls.lock, flags); + + if (!list_empty(&call->table_entry)) { + list_del_init(&call->table_entry); + call_put(call); + } + + if (calls.array[call->info.call_idx] == call) { + auto_array_clear_index(&calls, call->info.call_idx); + call_put(call); + } + + _w_unlock(&calls.lock, flags); + DBG("del_call() done, releasing ref\n"); call_put(call); /* might be the last ref */ } @@ -2665,7 +2698,8 @@ static int table_new_stream(struct rtpengine_table *table, struct rtpengine_stre atomic_set(&stream->refcnt, 1); INIT_LIST_HEAD(&stream->packet_list); spin_lock_init(&stream->packet_list_lock); - init_waitqueue_head(&stream->wq); + init_waitqueue_head(&stream->read_wq); + init_waitqueue_head(&stream->close_wq); /* check for name collisions */ @@ -2715,6 +2749,9 @@ not_found: list_add(&stream->call_entry, &call->streams); /* new ref here */ ref_get(stream); + stream->call = call; + ref_get(call); + _w_unlock(&streams.lock, flags); /* proc_ functions may sleep, so this must be done outside of the lock */ @@ -2769,13 +2806,9 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table) clear_stream_packets(stream); DBG("stream is finished (EOF), waking up threads\n"); - wake_up_interruptible(&stream->wq); + wake_up_interruptible(&stream->read_wq); /* sleeping readers will now close files */ - DBG("clearing stream proc file\n"); - /* this blocks until the files have been closed */ - clear_proc(&stream->file); - DBG("clearing stream from streams_hash\n"); spin_lock_irqsave(&table->streams_hash_lock[stream->hash_bucket], flags); if (!hlist_unhashed(&stream->streams_hash_entry)) { @@ -2790,15 +2823,30 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table) list_del_init(&stream->call_entry); stream_put(stream); } + _w_unlock(&streams.lock, flags); + + /* At this point, there should only be 2 references left: ours, and the entry in + * the "streams" array. Any other references are open files and we must wait until + * they're closed. There can be no new open file references as the stream is set + * to eof. */ + DBG("del_stream() waiting for other refs\n"); + while (1) { + if (wait_event_interruptible(stream->close_wq, atomic_read(&stream->refcnt) == 2) == 0) + break; + } + + DBG("clearing stream's stream_idx entry\n"); + _w_lock(&streams.lock, flags); if (streams.array[stream->info.stream_idx] == stream) { - DBG("clearing stream's stream_idx entry\n"); auto_array_clear_index(&streams, stream->info.stream_idx); - stream_put(stream); + stream_put(stream); /* down to 1 ref */ } + else + printk(KERN_WARNING "BUG in del_stream with streams.array\n"); _w_unlock(&streams.lock, flags); - DBG("del_stream() done, releasing ref\n"); - stream_put(stream); /* might be the last ref */ + DBG("del_stream() releasing last ref\n"); + stream_put(stream); } static int table_del_stream(struct rtpengine_table *table, const struct rtpengine_stream_info *info) { @@ -2855,7 +2903,7 @@ static ssize_t proc_stream_read(struct file *f, char __user *b, size_t l, loff_t goto out; DBG("going to sleep\n"); ret = -ERESTARTSYS; - if (wait_event_interruptible(stream->wq, !list_empty(&stream->packet_list) || stream->eof)) + if (wait_event_interruptible(stream->read_wq, !list_empty(&stream->packet_list) || stream->eof)) goto out; DBG("awakened\n"); spin_lock_irqsave(&stream->packet_list_lock, flags); @@ -2925,7 +2973,7 @@ static unsigned int proc_stream_poll(struct file *f, struct poll_table_struct *p spin_unlock_irqrestore(&stream->packet_list_lock, flags); - poll_wait(f, &stream->wq, p); + poll_wait(f, &stream->read_wq, p); stream_put(stream); @@ -2936,6 +2984,7 @@ static int proc_stream_open(struct inode *i, struct file *f) { int err; unsigned int stream_idx = (unsigned int) (unsigned long) PDE_DATA(f->f_path.dentry->d_inode); struct re_stream *stream; + unsigned long flags; DBG("entering proc_stream_open()\n"); @@ -2946,6 +2995,14 @@ static int proc_stream_open(struct inode *i, struct file *f) { if (!stream) return -EIO; + spin_lock_irqsave(&stream->packet_list_lock, flags); + if (stream->eof) { + spin_unlock_irqrestore(&stream->packet_list_lock, flags); + stream_put(stream); + return -ETXTBSY; + } + spin_unlock_irqrestore(&stream->packet_list_lock, flags); + return 0; } @@ -3004,7 +3061,7 @@ static void add_stream_packet(struct re_stream *stream, struct re_stream_packet DBG("stream's packet list lock is unlocked, now awakening processes\n"); - wake_up_interruptible(&stream->wq); + wake_up_interruptible(&stream->read_wq); while (!list_empty(&delete_list)) { packet = list_first_entry(&delete_list, struct re_stream_packet, list_entry); diff --git a/recording-daemon/stream.c b/recording-daemon/stream.c index 0df86ca55..62f023b9e 100644 --- a/recording-daemon/stream.c +++ b/recording-daemon/stream.c @@ -58,6 +58,8 @@ static void stream_handler(handler_t *handler) { goto out; } else if (ret < 0) { + if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) + goto out; ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno)); stream_close(stream); goto out;