diff --git a/kernel-module/nft_rtpengine.c b/kernel-module/nft_rtpengine.c index c0382dd98..db251f327 100644 --- a/kernel-module/nft_rtpengine.c +++ b/kernel-module/nft_rtpengine.c @@ -427,7 +427,7 @@ struct re_stream { unsigned int list_count; wait_queue_head_t read_wq; wait_queue_head_t close_wq; - int eof; /* protected by packet_list_lock */ + atomic_t eof; }; struct re_shm { @@ -3046,6 +3046,9 @@ static struct re_stream *get_stream(struct re_call *call, unsigned int idx) { return NULL; if (call && ret->info.idx.call_idx != call->info.call_idx) return NULL; + if (atomic_read(&ret->eof)) + return NULL; + return ret; } /* handles the locking (read) and reffing */ @@ -3178,6 +3181,7 @@ out: return err; } + /* must be called lock-free */ static void del_call(struct re_call *call, struct rtpengine_table *table) { struct re_stream *stream; @@ -3205,9 +3209,9 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) { _w_lock(&streams.lock, flags); while (!list_empty(&call->streams)) { stream = list_first_entry(&call->streams, struct re_stream, call_entry); - ref_get(stream); + list_del_init(&stream->call_entry); // remove from list and steal reference _w_unlock(&streams.lock, flags); - del_stream(stream, table); /* removes it from this list */ + del_stream(stream, table); DBG("re-locking streams.lock\n"); _w_lock(&streams.lock, flags); } @@ -3362,21 +3366,13 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table) DBG("del_stream()\n"); - DBG("locking stream's packet list lock\n"); - spin_lock_irqsave(&stream->packet_list_lock, flags); - - if (stream->eof) { - /* already done this */ - spin_unlock_irqrestore(&stream->packet_list_lock, flags); + if (atomic_cmpxchg(&stream->eof, 0, 1)) { + // someone else already doing this DBG("stream is EOF\n"); stream_put(stream); return; } - stream->eof = 1; - - spin_unlock_irqrestore(&stream->packet_list_lock, flags); - clear_stream_packets(stream); DBG("stream is finished (EOF), waking up threads\n"); @@ -3399,15 +3395,6 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table) } _w_unlock(&streams.lock, flags); - /* At this point, there should only be 2 references left: ours, and the entry in - * the "streams" array. Any other references are open files and we must wait until - * they're closed. There can be no new open file references as the stream is set - * to eof. */ - DBG("del_stream() waiting for other refs\n"); - while (atomic_read(&stream->refcnt) != 2) - wait_event_interruptible_timeout(stream->close_wq, atomic_read(&stream->refcnt) == 2, HZ / 10); - - DBG("clearing stream's stream_idx entry\n"); _w_lock(&streams.lock, flags); if (streams.array[stream->info.idx.stream_idx] == stream) { auto_array_clear_index(&streams, stream->info.idx.stream_idx); @@ -3417,6 +3404,12 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table) printk(KERN_WARNING "BUG in del_stream with streams.array\n"); _w_unlock(&streams.lock, flags); + // wait until we're the last ref. this waits until all open files are closed. + // no now refs can be obtained due to eof == 1 + DBG("del_stream() waiting for other refs\n"); + while (atomic_read(&stream->refcnt) != 1) + wait_event_interruptible_timeout(stream->close_wq, atomic_read(&stream->refcnt) == 1, HZ / 10); + DBG("del_stream() releasing last ref\n"); stream_put(stream); } @@ -3471,7 +3464,7 @@ static ssize_t proc_stream_read(struct file *f, char __user *b, size_t l, loff_t DBG("locking stream's packet list lock\n"); spin_lock_irqsave(&stream->packet_list_lock, flags); - while (list_empty(&stream->packet_list) && !stream->eof) { + while (list_empty(&stream->packet_list) && !atomic_read(&stream->eof)) { spin_unlock_irqrestore(&stream->packet_list_lock, flags); DBG("list is empty\n"); ret = -EAGAIN; @@ -3479,14 +3472,15 @@ static ssize_t proc_stream_read(struct file *f, char __user *b, size_t l, loff_t goto out; DBG("going to sleep\n"); ret = -ERESTARTSYS; - if (wait_event_interruptible_timeout(stream->read_wq, !list_empty(&stream->packet_list) || stream->eof, HZ / 10)) + if (wait_event_interruptible_timeout(stream->read_wq, !list_empty(&stream->packet_list) + || atomic_read(&stream->eof), HZ / 10)) goto out; DBG("awakened\n"); spin_lock_irqsave(&stream->packet_list_lock, flags); } ret = 0; - if (stream->eof) { + if (atomic_read(&stream->eof)) { DBG("eof\n"); spin_unlock_irqrestore(&stream->packet_list_lock, flags); goto out; @@ -3575,7 +3569,7 @@ static unsigned int proc_stream_poll(struct file *f, struct poll_table_struct *p if (!list_empty(&stream->packet_list)) ret |= POLLIN | POLLRDNORM; - if (stream->eof) + if (atomic_read(&stream->eof)) ret |= POLLIN | POLLRDNORM | POLLHUP | POLLRDHUP; DBG("returning from proc_stream_poll()\n"); @@ -3591,7 +3585,6 @@ static int proc_stream_open(struct inode *i, struct file *f) { int err; unsigned int stream_idx = (unsigned int) (unsigned long) PDE_DATA(f->f_path.dentry->d_inode); struct re_stream *stream; - unsigned long flags; DBG("entering proc_stream_open()\n"); @@ -3602,13 +3595,10 @@ static int proc_stream_open(struct inode *i, struct file *f) { if (!stream) return -EIO; - spin_lock_irqsave(&stream->packet_list_lock, flags); - if (stream->eof) { - spin_unlock_irqrestore(&stream->packet_list_lock, flags); + if (atomic_read(&stream->eof)) { stream_put(stream); return -ETXTBSY; } - spin_unlock_irqrestore(&stream->packet_list_lock, flags); return 0; } @@ -3646,7 +3636,7 @@ static void add_stream_packet(struct re_stream *stream, struct re_stream_packet spin_lock_irqsave(&stream->packet_list_lock, flags); err = 0; - if (stream->eof) + if (atomic_read(&stream->eof)) goto err; /* we accept, but ignore/discard */ DBG("adding packet to queue\n");