delay proc stream/call file deletion until all references are cleared

fixes #384

Change-Id: I1c94027644a18a280077ba00d023c28070d2c9d7
pull/400/head
Richard Fuchs 8 years ago
parent 00cc28dff4
commit 440a1f8a4e

@ -316,6 +316,7 @@ struct re_call {
struct rtpengine_call_info info; struct rtpengine_call_info info;
unsigned int table_id; unsigned int table_id;
u32 hash_bucket; u32 hash_bucket;
int deleted; /* protected by calls.lock */
struct proc_dir_entry *root; struct proc_dir_entry *root;
@ -338,6 +339,7 @@ struct re_stream {
u32 hash_bucket; u32 hash_bucket;
struct proc_dir_entry *file; struct proc_dir_entry *file;
struct re_call *call; /* holds a reference */
struct list_head call_entry; /* protected by streams.lock */ struct list_head call_entry; /* protected by streams.lock */
struct hlist_node streams_hash_entry; struct hlist_node streams_hash_entry;
@ -345,8 +347,9 @@ struct re_stream {
spinlock_t packet_list_lock; spinlock_t packet_list_lock;
struct list_head packet_list; struct list_head packet_list;
unsigned int list_count; unsigned int list_count;
wait_queue_head_t wq; wait_queue_head_t read_wq;
int eof; wait_queue_head_t close_wq;
int eof; /* protected by packet_list_lock */
}; };
#define RE_HASH_BITS 8 /* make configurable? */ #define RE_HASH_BITS 8 /* make configurable? */
@ -653,7 +656,11 @@ static struct rtpengine_table *new_table(void) {
#define ref_get(o) atomic_inc(&(o)->refcnt) static inline void __ref_get(void *p, atomic_t *refcnt) {
DBG("ref_get(%p) - refcnt is %u\n", p, atomic_read(refcnt));
atomic_inc(refcnt);
}
#define ref_get(o) __ref_get(o, &(o)->refcnt)
@ -917,23 +924,34 @@ static void clear_stream_packets(struct re_stream *stream) {
} }
} }
static void stream_put(struct re_stream *stream) { static void stream_put(struct re_stream *stream) {
DBG("stream_put()\n"); DBG("stream_put(%p) - refcnt is %u\n",
stream,
stream ? atomic_read(&stream->refcnt) : (unsigned) -1);
if (!stream) if (!stream)
return; return;
if (!atomic_dec_and_test(&stream->refcnt)) if (!atomic_dec_and_test(&stream->refcnt)) {
/* if this is an open file being closed and there's a del_stream()
* waiting for us, we need to wake up the sleeping del_stream() */
wake_up_interruptible(&stream->close_wq);
return; return;
}
DBG("Freeing stream object\n"); DBG("Freeing stream object\n");
clear_stream_packets(stream); clear_stream_packets(stream);
clear_proc(&stream->file); clear_proc(&stream->file);
if (stream->call)
call_put(stream->call);
kfree(stream); kfree(stream);
} }
static void call_put(struct re_call *call) { static void call_put(struct re_call *call) {
DBG("call_put()\n"); DBG("call_put(%p) - refcnt is %u\n",
call,
call ? atomic_read(&call->refcnt) : (unsigned) -1);
if (!call) if (!call)
return; return;
@ -946,6 +964,9 @@ static void call_put(struct re_call *call) {
if (!list_empty(&call->streams)) if (!list_empty(&call->streams))
panic("BUG! streams list not empty in call"); panic("BUG! streams list not empty in call");
DBG("clearing call proc files\n");
clear_proc(&call->root);
kfree(call); kfree(call);
} }
@ -2400,6 +2421,8 @@ static struct re_call *get_call(struct rtpengine_table *table, unsigned int idx)
return NULL; return NULL;
if (table && ret->table_id != table->id) if (table && ret->table_id != table->id)
return NULL; return NULL;
if (ret->deleted)
return NULL;
return ret; return ret;
} }
/* handles the locking (read) and reffing */ /* handles the locking (read) and reffing */
@ -2586,15 +2609,14 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) {
_w_lock(&calls.lock, flags); _w_lock(&calls.lock, flags);
if (!list_empty(&call->table_entry)) { if (call->deleted) {
list_del_init(&call->table_entry); /* already doing this */
_w_unlock(&calls.lock, flags);
call_put(call); call_put(call);
return;
} }
if (calls.array[call->info.call_idx] == call) { call->deleted = 1;
auto_array_clear_index(&calls, call->info.call_idx);
call_put(call);
}
_w_unlock(&calls.lock, flags); _w_unlock(&calls.lock, flags);
@ -2610,9 +2632,6 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) {
} }
_w_unlock(&streams.lock, flags); _w_unlock(&streams.lock, flags);
DBG("clearing call proc files\n");
clear_proc(&call->root);
DBG("locking table's call hash\n"); DBG("locking table's call hash\n");
spin_lock_irqsave(&table->calls_hash_lock[call->hash_bucket], flags); spin_lock_irqsave(&table->calls_hash_lock[call->hash_bucket], flags);
if (!hlist_unhashed(&call->calls_hash_entry)) { if (!hlist_unhashed(&call->calls_hash_entry)) {
@ -2621,6 +2640,20 @@ static void del_call(struct re_call *call, struct rtpengine_table *table) {
} }
spin_unlock_irqrestore(&table->calls_hash_lock[call->hash_bucket], flags); spin_unlock_irqrestore(&table->calls_hash_lock[call->hash_bucket], flags);
_w_lock(&calls.lock, flags);
if (!list_empty(&call->table_entry)) {
list_del_init(&call->table_entry);
call_put(call);
}
if (calls.array[call->info.call_idx] == call) {
auto_array_clear_index(&calls, call->info.call_idx);
call_put(call);
}
_w_unlock(&calls.lock, flags);
DBG("del_call() done, releasing ref\n"); DBG("del_call() done, releasing ref\n");
call_put(call); /* might be the last ref */ call_put(call); /* might be the last ref */
} }
@ -2665,7 +2698,8 @@ static int table_new_stream(struct rtpengine_table *table, struct rtpengine_stre
atomic_set(&stream->refcnt, 1); atomic_set(&stream->refcnt, 1);
INIT_LIST_HEAD(&stream->packet_list); INIT_LIST_HEAD(&stream->packet_list);
spin_lock_init(&stream->packet_list_lock); spin_lock_init(&stream->packet_list_lock);
init_waitqueue_head(&stream->wq); init_waitqueue_head(&stream->read_wq);
init_waitqueue_head(&stream->close_wq);
/* check for name collisions */ /* check for name collisions */
@ -2715,6 +2749,9 @@ not_found:
list_add(&stream->call_entry, &call->streams); /* new ref here */ list_add(&stream->call_entry, &call->streams); /* new ref here */
ref_get(stream); ref_get(stream);
stream->call = call;
ref_get(call);
_w_unlock(&streams.lock, flags); _w_unlock(&streams.lock, flags);
/* proc_ functions may sleep, so this must be done outside of the lock */ /* proc_ functions may sleep, so this must be done outside of the lock */
@ -2769,13 +2806,9 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table)
clear_stream_packets(stream); clear_stream_packets(stream);
DBG("stream is finished (EOF), waking up threads\n"); DBG("stream is finished (EOF), waking up threads\n");
wake_up_interruptible(&stream->wq); wake_up_interruptible(&stream->read_wq);
/* sleeping readers will now close files */ /* sleeping readers will now close files */
DBG("clearing stream proc file\n");
/* this blocks until the files have been closed */
clear_proc(&stream->file);
DBG("clearing stream from streams_hash\n"); DBG("clearing stream from streams_hash\n");
spin_lock_irqsave(&table->streams_hash_lock[stream->hash_bucket], flags); spin_lock_irqsave(&table->streams_hash_lock[stream->hash_bucket], flags);
if (!hlist_unhashed(&stream->streams_hash_entry)) { if (!hlist_unhashed(&stream->streams_hash_entry)) {
@ -2790,15 +2823,30 @@ static void del_stream(struct re_stream *stream, struct rtpengine_table *table)
list_del_init(&stream->call_entry); list_del_init(&stream->call_entry);
stream_put(stream); stream_put(stream);
} }
if (streams.array[stream->info.stream_idx] == stream) { _w_unlock(&streams.lock, flags);
/* At this point, there should only be 2 references left: ours, and the entry in
* the "streams" array. Any other references are open files and we must wait until
* they're closed. There can be no new open file references as the stream is set
* to eof. */
DBG("del_stream() waiting for other refs\n");
while (1) {
if (wait_event_interruptible(stream->close_wq, atomic_read(&stream->refcnt) == 2) == 0)
break;
}
DBG("clearing stream's stream_idx entry\n"); DBG("clearing stream's stream_idx entry\n");
_w_lock(&streams.lock, flags);
if (streams.array[stream->info.stream_idx] == stream) {
auto_array_clear_index(&streams, stream->info.stream_idx); auto_array_clear_index(&streams, stream->info.stream_idx);
stream_put(stream); stream_put(stream); /* down to 1 ref */
} }
else
printk(KERN_WARNING "BUG in del_stream with streams.array\n");
_w_unlock(&streams.lock, flags); _w_unlock(&streams.lock, flags);
DBG("del_stream() done, releasing ref\n"); DBG("del_stream() releasing last ref\n");
stream_put(stream); /* might be the last ref */ stream_put(stream);
} }
static int table_del_stream(struct rtpengine_table *table, const struct rtpengine_stream_info *info) { static int table_del_stream(struct rtpengine_table *table, const struct rtpengine_stream_info *info) {
@ -2855,7 +2903,7 @@ static ssize_t proc_stream_read(struct file *f, char __user *b, size_t l, loff_t
goto out; goto out;
DBG("going to sleep\n"); DBG("going to sleep\n");
ret = -ERESTARTSYS; ret = -ERESTARTSYS;
if (wait_event_interruptible(stream->wq, !list_empty(&stream->packet_list) || stream->eof)) if (wait_event_interruptible(stream->read_wq, !list_empty(&stream->packet_list) || stream->eof))
goto out; goto out;
DBG("awakened\n"); DBG("awakened\n");
spin_lock_irqsave(&stream->packet_list_lock, flags); spin_lock_irqsave(&stream->packet_list_lock, flags);
@ -2925,7 +2973,7 @@ static unsigned int proc_stream_poll(struct file *f, struct poll_table_struct *p
spin_unlock_irqrestore(&stream->packet_list_lock, flags); spin_unlock_irqrestore(&stream->packet_list_lock, flags);
poll_wait(f, &stream->wq, p); poll_wait(f, &stream->read_wq, p);
stream_put(stream); stream_put(stream);
@ -2936,6 +2984,7 @@ static int proc_stream_open(struct inode *i, struct file *f) {
int err; int err;
unsigned int stream_idx = (unsigned int) (unsigned long) PDE_DATA(f->f_path.dentry->d_inode); unsigned int stream_idx = (unsigned int) (unsigned long) PDE_DATA(f->f_path.dentry->d_inode);
struct re_stream *stream; struct re_stream *stream;
unsigned long flags;
DBG("entering proc_stream_open()\n"); DBG("entering proc_stream_open()\n");
@ -2946,6 +2995,14 @@ static int proc_stream_open(struct inode *i, struct file *f) {
if (!stream) if (!stream)
return -EIO; return -EIO;
spin_lock_irqsave(&stream->packet_list_lock, flags);
if (stream->eof) {
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
stream_put(stream);
return -ETXTBSY;
}
spin_unlock_irqrestore(&stream->packet_list_lock, flags);
return 0; return 0;
} }
@ -3004,7 +3061,7 @@ static void add_stream_packet(struct re_stream *stream, struct re_stream_packet
DBG("stream's packet list lock is unlocked, now awakening processes\n"); DBG("stream's packet list lock is unlocked, now awakening processes\n");
wake_up_interruptible(&stream->wq); wake_up_interruptible(&stream->read_wq);
while (!list_empty(&delete_list)) { while (!list_empty(&delete_list)) {
packet = list_first_entry(&delete_list, struct re_stream_packet, list_entry); packet = list_first_entry(&delete_list, struct re_stream_packet, list_entry);

@ -58,6 +58,8 @@ static void stream_handler(handler_t *handler) {
goto out; goto out;
} }
else if (ret < 0) { else if (ret < 0) {
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
goto out;
ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno)); ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno));
stream_close(stream); stream_close(stream);
goto out; goto out;

Loading…
Cancel
Save