MT#55283 make eof flag atomic

Change-Id: I0a76772c92bf999d1356dc1b91670ddc7583070a
pull/2090/head
Richard Fuchs 2 months ago
parent b1aec46de8
commit 11dea691e8

@ -427,7 +427,7 @@ struct re_stream {
unsigned int list_count;
wait_queue_head_t read_wq;
wait_queue_head_t close_wq;
int eof; /* protected by packet_list_lock */
atomic_t eof;
};
struct re_shm {
@ -3046,6 +3046,9 @@ static struct re_stream *get_stream(struct re_call *call, unsigned int idx) {
return NULL;
if (call && ret->info.idx.call_idx != call->info.call_idx)
return NULL;
if (atomic_read(&ret->eof))
return NULL;
return ret;
}
/* handles the locking (read) and reffing */
@ -3178,6 +3181,7 @@ out:
return err;
}
/* must be called lock-free */
static void del_call(struct re_call *call, struct rtpengine_table *table) {
struct re_stream *stream;
@ -3205,9 +3209,9 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) {
_w_lock(&streams.lock, flags);
while (!list_empty(&call->streams)) {
stream = list_first_entry(&call->streams, struct re_stream, call_entry);
ref_get(stream);
list_del_init(&stream->call_entry); // remove from list and steal reference
_w_unlock(&streams.lock, flags);
del_stream(stream, table); /* removes it from this list */
del_stream(stream, table);
DBG("re-locking streams.lock\n");
_w_lock(&streams.lock, flags);
}
@ -3362,21 +3366,13 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table)
DBG("del_stream()\n");
DBG("locking stream's packet list lock\n");
spin_lock_irqsave(&stream->packet_list_lock, flags);
if (stream->eof) {
/* already done this */
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
if (atomic_cmpxchg(&stream->eof, 0, 1)) {
// someone else already doing this
DBG("stream is EOF\n");
stream_put(stream);
return;
}
stream->eof = 1;
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
clear_stream_packets(stream);
DBG("stream is finished (EOF), waking up threads\n");
@ -3399,15 +3395,6 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table)
}
_w_unlock(&streams.lock, flags);
/* At this point, there should only be 2 references left: ours, and the entry in
* the "streams" array. Any other references are open files and we must wait until
* they're closed. There can be no new open file references as the stream is set
* to eof. */
DBG("del_stream() waiting for other refs\n");
while (atomic_read(&stream->refcnt) != 2)
wait_event_interruptible_timeout(stream->close_wq, atomic_read(&stream->refcnt) == 2, HZ / 10);
DBG("clearing stream's stream_idx entry\n");
_w_lock(&streams.lock, flags);
if (streams.array[stream->info.idx.stream_idx] == stream) {
auto_array_clear_index(&streams, stream->info.idx.stream_idx);
@ -3417,6 +3404,12 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table)
printk(KERN_WARNING "BUG in del_stream with streams.array\n");
_w_unlock(&streams.lock, flags);
// wait until we're the last ref. this waits until all open files are closed.
// no now refs can be obtained due to eof == 1
DBG("del_stream() waiting for other refs\n");
while (atomic_read(&stream->refcnt) != 1)
wait_event_interruptible_timeout(stream->close_wq, atomic_read(&stream->refcnt) == 1, HZ / 10);
DBG("del_stream() releasing last ref\n");
stream_put(stream);
}
@ -3471,7 +3464,7 @@ static ssize_t proc_stream_read(struct file *f, char __user *b, size_t l, loff_t
DBG("locking stream's packet list lock\n");
spin_lock_irqsave(&stream->packet_list_lock, flags);
while (list_empty(&stream->packet_list) && !stream->eof) {
while (list_empty(&stream->packet_list) && !atomic_read(&stream->eof)) {
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
DBG("list is empty\n");
ret = -EAGAIN;
@ -3479,14 +3472,15 @@ static ssize_t proc_stream_read(struct file *f, char __user *b, size_t l, loff_t
goto out;
DBG("going to sleep\n");
ret = -ERESTARTSYS;
if (wait_event_interruptible_timeout(stream->read_wq, !list_empty(&stream->packet_list) || stream->eof, HZ / 10))
if (wait_event_interruptible_timeout(stream->read_wq, !list_empty(&stream->packet_list)
|| atomic_read(&stream->eof), HZ / 10))
goto out;
DBG("awakened\n");
spin_lock_irqsave(&stream->packet_list_lock, flags);
}
ret = 0;
if (stream->eof) {
if (atomic_read(&stream->eof)) {
DBG("eof\n");
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
goto out;
@ -3575,7 +3569,7 @@ static unsigned int proc_stream_poll(struct file *f, struct poll_table_struct *p
if (!list_empty(&stream->packet_list))
ret |= POLLIN | POLLRDNORM;
if (stream->eof)
if (atomic_read(&stream->eof))
ret |= POLLIN | POLLRDNORM | POLLHUP | POLLRDHUP;
DBG("returning from proc_stream_poll()\n");
@ -3591,7 +3585,6 @@ static int proc_stream_open(struct inode *i, struct file *f) {
int err;
unsigned int stream_idx = (unsigned int) (unsigned long) PDE_DATA(f->f_path.dentry->d_inode);
struct re_stream *stream;
unsigned long flags;
DBG("entering proc_stream_open()\n");
@ -3602,13 +3595,10 @@ static int proc_stream_open(struct inode *i, struct file *f) {
if (!stream)
return -EIO;
spin_lock_irqsave(&stream->packet_list_lock, flags);
if (stream->eof) {
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
if (atomic_read(&stream->eof)) {
stream_put(stream);
return -ETXTBSY;
}
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
return 0;
}
@ -3646,7 +3636,7 @@ static void add_stream_packet(struct re_stream *stream, struct re_stream_packet
spin_lock_irqsave(&stream->packet_list_lock, flags);
err = 0;
if (stream->eof)
if (atomic_read(&stream->eof))
goto err; /* we accept, but ignore/discard */
DBG("adding packet to queue\n");

Loading…
Cancel
Save