mirror of https://github.com/sipwise/rtpengine.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
241 lines
4.2 KiB
241 lines
4.2 KiB
#include "streambuf.h"
|
|
|
|
#include <stdio.h>
|
|
#include <glib.h>
|
|
#include <stdlib.h>
|
|
#include <unistd.h>
|
|
#include <errno.h>
|
|
#include <stdarg.h>
|
|
#include <time.h>
|
|
|
|
#include "poller.h"
|
|
#include "auxlib.h"
|
|
|
|
|
|
|
|
|
|
static ssize_t __fd_write(void *, const void *, size_t);
|
|
static ssize_t __fd_read(void *, void *, size_t);
|
|
|
|
static const struct streambuf_funcs __fd_funcs = {
|
|
.write = __fd_write,
|
|
.read = __fd_read,
|
|
};
|
|
|
|
static ssize_t __fd_write(void *fd, const void *b, size_t s) {
|
|
return write(GPOINTER_TO_INT(fd), b, s);
|
|
}
|
|
static ssize_t __fd_read(void *fd, void *b, size_t s) {
|
|
return read(GPOINTER_TO_INT(fd), b, s);
|
|
}
|
|
|
|
struct streambuf *streambuf_new_ptr(struct poller *p, void *fd_ptr, const struct streambuf_funcs *funcs) {
|
|
struct streambuf *b;
|
|
|
|
b = g_slice_alloc0(sizeof(*b));
|
|
|
|
mutex_init(&b->lock);
|
|
b->buf = g_string_new("");
|
|
b->fd_ptr = fd_ptr;
|
|
b->poller = p;
|
|
b->active = rtpe_now.tv_sec;
|
|
b->funcs = funcs;
|
|
|
|
return b;
|
|
}
|
|
struct streambuf *streambuf_new(struct poller *p, int fd) {
|
|
return streambuf_new_ptr(p, GINT_TO_POINTER(fd), &__fd_funcs);
|
|
}
|
|
|
|
|
|
void streambuf_destroy(struct streambuf *b) {
|
|
g_string_free(b->buf, TRUE);
|
|
g_slice_free1(sizeof(*b), b);
|
|
}
|
|
|
|
|
|
int streambuf_writeable(struct streambuf *b) {
|
|
int ret;
|
|
unsigned int out;
|
|
|
|
mutex_lock(&b->lock);
|
|
|
|
for (;;) {
|
|
if (!b->buf->len)
|
|
break;
|
|
|
|
out = (b->buf->len > 1024) ? 1024 : b->buf->len;
|
|
ret = b->funcs->write(b->fd_ptr, b->buf->str, out);
|
|
|
|
if (ret < 0) {
|
|
if (errno == EINTR)
|
|
continue;
|
|
if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
|
mutex_unlock(&b->lock);
|
|
return -1;
|
|
}
|
|
ret = 0;
|
|
}
|
|
|
|
if (ret > 0) {
|
|
g_string_erase(b->buf, 0, ret);
|
|
b->active = rtpe_now.tv_sec;
|
|
}
|
|
|
|
if (ret != out) {
|
|
poller_blocked(b->poller, b->fd_ptr);
|
|
break;
|
|
}
|
|
}
|
|
|
|
mutex_unlock(&b->lock);
|
|
return 0;
|
|
}
|
|
|
|
int streambuf_readable(struct streambuf *b) {
|
|
int ret;
|
|
char buf[1024];
|
|
|
|
mutex_lock(&b->lock);
|
|
|
|
for (;;) {
|
|
ret = b->funcs->read(b->fd_ptr, buf, 1024);
|
|
|
|
if (ret == 0) {
|
|
// don't discard already read data in the buffer
|
|
b->eof = 1;
|
|
ret = b->buf->len ? -2 : -1;
|
|
mutex_unlock(&b->lock);
|
|
return ret;
|
|
}
|
|
if (ret < 0) {
|
|
if (errno == EINTR)
|
|
continue;
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
|
break;
|
|
mutex_unlock(&b->lock);
|
|
return -1;
|
|
}
|
|
|
|
g_string_append_len(b->buf, buf, ret);
|
|
b->active = rtpe_now.tv_sec;
|
|
}
|
|
|
|
mutex_unlock(&b->lock);
|
|
return 0;
|
|
}
|
|
|
|
|
|
char *streambuf_getline(struct streambuf *b) {
|
|
char *p;
|
|
int len, to_del;
|
|
char *s = NULL;
|
|
|
|
mutex_lock(&b->lock);
|
|
|
|
for (;;) {
|
|
if (s) {
|
|
free(s);
|
|
s = NULL;
|
|
}
|
|
|
|
p = memchr(b->buf->str, '\n', b->buf->len);
|
|
if (!p) {
|
|
if (b->eof) {
|
|
// use entire string
|
|
len = b->buf->len;
|
|
to_del = len;
|
|
}
|
|
else
|
|
break;
|
|
}
|
|
else {
|
|
len = p - b->buf->str;
|
|
to_del = len + 1;
|
|
if (len == 0) {
|
|
// blank line, skip it
|
|
g_string_erase(b->buf, 0, 1);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
s = malloc(len + 1);
|
|
memcpy(s, b->buf->str, len);
|
|
s[len] = '\0';
|
|
g_string_erase(b->buf, 0, to_del);
|
|
|
|
if (s[--len] == '\r') {
|
|
if (len == 0)
|
|
continue;
|
|
s[len] = '\0';
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
mutex_unlock(&b->lock);
|
|
return s;
|
|
}
|
|
|
|
unsigned int streambuf_bufsize(struct streambuf *b) {
|
|
return b->buf->len;
|
|
}
|
|
|
|
|
|
void streambuf_vprintf(struct streambuf *b, const char *f, va_list va) {
|
|
GString *gs;
|
|
|
|
gs = g_string_new("");
|
|
g_string_vprintf(gs, f, va);
|
|
|
|
streambuf_write(b, gs->str, gs->len);
|
|
g_string_free(gs, TRUE);
|
|
}
|
|
|
|
void streambuf_printf(struct streambuf *b, const char *f, ...) {
|
|
va_list va;
|
|
|
|
va_start(va, f);
|
|
streambuf_vprintf(b, f, va);
|
|
va_end(va);
|
|
}
|
|
|
|
void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
|
|
unsigned int out;
|
|
int ret;
|
|
|
|
if (!b)
|
|
return;
|
|
|
|
mutex_lock(&b->lock);
|
|
|
|
while (len && !poller_isblocked(b->poller, b->fd_ptr)) {
|
|
out = (len > 1024) ? 1024 : len;
|
|
ret = b->funcs->write(b->fd_ptr, s, out);
|
|
|
|
if (ret < 0) {
|
|
if (errno == EINTR)
|
|
continue;
|
|
if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
|
poller_error(b->poller, b->fd_ptr);
|
|
break;
|
|
}
|
|
poller_blocked(b->poller, b->fd_ptr);
|
|
break;
|
|
}
|
|
if (ret == 0)
|
|
break;
|
|
|
|
s += ret;
|
|
len -= ret;
|
|
b->active = rtpe_now.tv_sec;
|
|
}
|
|
|
|
if (b->buf->len > 5242880)
|
|
poller_error(b->poller, b->fd_ptr);
|
|
else if (len)
|
|
g_string_append_len(b->buf, s, len);
|
|
|
|
mutex_unlock(&b->lock);
|
|
}
|