commit 593121d5516e1282084d35bec20aa21bb19ffee3 Author: Richard Fuchs Date: Fri Mar 11 20:40:56 2011 +0000 create tags and branches diff --git a/daemon/Makefile b/daemon/Makefile new file mode 100644 index 0000000..5901d97 --- /dev/null +++ b/daemon/Makefile @@ -0,0 +1,40 @@ +CC= gcc +CFLAGS= -g -Wall `pkg-config --cflags glib-2.0` `pcre-config --cflags` -I/lib/modules/`uname -r`/build/include/ -I../kernel-module/ +#CFLAGS+= -O2 +LDFLAGS= `pkg-config --libs glib-2.0` `pcre-config --libs` + +SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c + +OBJS= $(SRCS:.c=.o) + + +.PHONY: all dep clean tests + +all: + make mediaproxy-ng + +tests: + make aux-test poller-test + +dep: .depend + +clean: + rm -f $(OBJS) mediaproxy-ng aux-test poller-test aux-test.o poller-test.o .depend build_time.h core + +.depend: $(SRCS) Makefile build_time.h + $(CC) $(CFLAGS) -M $(SRCS) | sed -e 's/:/ .depend:/' > .depend + +build_time.h: $(SRCS) Makefile + date +"#define BUILD_TIME \"%Y-%m-%d %H:%M:%S\"" > build_time.h + +mediaproxy-ng: $(OBJS) .depend build_time.h + $(CC) $(CFLAGS) -o $@ $(OBJS) $(LDFLAGS) + +aux-test: aux.o aux-test.o .depend build_time.h + $(CC) $(CFLAGS) -o $@ aux-test.o aux.o $(LDFLAGS) + +poller-test: poller.o poller-test.o aux.o .depend build_time.h + $(CC) $(CFLAGS) -o $@ poller-test.o poller.o aux.o $(LDFLAGS) + + +include .depend diff --git a/daemon/aux-test.c b/daemon/aux-test.c new file mode 100644 index 0000000..52e0d2c --- /dev/null +++ b/daemon/aux-test.c @@ -0,0 +1,305 @@ +#include +#include "aux.h" + + +int test[32] = { + 0x10, 0x20, 0x30, 0x40, 0x50, 0x60, 0x70, 0x80, + 0x90, 0xa0, 0xb0, 0xc0, 0xd0, 0xe0, 0xf0, 0xff, + + 0x10ff, 0x20ff, 0x30ff, 0x40ff, 0x50ff, 0x60ff, 0x70ff, 0x80ff, + 0x90ff, 0xa0ff, 0xb0ff, 0xc0ff, 0xd0ff, 0xe0ff, 0xf0ff, 0xffff, +}; + +void exsrx(int x, int exp, unsigned int s, int ex) { + int ret = mybsearch(test, s, sizeof(int), &x, 0, sizeof(x), ex); + if (ret != exp) + fprintf(stderr, "TEST FAILED! params=%u %i; result=%i, expected=%i\n", s, ex, ret, exp); +} + +void exsr1(int x, int exp) { + exsrx(x, exp, 16, 1); +} + +void exsr2(int x, int exp) { + exsrx(x, exp, 15, 1); +} + +void exsr3(int x, int exp) { + exsrx(x, exp, 2, 1); +} + +void exsr4(int x, int exp) { + exsrx(x, exp, 1, 1); +} + +void exsr5(int x, int exp) { + exsrx(x, exp, 32, 1); +} + +void exsr6(int x, int exp) { + exsrx(x, exp, 31, 1); +} + +void exsr7(int x, int exp) { + exsrx(x, exp, 16, 0); +} + +void exsr8(int x, int exp) { + exsrx(x, exp, 15, 0); +} + +void exsr9(int x, int exp) { + exsrx(x, exp, 2, 0); +} + +void exsr10(int x, int exp) { + exsrx(x, exp, 1, 0); +} + +void exsr11(int x, int exp) { + exsrx(x, exp, 32, 0); +} + +void exsr12(int x, int exp) { + exsrx(x, exp, 31, 0); +} + +int main() { + exsr1(0x10, 0); + exsr1(0x20, 1); + exsr1(0x30, 2); + exsr1(0x40, 3); + exsr1(0x50, 4); + exsr1(0x60, 5); + exsr1(0x70, 6); + exsr1(0x80, 7); + exsr1(0x90, 8); + exsr1(0xa0, 9); + exsr1(0xb0, 10); + exsr1(0xc0, 11); + exsr1(0xd0, 12); + exsr1(0xe0, 13); + exsr1(0xf0, 14); + exsr1(0xff, 15); + exsr1(0xffff, -1); + exsr1(0xfe, -1); + + exsr2(0x10, 0); + exsr2(0x20, 1); + exsr2(0x30, 2); + exsr2(0x40, 3); + exsr2(0x50, 4); + exsr2(0x60, 5); + exsr2(0x70, 6); + exsr2(0x80, 7); + exsr2(0x90, 8); + exsr2(0xa0, 9); + exsr2(0xb0, 10); + exsr2(0xc0, 11); + exsr2(0xd0, 12); + exsr2(0xe0, 13); + exsr2(0xf0, 14); + exsr2(0xff, -1); + + exsr3(0x10, 0); + exsr3(0x20, 1); + exsr3(0x30, -1); + + exsr4(0x10, 0); + exsr4(0x20, -1); + + exsr5(0x10, 0); + exsr5(0x20, 1); + exsr5(0x30, 2); + exsr5(0x40, 3); + exsr5(0x50, 4); + exsr5(0x60, 5); + exsr5(0x70, 6); + exsr5(0x80, 7); + exsr5(0x90, 8); + exsr5(0xa0, 9); + exsr5(0xb0, 10); + exsr5(0xc0, 11); + exsr5(0xd0, 12); + exsr5(0xe0, 13); + exsr5(0xf0, 14); + exsr5(0xff, 15); + exsr5(0x10ff, 16); + exsr5(0x20ff, 17); + exsr5(0x30ff, 18); + exsr5(0x40ff, 19); + exsr5(0x50ff, 20); + exsr5(0x60ff, 21); + exsr5(0x70ff, 22); + exsr5(0x80ff, 23); + exsr5(0x90ff, 24); + exsr5(0xa0ff, 25); + exsr5(0xb0ff, 26); + exsr5(0xc0ff, 27); + exsr5(0xd0ff, 28); + exsr5(0xe0ff, 29); + exsr5(0xf0ff, 30); + exsr5(0xffff, 31); + exsr5(0xfff3, -1); + exsr5(0xffffff, -1); + + exsr6(0x10, 0); + exsr6(0x20, 1); + exsr6(0x30, 2); + exsr6(0x40, 3); + exsr6(0x50, 4); + exsr6(0x60, 5); + exsr6(0x70, 6); + exsr6(0x80, 7); + exsr6(0x90, 8); + exsr6(0xa0, 9); + exsr6(0xb0, 10); + exsr6(0xc0, 11); + exsr6(0xd0, 12); + exsr6(0xe0, 13); + exsr6(0xf0, 14); + exsr6(0xff, 15); + exsr6(0x10ff, 16); + exsr6(0x20ff, 17); + exsr6(0x30ff, 18); + exsr6(0x40ff, 19); + exsr6(0x50ff, 20); + exsr6(0x60ff, 21); + exsr6(0x70ff, 22); + exsr6(0x80ff, 23); + exsr6(0x90ff, 24); + exsr6(0xa0ff, 25); + exsr6(0xb0ff, 26); + exsr6(0xc0ff, 27); + exsr6(0xd0ff, 28); + exsr6(0xe0ff, 29); + exsr6(0xf0ff, 30); + exsr6(0xffff, -1); + + + + + + exsr7(0x10, 0); + exsr7(0x20, 1); + exsr7(0x30, 2); + exsr7(0x40, 3); + exsr7(0x50, 4); + exsr7(0x60, 5); + exsr7(0x70, 6); + exsr7(0x80, 7); + exsr7(0x90, 8); + exsr7(0xa0, 9); + exsr7(0xb0, 10); + exsr7(0xc0, 11); + exsr7(0xd0, 12); + exsr7(0xe0, 13); + exsr7(0xf0, 14); + exsr7(0xff, 15); + exsr7(0xffff, -17); + exsr7(0xfe, -16); + exsr7(0x00, -1); + exsr8(0x05, -1); + exsr8(0x15, -2); + + exsr8(0x10, 0); + exsr8(0x20, 1); + exsr8(0x30, 2); + exsr8(0x40, 3); + exsr8(0x50, 4); + exsr8(0x60, 5); + exsr8(0x70, 6); + exsr8(0x80, 7); + exsr8(0x90, 8); + exsr8(0xa0, 9); + exsr8(0xb0, 10); + exsr8(0xc0, 11); + exsr8(0xd0, 12); + exsr8(0xe0, 13); + exsr8(0xf0, 14); + exsr8(0xff, -16); + exsr8(0xffff, -16); + exsr8(0xef, -15); + exsr8(0x00, -1); + exsr8(0x05, -1); + exsr8(0x15, -2); + + exsr9(0x10, 0); + exsr9(0x20, 1); + exsr9(0x30, -3); + + exsr10(0x10, 0); + exsr10(0x20, -2); + + exsr11(0x10, 0); + exsr11(0x20, 1); + exsr11(0x30, 2); + exsr11(0x40, 3); + exsr11(0x50, 4); + exsr11(0x60, 5); + exsr11(0x70, 6); + exsr11(0x80, 7); + exsr11(0x90, 8); + exsr11(0xa0, 9); + exsr11(0xb0, 10); + exsr11(0xc0, 11); + exsr11(0xd0, 12); + exsr11(0xe0, 13); + exsr11(0xf0, 14); + exsr11(0xff, 15); + exsr11(0x10ff, 16); + exsr11(0x20ff, 17); + exsr11(0x30ff, 18); + exsr11(0x40ff, 19); + exsr11(0x50ff, 20); + exsr11(0x60ff, 21); + exsr11(0x70ff, 22); + exsr11(0x80ff, 23); + exsr11(0x90ff, 24); + exsr11(0xa0ff, 25); + exsr11(0xb0ff, 26); + exsr11(0xc0ff, 27); + exsr11(0xd0ff, 28); + exsr11(0xe0ff, 29); + exsr11(0xf0ff, 30); + exsr11(0xffff, 31); + exsr11(0xfff3, -16); + exsr11(0xffffff, -33); + + exsr12(0x10, 0); + exsr12(0x20, 1); + exsr12(0x30, 2); + exsr12(0x40, 3); + exsr12(0x50, 4); + exsr12(0x60, 5); + exsr12(0x70, 6); + exsr12(0x80, 7); + exsr12(0x90, 8); + exsr12(0xa0, 9); + exsr12(0xb0, 10); + exsr12(0xc0, 11); + exsr12(0xd0, 12); + exsr12(0xe0, 13); + exsr12(0xf0, 14); + exsr12(0xff, 15); + exsr12(0x10ff, 16); + exsr12(0x20ff, 17); + exsr12(0x30ff, 18); + exsr12(0x40ff, 19); + exsr12(0x50ff, 20); + exsr12(0x60ff, 21); + exsr12(0x70ff, 22); + exsr12(0x80ff, 23); + exsr12(0x90ff, 24); + exsr12(0xa0ff, 25); + exsr12(0xb0ff, 26); + exsr12(0xc0ff, 27); + exsr12(0xd0ff, 28); + exsr12(0xe0ff, 29); + exsr12(0xf0ff, 30); + exsr12(0xffff, -32); + + printf("all done\n"); + + return 0; +} diff --git a/daemon/aux.c b/daemon/aux.c new file mode 100644 index 0000000..04e65d3 --- /dev/null +++ b/daemon/aux.c @@ -0,0 +1,161 @@ +#define _GNU_SOURCE +#include +#include +#include +#include + +#include "aux.h" + + + +#if 0 +#define BSDB(x...) fprintf(stderr, x) +#else +#define BSDB(x...) ((void)0) +#endif + + + +int mybsearch(void *base, unsigned int len, unsigned int size, void *key, unsigned int key_off, unsigned int key_size, int exact) { + unsigned char *cbase = base; + int pos; + unsigned char *cur; + int res; + unsigned int num; + + if (!len) { + BSDB("zero length array\n"); + return -1; + } + + pos = len / 2; + num = pos; + num += 3; + num /= 2; + pos--; + if (pos < 0) + pos = 0; + + BSDB("starting pos=%u, num=%u\n", pos, num); + + for (;;) { + cur = cbase + (pos * size); + res = memcmp(cur + key_off, key, key_size); + BSDB("compare=%i\n", res); + if (!res) + return pos; + if (!num) { + BSDB("nothing found\n"); + if (exact) + return -1; + if (res > 0) /* cur > key */ + return -1 * pos - 1; + return -1 * pos - 2; + } + + if (res < 0) { /* cur < key */ + pos += num; + if (pos >= len) + pos = len - 1; + } + else { + pos -= num; + if (pos < 0) + pos = 0; + } + + BSDB("new pos=%u\n", pos); + + if (num == 1) + num = 0; + else { + num++; + num /= 2; + } + + BSDB("new num=%u\n", num); + } +} + + +GList *g_list_link(GList *list, GList *el) { + el->prev = NULL; + el->next = list; + if (list) + list->prev = el; + return el; +} + + +GQueue *pcre_multi_match(pcre **re, pcre_extra **ree, const char *rex, const char *s, unsigned int num, parse_func f, void *p) { + GQueue *q; + const char *errptr; + int erroff; + unsigned int start, len; + int ovec[60]; + int *ov; + char **el; + unsigned int i; + void *ins; + + if (!*re) { + *re = pcre_compile(rex, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); + if (!*re) + return NULL; + *ree = pcre_study(*re, 0, &errptr); + } + + q = g_queue_new(); + el = malloc(sizeof(*el) * num); + + for (start = 0, len = strlen(s); pcre_exec(*re, *ree, s + start, len - start, 0, 0, ovec, ARRAY_SIZE(ovec)) > 0; start += ovec[1]) { + for (i = 0; i < num; i++) { + ov = ovec + 2 + i*2; + el[i] = (ov[0] == -1) ? NULL : g_strndup(s + start + ov[0], ov[1] - ov[0]); + } + + if (!f(el, &ins, p)) + g_queue_push_tail(q, ins); + + for (i = 0; i < num; i++) { + if (el[i]) + free(el[i]); + } + } + + free(el); + + return q; +} + + +void strmove(char **d, char **s) { + if (*d) + free(*d); + *d = *s; + *s = strdup(""); +} + +void strdupfree(char **d, const char *s) { + if (*d) + free(*d); + *d = strdup(s); +} + + + +#if !GLIB_CHECK_VERSION(2,14,0) + +void g_string_vprintf(GString *string, const gchar *format, va_list args) { + char *s; + int r; + + r = vasprintf(&s, format, args); + if (r < 0) + return; + + g_string_assign(string, s); + free(s); +} + +#endif diff --git a/daemon/aux.h b/daemon/aux.h new file mode 100644 index 0000000..3c104eb --- /dev/null +++ b/daemon/aux.h @@ -0,0 +1,48 @@ +#ifndef __AUX_H__ +#define __AUX_H__ + + + +#include +#include +#include +#include +#include +#include +#include +#include + + + + +#define OFFSET_OF(t,e) ((unsigned int) (unsigned long) &(((t *) 0)->e)) +#define ARRAY_SIZE(x) (sizeof(x) / sizeof(*(x))) +#define ZERO(x) memset(&(x), 0, sizeof(x)) + +#define IPF "%u.%u.%u.%u" +#define IPP(x) ((unsigned char *) (&(x)))[0], ((unsigned char *) (&(x)))[1], ((unsigned char *) (&(x)))[2], ((unsigned char *) (&(x)))[3] +#define DF IPF ":%u" +#define DP(x) IPP((x).sin_addr.s_addr), ntohs((x).sin_port) + +#define NONBLOCK(x) fcntl(x, F_SETFL, O_NONBLOCK) +#define REUSEADDR(x) do { int ONE = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &ONE, sizeof(ONE)); } while (0) + + + + +typedef int (*parse_func)(char **, void **, void *); + +int mybsearch(void *, unsigned int, unsigned int, void *, unsigned int, unsigned int, int); +GList *g_list_link(GList *, GList *); +GQueue *pcre_multi_match(pcre **, pcre_extra **, const char *, const char *, unsigned int, parse_func, void *); +void strmove(char **, char **); +void strdupfree(char **, const char *); + + +#if !GLIB_CHECK_VERSION(2,14,0) +void g_string_vprintf(GString *string, const gchar *format, va_list args); +#endif + + + +#endif diff --git a/daemon/call.c b/daemon/call.c new file mode 100644 index 0000000..5d49891 --- /dev/null +++ b/daemon/call.c @@ -0,0 +1,989 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "call.h" +#include "poller.h" +#include "aux.h" +#include "log.h" +#include "kernel.h" +#include "control.h" +#include "streambuf.h" + + + +#if 0 +#define DBG(x...) mylog(LOG_DEBUG, x) +#else +#define DBG(x...) ((void)0) +#endif + + + +static pcre *info_re; +static pcre_extra *info_ree; +static pcre *streams_re; +static pcre_extra *streams_ree; + + + + +static char *rtp_codecs[] = { + [0] = "G711u", + [1] = "1016", + [2] = "G721", + [3] = "GSM", + [4] = "G723", + [5] = "DVI4", + [6] = "DVI4", + [7] = "LPC", + [8] = "G711a", + [9] = "G722", + [10] = "L16", + [11] = "L16", + [14] = "MPA", + [15] = "G728", + [18] = "G729", + [25] = "CelB", + [26] = "JPEG", + [28] = "nv", + [31] = "H261", + [32] = "MPV", + [33] = "MP2T", + [34] = "H263", +}; + + + + + + +static void call_destroy(struct call *); +static void unkernelize(struct peer *); + + + + + + + +static void stream_closed(int fd, void *p) { + struct streamrelay *r = p; + struct call *c; + + c = r->up->up->call; + + mylog(LOG_WARNING, "[%s] Read error on RTP socket", c->callid); + + call_destroy(c); +} + + + + +static void kernelize(struct callstream *c) { + int i, j; + struct peer *p, *pp; + struct streamrelay *r, *rp; + struct kernel_stream ks; + + mylog(LOG_DEBUG, "[%s] Kernelizing RTP streams", c->call->callid); + + ZERO(ks); + + for (i = 0; i < 2; i++) { + p = &c->peers[i]; + pp = &c->peers[i ^ 1]; + + if (p->kernelized) + continue; + + for (j = 0; j < 2; j++) { + r = &p->rtps[j]; + rp = &pp->rtps[j]; + + if (!r->peer.ip || !r->peer.port) + continue; + + ZERO(r->kstats); + + ks.local_port = r->localport; + ks.src.ip = c->call->callmaster->ip; + ks.src.port = rp->localport; + ks.dest.ip = r->peer.ip; + ks.dest.port = r->peer.port; + ks.tos = c->call->callmaster->tos; + + kernel_add_stream(c->call->callmaster->kernelfd, &ks, 0); + } + + p->kernelized = 1; + } +} + + + + +static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_in *fsin) { + struct streamrelay *p, *p2; + struct peer *pe, *pe2; + struct callstream *cs; + int ret; + struct sockaddr_in sin; + struct msghdr mh; + struct iovec iov; + unsigned char buf[256]; + struct cmsghdr *ch; + struct in_pktinfo *pi; + struct call *c; + struct callmaster *m; + unsigned char cc; + + pe = r->up; + cs = pe->up; + pe2 = &cs->peers[pe->idx ^ 1]; + p = &pe2->rtps[r->idx]; + c = cs->call; + m = c->callmaster; + + if (p->fd == -1) { + mylog(LOG_WARNING, "[%s] RTP packet discarded from " DF, c->callid, DP(*fsin)); + r->stats.errors++; + m->statsps.errors++; + return 0; + } + + if (!pe->confirmed && pe->filled && r->idx == 0) { + if (l < 2) + goto skip; + + if (!pe->codec) { + cc = b[1]; + cc &= 0x7f; + if (cc < ARRAY_SIZE(rtp_codecs)) + pe->codec = rtp_codecs[cc] ? : "unknown"; + else + pe->codec = "unknown"; + } + + mylog(LOG_DEBUG, "[%s] Confirmed peer information - " DF, c->callid, DP(*fsin)); + + p->peer.ip = fsin->sin_addr.s_addr; + p->peer.port = ntohs(fsin->sin_port); + + p2 = &p->up->rtps[p->idx ^ 1]; + p2->peer.ip = fsin->sin_addr.s_addr; + p2->peer.port = p->peer.port + ((int) (p2->idx * 2) - 1); + + pe->confirmed = 1; + + + if (pe2->confirmed && pe2->filled) + kernelize(cs); + } + +skip: + if (!r->peer.ip || !r->peer.port) + goto drop; + + ZERO(sin); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = r->peer.ip; + sin.sin_port = htons(r->peer.port); + + ZERO(iov); + iov.iov_base = b; + iov.iov_len = l; + + ZERO(mh); + mh.msg_name = &sin; + mh.msg_namelen = sizeof(sin); + mh.msg_iov = &iov; + mh.msg_iovlen = 1; + mh.msg_control = buf; + mh.msg_controllen = sizeof(buf); + + ch = CMSG_FIRSTHDR(&mh); + ZERO(*ch); + ch->cmsg_len = CMSG_LEN(sizeof(*pi)); + ch->cmsg_level = IPPROTO_IP; + ch->cmsg_type = IP_PKTINFO; + + pi = (void *) CMSG_DATA(ch); + ZERO(*pi); + pi->ipi_spec_dst.s_addr = m->ip; + + mh.msg_controllen = CMSG_SPACE(sizeof(*pi)); + + ret = sendmsg(p->fd, &mh, 0); + + if (ret == -1 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + r->stats.errors++; + m->statsps.errors++; + return -1; + } + +drop: + r->stats.packets++; + r->stats.bytes += l; + m->statsps.packets++; + m->statsps.bytes += l; + r->last = m->poller->now; + + return 0; +} + + + + +static void stream_readable(int fd, void *p) { + struct streamrelay *r = p; + char buf[1024]; + int ret; + struct sockaddr_in sin; + unsigned int sinlen; + + for (;;) { + sinlen = sizeof(sin); + ret = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *) &sin, &sinlen); + + if (ret == 0) + goto err; + else if (ret < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) + break; +err: + stream_closed(fd, r); + break; + } + + if (stream_packet(r, buf, ret, &sin)) { + mylog(LOG_WARNING, "Write error on RTP socket"); + call_destroy(r->up->up->call); + return; + } + } +} + + + + + +static int info_parse_func(char **a, void **ret, void *p) { + GHashTable *h = p; + + g_hash_table_replace(h, strdup(a[0]), a[1] ? strdup(a[1]) : NULL); + + return -1; +} + + +static GHashTable *info_parse(const char *s, GHashTable **h) { + GQueue *q; + + if (!*h) + *h = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); + + q = pcre_multi_match(&info_re, &info_ree, "^([^:,]+)(?::(.*?))?(?:$|,)", s, 2, info_parse_func, *h); + g_queue_free(q); + + return *h; +} + + +static int streams_parse_func(char **a, void **ret, void *p) { + struct stream *st; + + st = malloc(sizeof(*st)); + ZERO(*st); + + st->ip = inet_addr(a[0]); + st->port = atoi(a[1]); + st->mediatype = strdup(a[2] ? : ""); + + if (st->ip == -1) + goto fail; + if (!st->port) + goto fail; + + *ret = st; + return 0; + +fail: + free(st); + return -1; +} + + +static GQueue *streams_parse(const char *s) { + return pcre_multi_match(&streams_re, &streams_ree, "^([\\d.]+):(\\d+)(?::(.*?))?(?:$|,)", s, 3, streams_parse_func, NULL); +} + +static void streams_free(GQueue *q) { + struct stream *s; + + while (q->head) { + s = g_queue_pop_head(q); + free(s->mediatype); + free(s); + } + + g_queue_free(q); +} + + + +struct iterator_helper { + GList *del; + struct streamrelay *ports[0x10000]; +}; + + +static void call_timer_iterator(void *key, void *val, void *ptr) { + struct call *c = val; + struct iterator_helper *hlp = ptr; + GList *it; + struct callstream *cs; + int i; + struct peer *p; + struct poller *po; + struct callmaster *cm; + unsigned int check; + + if (!c->callstreams->head) + goto drop; + + cm = c->callmaster; + po = cm->poller; + + for (it = c->callstreams->head; it; it = it->next) { + cs = it->data; + + for (i = 0; i < 2; i++) { + p = &cs->peers[i]; + + hlp->ports[p->rtps[0].localport] = &p->rtps[0]; + hlp->ports[p->rtps[1].localport] = &p->rtps[1]; + + check = cm->timeout; + if (!p->rtps[0].peer.ip || !p->rtps[0].peer.port) + check = cm->silent_timeout; + + if (po->now - p->rtps[0].last < check) + goto good; + } + } + + mylog(LOG_INFO, "[%s] Closing call due to timeout", c->callid); + +drop: + hlp->del = g_list_prepend(hlp->del, c); + return; + +good: + ; +} + + +#define DS(x) do { \ + if (ke->stats.x < sr->kstats.x) \ + d = 0; \ + else \ + d = ke->stats.x - sr->kstats.x; \ + sr->stats.x += d; \ + m->statsps.x += d; \ + } while (0) +static void callmaster_timer(void *ptr) { + struct callmaster *m = ptr; + struct iterator_helper hlp; + GList *i; + struct call *c; + struct mediaproxy_list_entry *ke; + struct streamrelay *sr; + struct poller *po; + u_int64_t d; + + ZERO(hlp); + po = m->poller; + + g_hash_table_foreach(m->callhash, call_timer_iterator, &hlp); + + memcpy(&m->stats, &m->statsps, sizeof(m->stats)); + ZERO(m->statsps); + + i = kernel_list(m->kernelid); + while (i) { + ke = i->data; + + sr = hlp.ports[ke->target.target_port]; + if (!sr) + goto next; + + DS(packets); + DS(bytes); + DS(errors); + + if (ke->stats.packets != sr->kstats.packets) + sr->last = po->now; + + memcpy(&sr->kstats, &ke->stats, sizeof(sr->kstats)); + +next: + free(ke); + i = g_list_delete_link(i, i); + } + + for (i = hlp.del; i; i = i->next) { + c = i->data; + call_destroy(c); + } + + g_list_free(i); +} +#undef DS + + +struct callmaster *callmaster_new(struct poller *p) { + struct callmaster *c; + + c = malloc(sizeof(*c)); + ZERO(*c); + + c->callhash = g_hash_table_new(g_str_hash, g_str_equal); + if (!c->callhash) + goto fail; + c->poller = p; + + poller_timer(p, callmaster_timer, c); + + return c; + +fail: + free(c); + return NULL; +} + + + +static int get_port(struct streamrelay *r, u_int16_t p) { + int fd; + struct sockaddr_in sin; + + fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) + return -1; + + NONBLOCK(fd); + REUSEADDR(fd); + setsockopt(fd, SOL_IP, IP_TOS, &r->up->up->call->callmaster->tos, sizeof(r->up->up->call->callmaster->tos)); + + ZERO(sin); + sin.sin_family = AF_INET; + sin.sin_port = htons(p); + if (bind(fd, (struct sockaddr *) &sin, sizeof(sin))) + goto fail; + + r->fd = fd; + r->localport = p; + + return 0; + +fail: + close(fd); + return -1; +} + + +static void get_port_pair(struct peer *p) { + struct call *c; + struct callmaster *m; + struct streamrelay *a, *b; + u_int16_t port; + + c = p->up->call; + m = c->callmaster; + a = &p->rtps[0]; + b = &p->rtps[1]; + + assert(a->fd == -1 && b->fd == -1); + + port = m->lastport + 1; + + for (;;) { + if (port < 1024) + port = 1024; + + if (port == m->lastport) + goto fail; + + if ((port & 1)) + goto next; + + if (get_port(a, port)) + goto next; + + port++; + if (get_port(b, port)) + goto tryagain; + + break; + +tryagain: + close(a->fd); +next: + port++; + } + + m->lastport = port; + mylog(LOG_DEBUG, "[%s] Opened ports %u/%u for RTP", c->callid, a->localport, b->localport); + return; + +fail: + mylog(LOG_ERR, "[%s] Failed to get RTP port pair", c->callid); + if (a->fd != -1) + close(a->fd); + if (b->fd != -1) + close(b->fd); + a->fd = b->fd = -1; +} + + + +static int setup_peer(struct peer *p, struct stream *s, char *tag) { + struct streamrelay *a, *b; + + a = &p->rtps[0]; + b = &p->rtps[1]; + + if (a->peer.ip != s->ip || a->peer.port != b->peer.port) { + p->confirmed = 0; + if (p->kernelized) + unkernelize(p); + } + + a->peer.ip = b->peer.ip = s->ip; + a->peer.port = b->peer.port = s->port; + if (b->peer.port) + b->peer.port++; + + strdupfree(&p->mediatype, s->mediatype); + strdupfree(&p->tag, tag); + p->filled = 1; + + return 0; +} + +static void steal_peer(struct peer *p, struct streamrelay *r) { + struct peer *s = r->up; + int i; + struct poller_item pi; + struct streamrelay *sr, *srs; + struct call *c; + struct poller *po; + + ZERO(pi); + c = s->up->call; + po = c->callmaster->poller; + + mylog(LOG_DEBUG, "[%s] Re-using existing open RTP ports", c->callid); + + if (s->kernelized) + unkernelize(s); + + p->filled = 1; + strmove(&p->mediatype, &s->mediatype); + strmove(&p->tag, &s->tag); + //p->kernelized = s->kernelized; + //s->kernelized = 0; + + for (i = 0; i < 2; i++) { + sr = &p->rtps[i]; + srs = &s->rtps[i]; + + if (sr->fd != -1) { + close(sr->fd); + poller_del_item(po, sr->fd); + } + + sr->fd = srs->fd; + + sr->peer.ip = srs->peer.ip; + sr->peer.port = srs->peer.port; + + sr->localport = srs->localport; + + + srs->fd = -1; + srs->peer.ip = 0; + srs->peer.port = 0; + srs->localport = 0; + + pi.fd = sr->fd; + pi.ptr = sr; + pi.readable = stream_readable; + pi.closed = stream_closed; + + poller_update_item(po, &pi); + } +} + + +static void callstream_init(struct callstream *s, struct call *ca) { + int i, j; + struct peer *p; + struct streamrelay *r; + struct poller_item pi; + struct poller *po; + + po = ca->callmaster->poller; + + ZERO(*s); + ZERO(pi); + + s->call = ca; + + for (i = 0; i < 2; i++) { + p = &s->peers[i]; + + p->idx = i; + p->up = s; + p->tag = strdup(""); + p->mediatype = strdup(""); + + for (j = 0; j < 2; j++) { + r = &p->rtps[j]; + + r->fd = -1; + r->idx = j; + r->up = p; + r->last = po->now; + } + + get_port_pair(p); + + for (j = 0; j < 2; j++) { + r = &p->rtps[j]; + + pi.fd = r->fd; + pi.ptr = r; + pi.readable = stream_readable; + pi.closed = stream_closed; + + poller_add_item(po, &pi); + } + } +} + + + +static unsigned int call_streams(struct call *c, GQueue *s, char *tag, int opmode) { + GQueue *q; + GList *i, *l; + struct stream *t; + int x; + struct streamrelay *r; + struct callstream *cs; + struct peer *p; + unsigned int ret; + + q = g_queue_new(); /* new callstreams list */ + + if (!tag) + tag = ""; + + for (i = s->head; i; i = i->next) { + t = i->data; + + p = NULL; + + if (!opmode) { + DBG("creating new callstream"); + cs = malloc(sizeof(*cs)); + callstream_init(cs, c); + p = &cs->peers[0]; + } + else { + l = c->callstreams->head; + if (!l) { + mylog(LOG_WARNING, "[%s] Got LOOKUP, but no callstreams found", c->callid); + break; + } + cs = l->data; +#if 0 + if (cs->peers[1].filled) { + mylog(LOG_WARNING, "[%s] Got LOOKUP, but no incomplete callstreams found", c->callid); + break; + } +#endif + g_queue_delete_link(c->callstreams, l); + p = &cs->peers[1]; + } + + + for (l = c->callstreams->head; l; l = l->next) { + cs = l->data; + for (x = 0; x < 2; x++) { + r = &cs->peers[x].rtps[0]; + if (r->peer.ip != t->ip) + continue; + if (r->peer.port != t->port) + continue; + if (strcmp(cs->peers[x].tag, tag)) + continue; + DBG("found existing call stream to steal"); + goto found; + } + } + + /* not found */ + setup_peer(p, t, tag); + g_queue_push_tail(q, p->up); + continue; + +found: + steal_peer(p, r); + g_queue_push_tail(q, p->up); + } + + ret = q->length; + + if (!q->head) + g_queue_free(q); + else { + if (c->callstreams->head) { + q->tail->next = c->callstreams->head; + c->callstreams->head->prev = q->tail; + q->tail = c->callstreams->tail; + q->length += c->callstreams->length; + c->callstreams->head = c->callstreams->tail = NULL; + c->callstreams->length = 0; + } + g_queue_free(c->callstreams); + c->callstreams = q; + } + + return ret; +} + + + + +static void unkernelize(struct peer *p) { + struct streamrelay *r; + int i; + + if (!p->kernelized) + return; + + for (i = 0; i < 2; i++) { + r = &p->rtps[i]; + + kernel_del_stream(p->up->call->callmaster->kernelfd, r->localport); + + } + + p->kernelized = 0; +} + + + +static void kill_callstream(struct callstream *s) { + int i, j; + struct peer *p; + struct streamrelay *r; + + for (i = 0; i < 2; i++) { + p = &s->peers[i]; + + unkernelize(p); + + free(p->tag); + free(p->mediatype); + + for (j = 0; j < 2; j++) { + r = &p->rtps[j]; + + if (r->fd != -1) + close(r->fd); + poller_del_item(s->call->callmaster->poller, r->fd); + } + } + + free(s); +} + + + +static void call_destroy(struct call *c) { + struct callmaster *m = c->callmaster; + struct callstream *s; + + g_hash_table_remove(m->callhash, c->callid); + + free(c->callid); + g_hash_table_destroy(c->infohash); + if (c->calling_agent) + free(c->calling_agent); + if (c->called_agent) + free(c->called_agent); + + while (c->callstreams->head) { + s = g_queue_pop_head(c->callstreams); + kill_callstream(s); + } + g_queue_free(c->callstreams); + + free(c); +} + + + +static char *streams_print(GQueue *s, unsigned int num, unsigned int off) { + GString *o; + int i; + GList *l; + struct callstream *t; + struct streamrelay *x; + + o = g_string_new(""); + + if (!s->head) + goto out; + + t = s->head->data; + g_string_append_printf(o, IPF, IPP(t->call->callmaster->ip)); + + for (i = 0, l = s->head; i < num && l; i++, l = l->next) { + t = l->data; + x = &t->peers[off].rtps[0]; + g_string_append_printf(o, " %u", x->localport); + } + +out: + g_string_append(o, "\n"); + + return g_string_free(o, FALSE); +} + + + +char *call_request(const char **o, struct callmaster *m) { + struct call *c; + GQueue *s; + unsigned int num; + + c = g_hash_table_lookup(m->callhash, o[2]); + if (!c) { + mylog(LOG_NOTICE, "[%s] Creating new call", o[2]); + c = malloc(sizeof(*c)); + ZERO(*c); + c->callmaster = m; + c->callid = strdup(o[2]); + c->callstreams = g_queue_new(); + c->created = m->poller->now; + g_hash_table_insert(m->callhash, c->callid, c); + } + + strdupfree(&c->calling_agent, o[9] ? : "UNKNOWN"); + info_parse(o[10], &c->infohash); + s = streams_parse(o[3]); + num = call_streams(c, s, g_hash_table_lookup(c->infohash, "fromtag"), 0); + streams_free(s); + + return streams_print(c->callstreams, num, 0); +} + +char *call_lookup(const char **o, struct callmaster *m) { + struct call *c; + GQueue *s; + unsigned int num; + + c = g_hash_table_lookup(m->callhash, o[2]); + if (!c) { + mylog(LOG_WARNING, "[%s] Got LOOKUP for unknown call-id", o[2]); + return NULL; + } + + strdupfree(&c->called_agent, o[9] ? : "UNKNOWN"); + info_parse(o[10], &c->infohash); + s = streams_parse(o[3]); + num = call_streams(c, s, g_hash_table_lookup(c->infohash, "totag"), 1); + streams_free(s); + + return streams_print(c->callstreams, num, 1); +} + +void call_delete(const char **o, struct callmaster *m) { + struct call *c; + + c = g_hash_table_lookup(m->callhash, o[12]); + if (!c) + return; + + call_destroy(c); +} + + + +static void call_status_iterator(void *key, void *val, void *ptr) { + struct call *c = val; + struct control_stream *s = ptr; + GList *l; + struct callstream *cs; + struct peer *p; + struct streamrelay *r1, *r2; + struct streamrelay *rx1, *rx2; + struct callmaster *m; + + m = c->callmaster; + + streambuf_printf(s->outbuf, "session %s %s %s %s %s %i\n", + c->callid, + (char *) g_hash_table_lookup(c->infohash, "from"), + (char *) g_hash_table_lookup(c->infohash, "to"), + c->calling_agent, c->called_agent, + (int) (m->poller->now - c->created)); + + for (l = c->callstreams->head; l; l = l->next) { + cs = l->data; + + p = &cs->peers[0]; + r1 = &p->rtps[0]; + r2 = &cs->peers[1].rtps[0]; + rx1 = &p->rtps[1]; + rx2 = &cs->peers[1].rtps[1]; + + if (r1->fd == -1 || r2->fd == -1) + continue; + + streambuf_printf(s->outbuf, "stream " IPF ":%u " IPF ":%u " IPF ":%u %llu/%llu/%llu %s %s %s %i\n", + IPP(r1->peer.ip), r1->peer.port, + IPP(r2->peer.ip), r2->peer.port, + IPP(m->ip), r1->localport, + (long long unsigned int) r1->stats.bytes + rx1->stats.bytes, + (long long unsigned int) r2->stats.bytes + rx2->stats.bytes, + (long long unsigned int) r1->stats.bytes + rx1->stats.bytes + r2->stats.bytes + rx2->stats.bytes, + "active", + p->codec ? : "unknown", + p->mediatype, (int) (m->poller->now - r1->last)); + + } + +} + +void calls_status(struct callmaster *m, struct control_stream *s) { + streambuf_printf(s->outbuf, "proxy %u %llu/%llu/%llu\n", + g_hash_table_size(m->callhash), + (long long unsigned int) m->stats.bytes, + (long long unsigned int) m->stats.bytes - m->stats.errors, + (long long unsigned int) m->stats.bytes * 2 - m->stats.errors); + + g_hash_table_foreach(m->callhash, call_status_iterator, s); +} diff --git a/daemon/call.h b/daemon/call.h new file mode 100644 index 0000000..341c16b --- /dev/null +++ b/daemon/call.h @@ -0,0 +1,98 @@ +#ifndef __CALL_H__ +#define __CALL_H__ + + + + +#include +#include "ipt_MEDIAPROXY.h" + + + +struct poller; +struct control_stream; + + + +struct peer; +struct callstream; +struct call; +struct callmaster; + + + + +struct stream { + u_int32_t ip; + u_int16_t port; + char *mediatype; +}; +struct streamrelay { + int fd; + struct stream peer; + u_int16_t localport; + unsigned char idx; + struct peer *up; + struct mediaproxy_stats stats; + struct mediaproxy_stats kstats; + time_t last; +}; +struct peer { + struct streamrelay rtps[2]; + char *tag; + char *mediatype; + char *codec; + unsigned char idx; + struct callstream *up; + int kernelized:1; + int filled:1; + int confirmed:1; +}; +struct callstream { + struct peer peers[2]; + struct call *call; +}; + +struct call { + struct callmaster *callmaster; + + GQueue *callstreams; + + char *callid; + time_t created; + char *calling_agent; + char *called_agent; + GHashTable *infohash; +}; + +struct callmaster { + GHashTable *callhash; + u_int16_t lastport; + struct mediaproxy_stats statsps; + struct mediaproxy_stats stats; + + struct poller *poller; + int kernelfd; + unsigned int kernelid; + u_int32_t ip; + unsigned int timeout; + unsigned int silent_timeout; + unsigned char tos; +}; + + + + +struct callmaster *callmaster_new(struct poller *); + + + +char *call_request(const char **, struct callmaster *); +char *call_lookup(const char **, struct callmaster *); +void call_delete(const char **, struct callmaster *); + +void calls_status(struct callmaster *, struct control_stream *); + + + +#endif diff --git a/daemon/control.c b/daemon/control.c new file mode 100644 index 0000000..da5e139 --- /dev/null +++ b/daemon/control.c @@ -0,0 +1,250 @@ +#include +#include +#include +#include +#include +#include + +#include "control.h" +#include "poller.h" +#include "aux.h" +#include "streambuf.h" +#include "log.h" +#include "build_time.h" +#include "call.h" + + + +static pcre *parse_re; +static pcre_extra *parse_ree; + + + + +static void control_stream_closed(int fd, void *p) { + struct control_stream *s = p; + struct control *c; + + mylog(LOG_INFO, "Control connection from " DF " closed", DP(s->inaddr)); + + c = s->control; + + c->stream_head = g_list_remove_link(c->stream_head, &s->link); + + close(fd); + if (poller_del_item(s->poller, fd)) + abort(); + + streambuf_destroy(s->inbuf); + streambuf_destroy(s->outbuf); + free(s); +} + + +static void control_list(struct control *c, struct control_stream *s) { + struct control_stream *i; + + for (i = (void *) c->stream_head; i; i = (void *) i->link.next) + streambuf_printf(s->outbuf, DF "\n", DP(s->inaddr)); + + streambuf_printf(s->outbuf, "End.\n"); +} + + +static int control_stream_parse(struct control_stream *s, char *line) { + const char *errptr; + int erroff; + int ovec[60]; + int ret; + const char **out; + struct control *c = s->control; + char *output = NULL; + + if (!parse_re) { + parse_re = pcre_compile( + /* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */ + "^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$", + PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); + parse_ree = pcre_study(parse_re, 0, &errptr); + } + + ret = pcre_exec(parse_re, parse_ree, line, strlen(line), 0, 0, ovec, ARRAY_SIZE(ovec)); + if (ret <= 0) { + mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line); + return -1; + } + + mylog(LOG_INFO, "Got valid command from " DF ": %s", DP(s->inaddr), line); + + pcre_get_substring_list(line, ovec, ret, &out); + + + if (!strcmp(out[1], "request")) + output = call_request(out, c->callmaster); + else if (!strcmp(out[1], "lookup")) + output = call_lookup(out, c->callmaster); + else if (!strcmp(out[11], "delete")) + call_delete(out, c->callmaster); + else if (!strcmp(out[14], "status")) + calls_status(c->callmaster, s); + else if (!strcmp(out[14], "build") | !strcmp(out[14], "version")) + streambuf_printf(s->outbuf, "Build: %s\n", BUILD_TIME); + else if (!strcmp(out[14], "controls")) + control_list(c, s); + else if (!strcmp(out[14], "quit") || !strcmp(out[14], "exit")) + ; + + if (output) { + streambuf_write(s->outbuf, output, strlen(output)); + free(output); + } + + pcre_free(out); + return -1; +} + + +static void control_stream_timer(int fd, void *p) { + struct control_stream *s = p; + struct poller *o = s->poller; + + if ((o->now - s->inbuf->active) >= 60 || (o->now - s->outbuf->active) >= 60) + control_stream_closed(s->fd, s); +} + + +static void control_stream_readable(int fd, void *p) { + struct control_stream *s = p; + char *line; + int ret; + + if (streambuf_readable(s->inbuf)) + goto close; + + while ((line = streambuf_getline(s->inbuf))) { + mylog(LOG_DEBUG, "Got control line from " DF ": %s", DP(s->inaddr), line); + ret = control_stream_parse(s, line); + free(line); + if (ret) + goto close; + } + + if (streambuf_bufsize(s->inbuf) > 1024) { + mylog(LOG_WARNING, "Buffer length exceeded in control connection from " DF, DP(s->inaddr)); + goto close; + } + + return; + +close: + control_stream_closed(fd, s); +} + +static void control_stream_writeable(int fd, void *p) { + struct control_stream *s = p; + + if (streambuf_writeable(s->outbuf)) + control_stream_closed(fd, s); +} + +static void control_closed(int fd, void *p) { + abort(); +} + +static void control_incoming(int fd, void *p) { + int nfd; + struct control *c = p; + struct control_stream *s; + struct poller_item i; + struct sockaddr_in sin; + socklen_t sinl; + + sinl = sizeof(sin); + nfd = accept(fd, (struct sockaddr *) &sin, &sinl); + if (nfd == -1) + return; + NONBLOCK(nfd); + + mylog(LOG_INFO, "New control connection from " DF, DP(sin)); + + s = malloc(sizeof(*s)); + ZERO(*s); + + ZERO(i); + i.fd = nfd; + i.closed = control_stream_closed; + i.readable = control_stream_readable; + i.writeable = control_stream_writeable; + i.timer = control_stream_timer; + i.ptr = s; + if (poller_add_item(c->poller, &i)) + goto fail; + s->fd = nfd; + s->control = c; + s->poller = c->poller; + s->inbuf = streambuf_new(c->poller, nfd); + s->outbuf = streambuf_new(c->poller, nfd); + memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); + + c->stream_head = g_list_link(c->stream_head, &s->link); + + return; + +fail: + free(s); + close(nfd); +} + + +struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, struct callmaster *m) { + int fd; + struct control *c; + struct poller_item i; + struct sockaddr_in sin; + + if (!p) + return NULL; + if (!m) + return NULL; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd == -1) + return NULL; + + NONBLOCK(fd); + REUSEADDR(fd); + + ZERO(sin); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = ip; + sin.sin_port = htons(port); + if (bind(fd, (struct sockaddr *) &sin, sizeof(sin))) + goto fail; + + if (listen(fd, 5)) + goto fail; + + + c = malloc(sizeof(*c)); + ZERO(*c); + + c->fd = fd; + c->poller = p; + c->callmaster = m; + + ZERO(i); + i.fd = fd; + i.closed = control_closed; + i.readable = control_incoming; + i.ptr = c; + if (poller_add_item(p, &i)) + goto fail2; + + return c; + +fail2: + free(c); +fail: + close(fd); + return NULL; +} diff --git a/daemon/control.h b/daemon/control.h new file mode 100644 index 0000000..75c62d2 --- /dev/null +++ b/daemon/control.h @@ -0,0 +1,48 @@ +#ifndef __CONTROL_H__ +#define __CONTROL_H__ + + + +#include +#include +#include +#include +#include + + + +struct poller; +struct control; +struct streambuf; +struct callmaster; + + + +struct control_stream { + GList link; /* must be first */ + + int fd; + struct streambuf *inbuf; + struct streambuf *outbuf; + struct sockaddr_in inaddr; + + struct control *control; + struct poller *poller; +}; + + +struct control { + int fd; + + GList *stream_head; + + struct poller *poller; + struct callmaster *callmaster; +}; + + +struct control *control_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *); + + + +#endif diff --git a/daemon/kernel.c b/daemon/kernel.c new file mode 100644 index 0000000..5c19ef7 --- /dev/null +++ b/daemon/kernel.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "ipt_MEDIAPROXY.h" + +#include "aux.h" +#include "kernel.h" + + + + +#if 1 +#define PREFIX "/proc/mediaproxy" +#else +#define PREFIX "/tmp/mediaproxy" +#endif + + + + + +int kernel_create_table(unsigned int id) { + char str[64]; + int fd; + int i; + + fd = open(PREFIX "/control", O_WRONLY | O_TRUNC); + if (fd == -1) + return -1; + sprintf(str, "add %u\n", id); + i = write(fd, str, strlen(str)); + if (i == -1) + goto fail; + close(fd); + + return 0; + +fail: + close(fd); + return -1; +} + + +int kernel_open_table(unsigned int id) { + char str[64]; + int fd; + struct mediaproxy_message msg; + int i; + + sprintf(str, PREFIX "/%u/control", id); + fd = open(str, O_WRONLY | O_TRUNC); + if (fd == -1) + return -1; + + ZERO(msg); + msg.cmd = MMG_NOOP; + i = write(fd, &msg, sizeof(msg)); + if (i <= 0) + goto fail; + + return fd; + +fail: + close(fd); + return -1; +} + + +int kernel_add_stream(int fd, struct kernel_stream *info, int update) { + struct mediaproxy_message msg; + + ZERO(msg); + msg.cmd = update ? MMG_UPDATE : MMG_ADD; + msg.target.target_port = info->local_port; + msg.target.src_ip = info->src.ip; + msg.target.dst_ip = info->dest.ip; + msg.target.src_port = info->src.port; + msg.target.dst_port = info->dest.port; + msg.target.mirror_ip = info->mirror.ip; + msg.target.mirror_port = info->mirror.port; + msg.target.tos = info->tos; + + return write(fd, &msg, sizeof(msg)) <= 0 ? -1 : 0; +} + + +int kernel_del_stream(int fd, u_int16_t p) { + struct mediaproxy_message msg; + + ZERO(msg); + msg.cmd = MMG_DEL; + msg.target.target_port = p; + + return write(fd, &msg, sizeof(msg)) <= 0 ? -1 : 0; +} + + +GList *kernel_list(unsigned int id) { + char str[64]; + int fd; + struct mediaproxy_list_entry *buf; + GList *li = NULL; + int ret; + + sprintf(str, PREFIX "/%u/blist", id); + fd = open(str, O_RDONLY); + if (fd == -1) + return NULL; + + + for (;;) { + buf = malloc(sizeof(*buf)); + ret = read(fd, buf, sizeof(*buf)); + if (ret != sizeof(*buf)) + break; + li = g_list_prepend(li, buf); + } + + free(buf); + close(fd); + + return li; +} diff --git a/daemon/kernel.h b/daemon/kernel.h new file mode 100644 index 0000000..f3c34af --- /dev/null +++ b/daemon/kernel.h @@ -0,0 +1,38 @@ +#ifndef __KERNEL_H__ +#define __KERNEL_H__ + + + +#include +#include + + + + +struct ip_port { + u_int32_t ip; + u_int16_t port; +}; + +struct kernel_stream { + u_int16_t local_port; + struct ip_port src; + struct ip_port dest; + struct ip_port mirror; + unsigned char tos; +}; + + + + +int kernel_create_table(unsigned int); +int kernel_open_table(unsigned int); + +int kernel_add_stream(int, struct kernel_stream *, int); +int kernel_del_stream(int, u_int16_t); +GList *kernel_list(unsigned int); + + + + +#endif diff --git a/daemon/log.h b/daemon/log.h new file mode 100644 index 0000000..7767281 --- /dev/null +++ b/daemon/log.h @@ -0,0 +1,12 @@ +#ifndef __LOG_H__ +#define __LOG_H__ + + +#include + + +#define mylog(x,y...) syslog(x,y) + + + +#endif diff --git a/daemon/main.c b/daemon/main.c new file mode 100644 index 0000000..e027694 --- /dev/null +++ b/daemon/main.c @@ -0,0 +1,199 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "poller.h" +#include "control.h" +#include "aux.h" +#include "log.h" +#include "call.h" +#include "kernel.h" + + + + +#define die(x...) do { fprintf(stderr, x); exit(-1); } while(0) + + + + + +static char *pidfile; +static gboolean foreground; +static u_int32_t ip; +static u_int32_t listenp; +static u_int16_t listenport; +static int tos; +static int table; +static int timeout; +static int silent_timeout; + + + + +static void signals(void) { + signal(SIGPIPE, SIG_IGN); +} + +static int rlim(int res, rlim_t val) { + struct rlimit rlim; + + ZERO(rlim); + rlim.rlim_cur = rlim.rlim_max = val; + return setrlimit(res, &rlim); +} + +static void resources(void) { + int tryv; + + rlim(RLIMIT_CORE, RLIM_INFINITY); + for (tryv = ((1<<16) - 1); tryv && rlim(RLIMIT_NOFILE, tryv) == -1; tryv >>= 1) + ; + + rlim(RLIMIT_DATA, RLIM_INFINITY); + rlim(RLIMIT_RSS, RLIM_INFINITY); + rlim(RLIMIT_AS, RLIM_INFINITY); +} + + + +static void options(int *argc, char ***argv) { + static char *ips; + static char *listenps; + static GOptionEntry e[] = { + { "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" }, + { "ip", 'i', 0, G_OPTION_ARG_STRING, &ips, "Local IP address", "IP" }, + { "listen", 'l', 0, G_OPTION_ARG_STRING, &listenps, "Port to listen on", "[IP:]PORT" }, + { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "TOS value to set on streams", "INT" }, + { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, + { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, + { "pidfile", 'p', 0, G_OPTION_ARG_STRING, &pidfile, "Write PID to file", "FILE" }, + { "foreground", 'f', 0, G_OPTION_ARG_NONE, &foreground, "Don't fork to background", NULL }, + { NULL, } + }; + + GOptionContext *c; + GError *er = NULL; + char *p; + + c = g_option_context_new(" - next-generation media proxy"); + g_option_context_add_main_entries(c, e, NULL); + if (!g_option_context_parse(c, argc, argv, &er)) + die("Bad command line: %s\n", er->message); + + if (!ips) + die("Missing option IP\n"); + if (!listenps) + die("Missing option LISTEN\n"); + + ip = inet_addr(ips); + if (ip == -1) + die("Invalid IP\n"); + + p = strchr(listenps, ':'); + if (p) { + *p++ = 0; + listenp = inet_addr(listenps); + if (listenp == -1) + die("Invalid IP\n"); + listenport = atoi(p); + } + else { + if (strchr(listenps, '.')) + die("Invalid port\n"); + listenport = atoi(listenps); + } + if (!listenport) + die("Invalid port\n"); + if (p) + *--p = ':'; + + if (tos < 0 || tos > 255) + die("Invalid TOS value"); + + if (timeout <= 0) + timeout = 60; + if (silent_timeout <= 0) + silent_timeout = 3600; +} + + +static void daemonize(void) { + printf("Going to background...\n"); + if (fork()) + _exit(0); + freopen("/dev/null", "r", stdin); + freopen("/dev/null", "w", stdout); + freopen("/dev/null", "w", stderr); + setpgrp(); +} + +static void wpidfile(void) { + FILE *fp; + + if (!pidfile) + return; + + fp = fopen(pidfile, "w"); + if (fp) { + fprintf(fp, "%u\n", getpid()); + fclose(fp); + } +} + + +int main(int argc, char **argv) { + struct poller *p; + struct callmaster *m; + struct control *c; + int kfd; + int ret; + + options(&argc, &argv); + signals(); + resources(); + + + if (kernel_create_table(table)) + die("Failed to create kernel table %i\n", table); + kfd = kernel_open_table(table); + if (kfd == -1) + die("Failed to open kernel table %i\n", table); + + p = poller_new(); + if (!p) + die("poller creation failed\n"); + + m = callmaster_new(p); + if (!m) + return -1; + m->kernelfd = kfd; + m->kernelid = table; + m->ip = ip; + m->timeout = timeout; + m->silent_timeout = silent_timeout; + m->tos = tos; + + c = control_new(p, listenp, listenport, m); + if (!c) + die("Failed to open control connection port\n"); + + mylog(LOG_INFO, "Startup complete"); + + if (!foreground) + daemonize(); + wpidfile(); + + for (;;) { + ret = poller_poll(p, 100); + if (ret == -1) + break; + } + + return 0; +} diff --git a/daemon/poller-test.c b/daemon/poller-test.c new file mode 100644 index 0000000..6a2aef8 --- /dev/null +++ b/daemon/poller-test.c @@ -0,0 +1,60 @@ +#include +#include +#include "poller.h" + + +void dummy(int a, void *b) { +} + + +int main() { + struct poller *p; + struct poller_item i; + + p = poller_new(); + if (!p) { + fprintf(stderr, "poller creation failed\n"); + return -1; + } + + assert(p->items_size == 0); + assert(p->pollfds_size == 0); + + i.readable = dummy; + i.writeable = dummy; + i.closed = dummy; + + + i.fd = 3; + assert(poller_add_item(p, &i) == 0); + i.fd = 4; + assert(poller_add_item(p, &i) == 0); + i.fd = 2; + assert(poller_add_item(p, &i) == 0); + i.fd = 6; + assert(poller_add_item(p, &i) == 0); + i.fd = 0; + assert(poller_add_item(p, &i) == 0); + i.fd = 1; + assert(poller_add_item(p, &i) == 0); + i.fd = 5; + assert(poller_add_item(p, &i) == 0); + i.fd = 7; + assert(poller_add_item(p, &i) == 0); + i.fd = 9; + assert(poller_add_item(p, &i) == 0); + + + assert(poller_del_item(p, 10) == -1); + assert(poller_del_item(p, 6) == 0); + assert(poller_del_item(p, 8) == -1); + assert(poller_del_item(p, 0) == 0); + assert(poller_del_item(p, 3) == 0); + assert(poller_del_item(p, 11) == -1); + assert(poller_del_item(p, 9) == 0); + assert(poller_del_item(p, 11) == -1); + assert(poller_del_item(p, 4) == 0); + + + return 0; +} diff --git a/daemon/poller.c b/daemon/poller.c new file mode 100644 index 0000000..9bc7e1f --- /dev/null +++ b/daemon/poller.c @@ -0,0 +1,311 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "poller.h" +#include "aux.h" + + + + +#define POLLER_BSEARCH(b,l,k,e) mybsearch(b, l, sizeof(struct pollfd), k, OFFSET_OF(struct pollfd, fd), sizeof(*(k)), e) + + + + +struct timer_item { + void (*func)(void *); + void *ptr; +}; + + + + + +struct poller *poller_new(void) { + struct poller *p; + + p = malloc(sizeof(*p)); + memset(p, 0, sizeof(*p)); + p->now = time(NULL); + + return p; +} + + +int poller_add_item(struct poller *p, struct poller_item *i) { + struct poller_item *ip; + struct pollfd *pf; + int idx; + unsigned int u; + + if (!p || !i) + return -1; + if (i->fd < 0) + return -1; + if (!i->readable && !i->writeable) + return -1; + if (!i->closed) + return -1; + + if (i->fd < p->items_size && p->items[i->fd]) + return -1; + + idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &i->fd, 0); + assert(idx < 0); + + idx *= -1; + idx--; + + p->pollfds_size++; + p->pollfds = realloc(p->pollfds, p->pollfds_size * sizeof(*p->pollfds)); + memmove(p->pollfds + idx + 1, p->pollfds + idx, (p->pollfds_size - idx - 1) * sizeof(*p->pollfds)); + pf = &p->pollfds[idx]; + + pf->fd = i->fd; + pf->events = POLLHUP | POLLERR | ((i->writeable && i->blocked) ? POLLOUT : 0) | (i->readable ? POLLIN : 0); + pf->revents = 0; + + if (i->fd >= p->items_size) { + u = p->items_size; + p->items_size = i->fd + 1; + p->items = realloc(p->items, sizeof(*p->items) * p->items_size); + memset(p->items + u, 0, sizeof(*p->items) * (p->items_size - u - 1)); + } + + ip = malloc(sizeof(*ip)); + memcpy(ip, i, sizeof(*ip)); + p->items[i->fd] = ip; + + return 0; +} + + +int poller_del_item(struct poller *p, int fd) { + int idx; + + if (!p || fd < 0) + return -1; + if (fd >= p->items_size) + return -1; + if (!p->items || !p->items[fd]) + return -1; + if (!p->pollfds || !p->pollfds_size) + return -1; + + idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &fd, 1); + assert(idx != -1); + + memmove(p->pollfds + idx, p->pollfds + idx + 1, (p->pollfds_size - idx - 1) * sizeof(*p->pollfds)); + p->pollfds_size--; + p->pollfds = realloc(p->pollfds, p->pollfds_size * sizeof(*p->pollfds)); + + if (p->pollfds_work) { + idx = POLLER_BSEARCH(p->pollfds_work, p->pollfds_work_size, &fd, 1); + + if (idx != -1) + p->pollfds_work[idx].fd = -1; + } + + free(p->items[fd]); + p->items[fd] = NULL; + + return 0; +} + + +int poller_update_item(struct poller *p, struct poller_item *i) { + struct poller_item *np; + + if (!p || !i) + return -1; + if (i->fd < 0) + return -1; + if (!i->readable && !i->writeable) + return -1; + if (!i->closed) + return -1; + + if (i->fd >= p->items_size || !(np = p->items[i->fd])) + return poller_add_item(p, i); + + np->ptr = i->ptr; + np->readable = i->readable; + np->writeable = i->writeable; + np->closed = i->closed; + np->timer = i->timer; + + return 0; +} + + +int poller_poll(struct poller *p, int timeout) { + struct pollfd *pfd, *pf; + int ret, i; + struct poller_item *it; + int idx; + time_t last; + int do_timer; + GList *li; + struct timer_item *ti; + + if (!p) + return -1; + if (!p->pollfds || !p->pollfds_size) + return -1; + if (!p->items || !p->items_size) + return -1; + + p->pollfds_work_size = i = p->pollfds_size; + p->pollfds_work = pfd = malloc(sizeof(*pfd) * i); + memcpy(pfd, p->pollfds, sizeof(*pfd) * i); + + do_timer = 0; + last = p->now; + p->now = time(NULL); + if (last != p->now) { + do_timer = 1; + ret = i; + + for (li = p->timers; li; li = li->next) { + ti = li->data; + ti->func(ti->ptr); + } + } + else { + ret = poll(pfd, i, timeout); + if (errno == EINTR) + ret = 0; + if (ret < 0) + goto out; + } + + pf = pfd; + for (pf = pfd; i; pf++) { + i--; + + if (pf->fd < 0) + continue; + + it = (pf->fd < p->items_size) ? p->items[pf->fd] : NULL; + if (!it) + continue; + + if (do_timer) { + if (it->timer) + it->timer(it->fd, it->ptr); + continue; + } + + if (it->error) { + it->closed(it->fd, it->ptr); + continue; + } + + if ((pf->revents & (POLLERR | POLLHUP))) + it->closed(it->fd, it->ptr); + else if ((pf->revents & POLLOUT)) { + it->blocked = 0; + + idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &it->fd, 1); + assert(idx != -1); + + p->pollfds[idx].events &= ~POLLOUT; + + it->writeable(it->fd, it->ptr); + } + else if ((pf->revents & POLLIN)) + it->readable(it->fd, it->ptr); + else if (!pf->revents) + continue; + else + abort(); + } + + +out: + free(pfd); + p->pollfds_work = NULL; + p->pollfds_work_size = 0; + return ret; +} + + +void poller_blocked(struct poller *p, int fd) { + int idx; + + if (!p || fd < 0) + return; + if (fd >= p->items_size) + return; + if (!p->items || !p->items[fd]) + return; + if (!p->pollfds || !p->pollfds_size) + return; + if (!p->items[fd]->writeable) + return; + + p->items[fd]->blocked = 1; + + idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &fd, 1); + assert(idx != -1); + + p->pollfds[idx].events |= POLLOUT; +} + +void poller_error(struct poller *p, int fd) { + if (!p || fd < 0) + return; + if (fd >= p->items_size) + return; + if (!p->items || !p->items[fd]) + return; + if (!p->pollfds || !p->pollfds_size) + return; + if (!p->items[fd]->writeable) + return; + + p->items[fd]->error = 1; + p->items[fd]->blocked = 1; +} + +int poller_isblocked(struct poller *p, int fd) { + if (!p || fd < 0) + return -1; + if (fd >= p->items_size) + return -1; + if (!p->items || !p->items[fd]) + return -1; + if (!p->pollfds || !p->pollfds_size) + return -1; + if (!p->items[fd]->writeable) + return -1; + + return p->items[fd]->blocked; +} + + + + +int poller_timer(struct poller *p, void (*f)(void *), void *ptr) { + struct timer_item *i; + + if (!p || !f) + return -1; + + i = malloc(sizeof(*i)); + ZERO(*i); + + i->func = f; + i->ptr = ptr; + + p->timers = g_list_prepend(p->timers, i); + + return 0; +} diff --git a/daemon/poller.h b/daemon/poller.h new file mode 100644 index 0000000..946c9d8 --- /dev/null +++ b/daemon/poller.h @@ -0,0 +1,51 @@ +#ifndef __POLLER_H__ +#define __POLLER_H__ + + + +#include +#include +#include + + + +struct poller_item { + int fd; + void *ptr; + + void (*readable)(int, void *); + void (*writeable)(int, void *); + void (*closed)(int, void *); + void (*timer)(int, void *); + + int blocked:1; + int error:1; +}; + +struct poller { + struct poller_item **items; + unsigned int items_size; + struct pollfd *pollfds; + unsigned int pollfds_size; + GList *timers; + + time_t now; + + struct pollfd *pollfds_work; + unsigned int pollfds_work_size; +}; + + +struct poller *poller_new(void); +int poller_add_item(struct poller *, struct poller_item *); +int poller_update_item(struct poller *, struct poller_item *); +int poller_del_item(struct poller *, int); +int poller_poll(struct poller *, int); +void poller_blocked(struct poller *, int); +int poller_isblocked(struct poller *, int); +void poller_error(struct poller *, int); + +int poller_timer(struct poller *, void (*)(void *), void *); + + +#endif diff --git a/daemon/streambuf.c b/daemon/streambuf.c new file mode 100644 index 0000000..ccb55c0 --- /dev/null +++ b/daemon/streambuf.c @@ -0,0 +1,172 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "streambuf.h" +#include "poller.h" +#include "aux.h" + + + +struct streambuf *streambuf_new(struct poller *p, int fd) { + struct streambuf *b; + + b = malloc(sizeof(*b)); + ZERO(*b); + + b->buf = g_string_new(""); + b->fd = fd; + b->poller = p; + b->active = p->now; + + return b; +} + + +void streambuf_destroy(struct streambuf *b) { + g_string_free(b->buf, TRUE); + free(b); +} + + +int streambuf_writeable(struct streambuf *b) { + int ret; + unsigned int out; + + for (;;) { + if (!b->buf->len) + break; + + out = (b->buf->len > 1024) ? 1024 : b->buf->len; + ret = write(b->fd, b->buf->str, out); + + if (ret < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) + return -1; + ret = 0; + } + + if (ret > 0) { + g_string_erase(b->buf, 0, ret); + b->active = b->poller->now; + } + + if (ret != out) { + poller_blocked(b->poller, b->fd); + break; + } + } + + return 0; +} + +int streambuf_readable(struct streambuf *b) { + int ret; + char buf[1024]; + + for (;;) { + ret = read(b->fd, buf, 1024); + + if (ret == 0) + return -1; + if (ret < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) + break; + return -1; + } + + g_string_append_len(b->buf, buf, ret); + b->active = b->poller->now; + } + + return 0; +} + + +char *streambuf_getline(struct streambuf *b) { + char *p; + int len; + char *s = NULL; + + for (;;) { + if (s) + free(s); + + p = memchr(b->buf->str, '\n', b->buf->len); + if (!p) + return NULL; + + len = p - b->buf->str; + if (len == 0) { + g_string_erase(b->buf, 0, 1); + continue; + } + + s = malloc(len + 1); + memcpy(s, b->buf->str, len); + s[len] = '\0'; + g_string_erase(b->buf, 0, len + 1); + + if (s[--len] == '\r') { + if (len == 0) + continue; + s[len] = '\0'; + } + + break; + } + + return s; +} + +unsigned int streambuf_bufsize(struct streambuf *b) { + return b->buf->len; +} + + +void streambuf_printf(struct streambuf *b, char *f, ...) { + va_list va; + GString *gs; + + va_start(va, f); + gs = g_string_new(""); + g_string_vprintf(gs, f, va); + va_end(va); + + streambuf_write(b, gs->str, gs->len); + g_string_free(gs, TRUE); +} + +void streambuf_write(struct streambuf *b, char *s, unsigned int len) { + unsigned int out; + int ret; + + while (len && !poller_isblocked(b->poller, b->fd)) { + out = (len > 1024) ? 1024 : len; + ret = write(b->fd, s, out); + + if (ret < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + poller_error(b->poller, b->fd); + break; + } + poller_blocked(b->poller, b->fd); + break; + } + if (ret == 0) + break; + + s += ret; + len -= ret; + b->active = b->poller->now; + } + + if (b->buf->len > 5242880) + poller_error(b->poller, b->fd); + else if (len) + g_string_append_len(b->buf, s, len); +} diff --git a/daemon/streambuf.h b/daemon/streambuf.h new file mode 100644 index 0000000..7a9d7d4 --- /dev/null +++ b/daemon/streambuf.h @@ -0,0 +1,35 @@ +#ifndef __BUFFER_H__ +#define __BUFFER_H__ + + + +#include +#include +#include + + + +struct poller; + + + +struct streambuf { + GString *buf; + int fd; + struct poller *poller; + time_t active; +}; + + + +struct streambuf *streambuf_new(struct poller *, int); +void streambuf_destroy(struct streambuf *); +int streambuf_writeable(struct streambuf *); +int streambuf_readable(struct streambuf *); +char *streambuf_getline(struct streambuf *); +unsigned int streambuf_bufsize(struct streambuf *); +void streambuf_printf(struct streambuf *, char *, ...) __attribute__ ((format (printf, 2, 3))); +void streambuf_write(struct streambuf *, char *, unsigned int); + + +#endif diff --git a/debian/changelog b/debian/changelog new file mode 100644 index 0000000..631fdea --- /dev/null +++ b/debian/changelog @@ -0,0 +1,5 @@ +mediaproxy-ng (0.1) unstable; urgency=low + + * Initial release. + + -- Michael Prokop Thu, 11 Feb 2010 23:06:08 +0100 diff --git a/debian/compat b/debian/compat new file mode 100644 index 0000000..7ed6ff8 --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +5 diff --git a/debian/control b/debian/control new file mode 100644 index 0000000..9c092fe --- /dev/null +++ b/debian/control @@ -0,0 +1,19 @@ +Source: mediaproxy-ng +Section: net +Priority: extra +Maintainer: Michael Prokop +Build-Depends: debhelper (>= 5), iptables-dev, libglib2.0-dev, libpcre3-dev +Standards-Version: 3.8.4 +Homepage: http://sipwise.com/ + +Package: mediaproxy-ng-daemon +Architecture: any +Depends: ${shlibs:Depends}, ${misc:Depends} +Description: TODO + TODO + +Package: mediaproxy-ng-iptables +Architecture: any +Depends: ${shlibs:Depends}, ${misc:Depends} +Description: TODO + TODO diff --git a/debian/copyright b/debian/copyright new file mode 100644 index 0000000..04f88fe --- /dev/null +++ b/debian/copyright @@ -0,0 +1,7 @@ +Upstream Author: The Sipwise Team - http://sipwise.com/ +Copyright: Copyright (c) 2007-2010 Sipwise GmbH, Austria +License: All software included in this package is + Copyright (c) Sipwise GmbH, Austria. + All rights reserved. You may not copy, distribute + or modify without prior written permission from + Sipwise GmbH, Austria. diff --git a/debian/dirs b/debian/dirs new file mode 100644 index 0000000..236670a --- /dev/null +++ b/debian/dirs @@ -0,0 +1 @@ +usr/sbin diff --git a/debian/mediaproxy-ng-daemon.install b/debian/mediaproxy-ng-daemon.install new file mode 100644 index 0000000..77a48e8 --- /dev/null +++ b/debian/mediaproxy-ng-daemon.install @@ -0,0 +1 @@ +daemon/mediaproxy-ng /usr/sbin/ diff --git a/debian/mediaproxy-ng-iptables.install b/debian/mediaproxy-ng-iptables.install new file mode 100644 index 0000000..d5b5d02 --- /dev/null +++ b/debian/mediaproxy-ng-iptables.install @@ -0,0 +1 @@ +iptables-extension/libipt_MEDIAPROXY.so /lib/xtables/ diff --git a/debian/rules b/debian/rules new file mode 100755 index 0000000..c5140d8 --- /dev/null +++ b/debian/rules @@ -0,0 +1,81 @@ +#!/usr/bin/make -f +# -*- makefile -*- +# Sample debian/rules that uses debhelper. +# This file was originally written by Joey Hess and Craig Small. +# As a special exception, when this file is copied by dh-make into a +# dh-make output file, you may use that output file without restriction. +# This special exception was added by Craig Small in version 0.37 of dh-make. + +# Uncomment this to turn on verbose mode. +#export DH_VERBOSE=1 + +b=$(CURDIR)/debian/build + +build: build-stamp + +build-stamp: + dh_testdir + cd iptables-extension/ && gcc -O2 -Wall -shared -o libipt_MEDIAPROXY.so libipt_MEDIAPROXY-1.4.c && cd .. + cd daemon && $(MAKE) && cd .. + touch $@ + +clean: + dh_testdir + dh_testroot + rm -f build-stamp + rm -f iptables-extension/libipt_MEDIAPROXY.so + rm -f daemon/mediaproxy-ng daemon/build_time.h daemon/.depend kernel-module/.ipt_MEDIAPROXY.o.d + rm -rf kernel-module/.tmp_versions + dh_clean + -rm -rf debian/build + +install: build + dh_testdir + dh_testroot + dh_clean -k + dh_installdirs + +mediaproxy-ng-daemon: install + @echo "--- Building: $@" + dh_installdirs -p$@ -P$(b)/$@ + dh_link -p$@ -P$(b)/$@ + dh_installdocs -p$@ -P$(b)/$@ + dh_installchangelogs -p$@ -P$(b)/$@ + dh_install -p$@ -P$(b)/$@ + dh_strip -p$@ -P$(b)/$@ + dh_compress -p$@ -P$(b)/$@ + dh_fixperms -p$@ -P$(b)/$@ + dh_makeshlibs -p$@ -P$(b)/$@ -V + dh_installdeb -p$@ -P$(b)/$@ + dh_shlibdeps -p$@ -P$(b)/$@ + dh_installdebconf -p$@ -P$(b)/$@ + dh_gencontrol -p$@ -P$(b)/$@ + dh_md5sums -p$@ -P$(b)/$@ + dh_builddeb -p$@ -P$(b)/$@ + +mediaproxy-ng-iptables: install + @echo "--- Building: $@" + dh_installdirs -p$@ -P$(b)/$@ + dh_link -p$@ -P$(b)/$@ + dh_installdocs -p$@ -P$(b)/$@ + dh_installchangelogs -p$@ -P$(b)/$@ + dh_install -p$@ -P$(b)/$@ + dh_strip -p$@ -P$(b)/$@ + dh_compress -p$@ -P$(b)/$@ + dh_fixperms -p$@ -P$(b)/$@ + dh_makeshlibs -p$@ -P$(b)/$@ -V + dh_installdeb -p$@ -P$(b)/$@ + dh_shlibdeps -p$@ -P$(b)/$@ + dh_installdebconf -p$@ -P$(b)/$@ + dh_gencontrol -p$@ -P$(b)/$@ + dh_md5sums -p$@ -P$(b)/$@ + dh_builddeb -p$@ -P$(b)/$@ + +# Build architecture-dependent files here. +binary-all: build install + +# Build architecture-independent files here. +binary-indep: build install mediaproxy-ng-daemon mediaproxy-ng-iptables + +binary: binary-indep binary-arch +.PHONY: build clean binary-indep binary-arch binary install diff --git a/iptables-extension/libipt_MEDIAPROXY-1.4.c b/iptables-extension/libipt_MEDIAPROXY-1.4.c new file mode 100644 index 0000000..d44dfbc --- /dev/null +++ b/iptables-extension/libipt_MEDIAPROXY-1.4.c @@ -0,0 +1,81 @@ +/* gcc -O2 -Wall -shared -o libipt_MEDIAPROXY.so libipt_MEDIAPROXY.c */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "../kernel-module/ipt_MEDIAPROXY.h" + + +static void help(void) { + printf( + "MEDIAPROXY target options:\n" + " --id \n" + " Unique ID for this instance\n" + ); +} + +static int parse(int c, + char **argv, + int invert, + unsigned int *flags, + const void *entry, + struct xt_entry_target **target) { + + struct ipt_mediaproxy_info *info = (void *) *target; + + if (c == '1') { + info->id = atoi(optarg); + if (flags) + *flags = 1; + } + else + return 0; + + return 1; +} + +static void final_check(unsigned int flags) { + if (!flags) + exit_error(PARAMETER_PROBLEM, "You must specify --id"); +} + +static void print(const void *ip, const struct xt_entry_target *target, int numeric) { + struct ipt_mediaproxy_info *info = (void *) target; + + printf("id %u", info->id); +} + +static void save(const void *ip, const struct xt_entry_target *target) { + struct ipt_mediaproxy_info *info = (void *) target; + + printf("--id %u", info->id); +} + +static struct option opts[] = { + { "id", 1, NULL, '1' }, + { NULL, }, +}; + + +static struct xtables_target mediaproxy = { + .name = "MEDIAPROXY", + .family = AF_INET, + .version = "1.4.2", + .size = XT_ALIGN(sizeof(struct ipt_mediaproxy_info)), + .userspacesize = XT_ALIGN(sizeof(struct ipt_mediaproxy_info)), + .help = help, + .parse = parse, + .final_check = final_check, + .print = print, + .save = save, + .extra_opts = opts, +}; + +void _init(void) { + xtables_register_target(&mediaproxy); +} diff --git a/iptables-extension/libipt_MEDIAPROXY.c b/iptables-extension/libipt_MEDIAPROXY.c new file mode 100644 index 0000000..ca663b1 --- /dev/null +++ b/iptables-extension/libipt_MEDIAPROXY.c @@ -0,0 +1,80 @@ +/* gcc -O2 -Wall -shared -o libipt_MEDIAPROXY.so libipt_MEDIAPROXY.c */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "../kernel-module/ipt_MEDIAPROXY.h" + + +static void help(void) { + printf( + "MEDIAPROXY target options:\n" + " --id \n" + " Unique ID for this instance\n" + ); +} + +static int parse(int c, + char **argv, + int invert, + unsigned int *flags, + const struct ipt_entry *entry, + struct ipt_entry_target **target) { + + struct ipt_mediaproxy_info *info = (void *) (*target)->data; + + if (c == '1') { + info->id = atoi(optarg); + if (flags) + *flags = 1; + } + else + return 0; + + return 1; +} + +static void final_check(unsigned int flags) { + if (!flags) + exit_error(PARAMETER_PROBLEM, "You must specify --id"); +} + +static void print(const struct ipt_ip *ip, const struct ipt_entry_target *target, int numeric) { + struct ipt_mediaproxy_info *info = (void *) target->data; + + printf("id %u", info->id); +} + +static void save(const struct ipt_ip *ip, const struct ipt_entry_target *target) { + struct ipt_mediaproxy_info *info = (void *) target->data; + + printf("--id %u", info->id); +} + +static struct option opts[] = { + { "id", 1, NULL, '1' }, + { NULL, }, +}; + + +static struct iptables_target mediaproxy = { + .name = "MEDIAPROXY", + .version = "1.3.6", + .size = IPT_ALIGN(sizeof(struct ipt_mediaproxy_info)), + .userspacesize = IPT_ALIGN(sizeof(struct ipt_mediaproxy_info)), + .help = help, + .parse = parse, + .final_check = final_check, + .print = print, + .save = save, + .extra_opts = opts, +}; + +void _init(void) { + register_target(&mediaproxy); +} diff --git a/kernel-module/Makefile b/kernel-module/Makefile new file mode 100644 index 0000000..d76c628 --- /dev/null +++ b/kernel-module/Makefile @@ -0,0 +1 @@ +obj-m += ipt_MEDIAPROXY.o diff --git a/kernel-module/ipt_MEDIAPROXY.c b/kernel-module/ipt_MEDIAPROXY.c new file mode 100644 index 0000000..4208082 --- /dev/null +++ b/kernel-module/ipt_MEDIAPROXY.c @@ -0,0 +1,1187 @@ +/* make -C /lib/modules/$(uname -r)/build modules M=$(pwd) */ +/* make -C /lib/modules/$(uname -r)/build clean M=$(pwd) */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "ipt_MEDIAPROXY.h" + +MODULE_LICENSE("GPL"); + + + + +#define MAX_ID 64 /* - 1 */ + +#if 0 +#define DBG(x...) printk(KERN_DEBUG x) +#else +#define DBG(x...) ((void)0) +#endif + + + + + +static struct proc_dir_entry *my_proc_root; +static struct proc_dir_entry *proc_list; +static struct proc_dir_entry *proc_control; +static struct mediaproxy_table *table[64]; +static rwlock_t table_lock; + + + + +static ssize_t proc_control_write(struct file *, const char __user *, size_t, loff_t *); +static int proc_control_open(struct inode *, struct file *); +static int proc_control_close(struct inode *, struct file *); +static int proc_status(char *, char **, off_t, int, int *, void *); + +static ssize_t proc_main_control_write(struct file *, const char __user *, size_t, loff_t *); +static int proc_main_control_open(struct inode *, struct file *); +static int proc_main_control_close(struct inode *, struct file *); + +static int proc_list_open(struct inode *, struct file *); + +static void *proc_list_start(struct seq_file *, loff_t *); +static void proc_list_stop(struct seq_file *, void *); +static void *proc_list_next(struct seq_file *, void *, loff_t *); +static int proc_list_show(struct seq_file *, void *); + +static int proc_blist_open(struct inode *, struct file *); +static int proc_blist_close(struct inode *, struct file *); +static ssize_t proc_blist_read(struct file *, char __user *, size_t, loff_t *); + +static int proc_main_list_open(struct inode *, struct file *); + +static void *proc_main_list_start(struct seq_file *, loff_t *); +static void proc_main_list_stop(struct seq_file *, void *); +static void *proc_main_list_next(struct seq_file *, void *, loff_t *); +static int proc_main_list_show(struct seq_file *, void *); + +static void table_push(struct mediaproxy_table *); +static struct mediaproxy_target *get_target(struct mediaproxy_table *, u_int16_t); + + +static const struct file_operations proc_control_ops = { + .write = proc_control_write, + .open = proc_control_open, + .release = proc_control_close, +}; + +static const struct file_operations proc_main_control_ops = { + .write = proc_main_control_write, + .open = proc_main_control_open, + .release = proc_main_control_close, +}; + +static const struct file_operations proc_list_ops = { + .open = proc_list_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static const struct file_operations proc_blist_ops = { + .open = proc_blist_open, + .read = proc_blist_read, + .release = proc_blist_close, +}; + +static struct seq_operations proc_list_seq_ops = { + .start = proc_list_start, + .next = proc_list_next, + .stop = proc_list_stop, + .show = proc_list_show, +}; + +static const struct file_operations proc_main_list_ops = { + .open = proc_main_list_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static struct seq_operations proc_main_list_seq_ops = { + .start = proc_main_list_start, + .next = proc_main_list_next, + .stop = proc_main_list_stop, + .show = proc_main_list_show, +}; + + + + + +static struct mediaproxy_table *new_table(void) { + struct mediaproxy_table *t; + + DBG(KERN_DEBUG "Creating new table\n"); + + if (!try_module_get(THIS_MODULE)) + return NULL; + + t = kmalloc(sizeof(*t), GFP_KERNEL); + if (!t) { + module_put(THIS_MODULE); + return NULL; + } + + memset(t, 0, sizeof(*t)); + + atomic_set(&t->refcnt, 1); + rwlock_init(&t->target_lock); + t->id = -1; + + return t; +} + + + + +static void table_hold(struct mediaproxy_table *t) { + atomic_inc(&t->refcnt); +} + + + + + +static int table_create_proc(struct mediaproxy_table *t, u_int32_t id) { + char num[10]; + + sprintf(num, "%u", id); + + t->proc = create_proc_entry(num, S_IFDIR | S_IRUGO | S_IXUGO, my_proc_root); + if (!t->proc) + return -1; + /* t->proc->owner = THIS_MODULE; */ + + t->status = create_proc_entry("status", S_IFREG | S_IRUGO, t->proc); + if (!t->status) + return -1; + /* t->status->owner = THIS_MODULE; */ + t->status->read_proc = proc_status; + t->status->data = (void *) (unsigned long) id; + + t->control = create_proc_entry("control", S_IFREG | S_IWUSR | S_IWGRP, t->proc); + if (!t->control) + return -1; + /* t->control->owner = THIS_MODULE; */ + t->control->proc_fops = &proc_control_ops; + t->control->data = (void *) (unsigned long) id; + + t->list = create_proc_entry("list", S_IFREG | S_IRUGO, t->proc); + if (!t->list) + return -1; + /* t->list->owner = THIS_MODULE; */ + t->list->proc_fops = &proc_list_ops; + t->list->data = (void *) (unsigned long) id; + + t->blist = create_proc_entry("blist", S_IFREG | S_IRUGO, t->proc); + if (!t->blist) + return -1; + /* t->blist->owner = THIS_MODULE; */ + t->blist->proc_fops = &proc_blist_ops; + t->blist->data = (void *) (unsigned long) id; + + return 0; +} + + + + +static struct mediaproxy_table *new_table_link(u_int32_t id) { + struct mediaproxy_table *t; + unsigned long flags; + + if (id >= MAX_ID) + return NULL; + + t = new_table(); + if (!t) { + printk(KERN_WARNING "ipt_MEDIAPROXY out of memory\n"); + return NULL; + } + + if (table_create_proc(t, id)) { + printk(KERN_WARNING "ipt_MEDIAPROXY failed to create /proc entry for ID %u\n", id); + table_push(t); + return NULL; + } + + write_lock_irqsave(&table_lock, flags); + if (table[id]) { + write_unlock_irqrestore(&table_lock, flags); + table_push(t); + printk(KERN_WARNING "ipt_MEDIAPROXY duplicate ID %u\n", id); + return NULL; + } + + table_hold(t); + table[id] = t; + t->id = id; + write_unlock_irqrestore(&table_lock, flags); + + return t; +} + + + + + +static void target_push(struct mediaproxy_target *t) { + if (!t) + return; + + if (!atomic_dec_and_test(&t->refcnt)) + return; + + DBG(KERN_DEBUG "Freeing target\n"); + + kfree(t); +} + + + + + + +static void target_hold(struct mediaproxy_target *t) { + atomic_inc(&t->refcnt); +} + + + + + + +static void clear_proc(struct proc_dir_entry **e) { + if (!e || !*e) + return; + + remove_proc_entry((*e)->name, (*e)->parent); + *e = NULL; +} + + + + + +static void table_push(struct mediaproxy_table *t) { + int i, j; + + if (!t) + return; + + if (!atomic_dec_and_test(&t->refcnt)) + return; + + DBG(KERN_DEBUG "Freeing table\n"); + + for (i = 0; i < 256; i++) { + if (!t->target[i]) + continue; + + for (j = 0; j < 256; j++) { + if (!t->target[i][j]) + continue; + t->target[i][j]->table = -1; + target_push(t->target[i][j]); + t->target[i][j] = NULL; + } + + kfree(t->target[i]); + t->target[i] = NULL; + } + + clear_proc(&t->status); + clear_proc(&t->control); + clear_proc(&t->list); + clear_proc(&t->blist); + clear_proc(&t->proc); + + kfree(t); + + module_put(THIS_MODULE); +} + + + + +static int unlink_table(struct mediaproxy_table *t) { + unsigned long flags; + + if (t->id >= MAX_ID) + return -EINVAL; + + DBG(KERN_DEBUG "Unlinking table %u\n", t->id); + + write_lock_irqsave(&table_lock, flags); + if (t->id >= MAX_ID || table[t->id] != t) { + write_unlock_irqrestore(&table_lock, flags); + return -EINVAL; + } + if (t->pid) { + write_unlock_irqrestore(&table_lock, flags); + return -EBUSY; + } + table[t->id] = NULL; + t->id = -1; + write_unlock_irqrestore(&table_lock, flags); + + clear_proc(&t->status); + clear_proc(&t->control); + clear_proc(&t->list); + clear_proc(&t->blist); + clear_proc(&t->proc); + + table_push(t); + + return 0; +} + + + + +static struct mediaproxy_table *get_table(u_int32_t id) { + struct mediaproxy_table *t; + unsigned long flags; + + if (id >= MAX_ID) + return NULL; + + read_lock_irqsave(&table_lock, flags); + t = table[id]; + if (t) + table_hold(t); + read_unlock_irqrestore(&table_lock, flags); + + return t; +} + + + + +static int proc_status(char *page, char **start, off_t off, int count, int *eof, void *data) { + struct mediaproxy_table *t; + int len = 0; + unsigned long flags; + + u_int32_t id = (u_int32_t) (unsigned long) data; + t = get_table(id); + if (!t) + return -ENOENT; + + read_lock_irqsave(&t->target_lock, flags); + len += sprintf(page + len, "Refcount: %u\n", atomic_read(&t->refcnt) - 1); + len += sprintf(page + len, "Control PID: %u\n", t->pid); + len += sprintf(page + len, "Targets: %u\n", t->targets); + len += sprintf(page + len, "Buckets: %u\n", t->buckets); + read_unlock_irqrestore(&t->target_lock, flags); + + table_push(t); + + return len; +} + + + + +static int proc_main_list_open(struct inode *i, struct file *f) { + return seq_open(f, &proc_main_list_seq_ops); +} + + + + + +static void *proc_main_list_start(struct seq_file *f, loff_t *o) { + if (!try_module_get(THIS_MODULE)) + return NULL; + return proc_main_list_next(f, NULL, o); +} + +static void proc_main_list_stop(struct seq_file *f, void *v) { + module_put(THIS_MODULE); +} + +static void *proc_main_list_next(struct seq_file *f, void *v, loff_t *o) { /* v is invalid */ + struct mediaproxy_table *t = NULL; + u_int32_t id; + + if (*o < 0) + return NULL; + id = *o; + + while (id++ < MAX_ID) { + t = get_table(id); + if (!t) + continue; + break; + } + + *o = id; + + return t; /* might be NULL */ +} + +static int proc_main_list_show(struct seq_file *f, void *v) { + struct mediaproxy_table *g = v; + + seq_printf(f, "table %u present\n", g->id); + table_push(g); + + return 0; +} + + + + + +static int proc_blist_open(struct inode *i, struct file *f) { + struct proc_dir_entry *pde; + u_int32_t id; + struct mediaproxy_table *t; + + pde = PDE(i); + id = (u_int32_t) (unsigned long) pde->data; + t = get_table(id); + if (!t) + return -ENOENT; + + table_push(t); + + return 0; +} + +static int proc_blist_close(struct inode *i, struct file *f) { + struct proc_dir_entry *pde; + u_int32_t id; + struct mediaproxy_table *t; + + pde = PDE(i); + id = (u_int32_t) (unsigned long) pde->data; + t = get_table(id); + if (!t) + return 0; + + table_push(t); + + return 0; +} + +static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t *o) { + struct inode *inode; + struct proc_dir_entry *pde; + u_int32_t id; + struct mediaproxy_table *t; + struct mediaproxy_list_entry op; + int err; + struct mediaproxy_target *g; + unsigned long flags; + + if (l != sizeof(op)) + return -EINVAL; + if (*o < 0) + return -EINVAL; + + inode = f->f_path.dentry->d_inode; + pde = PDE(inode); + id = (u_int32_t) (unsigned long) pde->data; + t = get_table(id); + if (!t) + return -ENOENT; + + for (;;) { + err = 0; + if (*o > 0xffff) + goto err; + + g = get_target(t, (*o)++); + if (g) + break; + } + + memset(&op, 0, sizeof(op)); + spin_lock_irqsave(&g->lock, flags); + memcpy(&op.target, &g->target, sizeof(op.target)); + memcpy(&op.stats, &g->stats, sizeof(op.stats)); + spin_unlock_irqrestore(&g->lock, flags); + + target_push(g); + + err = -EFAULT; + if (copy_to_user(b, &op, sizeof(op))) + goto err; + + table_push(t); + return l; + +err: + table_push(t); + return err; +} + + + + + +static int proc_list_open(struct inode *i, struct file *f) { + int err; + struct seq_file *p; + struct proc_dir_entry *pde; + u_int32_t id; + struct mediaproxy_table *t; + + pde = PDE(i); + id = (u_int32_t) (unsigned long) pde->data; + t = get_table(id); + if (!t) + return -ENOENT; + table_push(t); + + err = seq_open(f, &proc_list_seq_ops); + if (err) + return err; + + p = f->private_data; + p->private = (void *) (unsigned long) id; + + return 0; +} + + + + +static void *proc_list_start(struct seq_file *f, loff_t *o) { + return proc_list_next(f, NULL, o); +} + +static void proc_list_stop(struct seq_file *f, void *v) { +} + +static void *proc_list_next(struct seq_file *f, void *v, loff_t *o) { /* v is invalid */ + u_int32_t id = (u_int32_t) (unsigned long) f->private; + struct mediaproxy_target *g = NULL; + struct mediaproxy_table *t; + u_int16_t port; + unsigned char hi, lo; + unsigned long flags; + + if (*o < 0 || *o > 0xffff) + return NULL; + port = (u_int16_t) *o; + + t = get_table(id); + if (!t) + return NULL; + + hi = (port & 0xff00) >> 8; + lo = port & 0xff; + + read_lock_irqsave(&t->target_lock, flags); + for (;;) { + lo++; /* will make the iteration start from 1 */ + if (lo == 0) { + hi++; + if (hi == 0) + break; + } + if (!t->target[hi]) { + lo = 0xff; + continue; + } + + g = t->target[hi][lo]; + if (!g) + continue; + + target_hold(g); + break; + } + read_unlock_irqrestore(&t->target_lock, flags); + + *o = (hi << 8) | lo; + table_push(t); + + return g; +} + +static int proc_list_show(struct seq_file *f, void *v) { + struct mediaproxy_target *g = v; + unsigned long flags; + + spin_lock_irqsave(&g->lock, flags); + seq_printf(f, "port %5u: %20llu bytes, %20llu packets, %20llu errors\n", + g->target.target_port, g->stats.bytes, g->stats.packets, g->stats.errors); + spin_unlock_irqrestore(&g->lock, flags); + + target_push(g); + + return 0; +} + + + + + +static int table_del_target(struct mediaproxy_table *t, u_int16_t port) { + unsigned char hi, lo; + struct mediaproxy_target *g; + unsigned long flags; + + if (!port) + return -EINVAL; + + hi = (port & 0xff00) >> 8; + lo = port & 0xff; + + write_lock_irqsave(&t->target_lock, flags); + g = t->target[hi] ? t->target[hi][lo] : NULL; + if (g) { + t->target[hi][lo] = NULL; + t->targets--; + } + write_unlock_irqrestore(&t->target_lock, flags); + + if (!g) + return -ENOENT; + + target_push(g); + + return 0; +} + + + + + +static int table_new_target(struct mediaproxy_table *t, struct mediaproxy_target_info *i, int update) { + unsigned char hi, lo; + struct mediaproxy_target *g; + struct mediaproxy_target **gp; + struct mediaproxy_target *og = NULL; + int err; + unsigned long flags; + + if (!i->target_port) + return -EINVAL; + if (!i->src_ip) + return -EINVAL; + if (!i->dst_ip) + return -EINVAL; + if (!i->src_port) + return -EINVAL; + if (!i->dst_port) + return -EINVAL; + + DBG(KERN_DEBUG "Creating new target\n"); + + err = -ENOMEM; + g = kmalloc(sizeof(*g), GFP_KERNEL); + if (!g) + goto fail1; + memset(g, 0, sizeof(*g)); + g->table = t->id; + atomic_set(&g->refcnt, 1); + spin_lock_init(&g->lock); + memcpy(&g->target, i, sizeof(*i)); + + if (update) + gp = NULL; + else { + gp = kmalloc(sizeof(void *) * 256, GFP_KERNEL); + if (!gp) + goto fail2; + memset(gp, 0, sizeof(void *) * 256); + } + + hi = (i->target_port & 0xff00) >> 8; + lo = i->target_port & 0xff; + + write_lock_irqsave(&t->target_lock, flags); + if (!t->target[hi]) { + err = -ENOENT; + if (update) + goto fail4; + t->target[hi] = gp; + gp = NULL; + t->buckets++; + } + if (update) { + err = -ENOENT; + og = t->target[hi][lo]; + if (!og) + goto fail4; + + spin_lock(&og->lock); /* nested lock! irqs are disabled already */ + memcpy(&g->stats, &og->stats, sizeof(g->stats)); + spin_unlock(&og->lock); + } + else { + err = -EEXIST; + if (t->target[hi][lo]) + goto fail4; + } + + t->target[hi][lo] = g; + g = NULL; + if (!update) + t->targets++; + write_unlock_irqrestore(&t->target_lock, flags); + + if (gp) + kfree(gp); + if (og) + target_push(og); + + return 0; + +fail4: + write_unlock_irqrestore(&t->target_lock, flags); + if (gp) + kfree(gp); +fail2: + kfree(g); +fail1: + return err; +} + + + + + +static struct mediaproxy_target *get_target(struct mediaproxy_table *t, u_int16_t port) { + unsigned char hi, lo; + struct mediaproxy_target *r; + unsigned long flags; + + if (!t) + return NULL; + if (!port) + return NULL; + + hi = (port & 0xff00) >> 8; + lo = port & 0xff; + + read_lock_irqsave(&t->target_lock, flags); + r = t->target[hi] ? t->target[hi][lo] : NULL; + if (r) + target_hold(r); + read_unlock_irqrestore(&t->target_lock, flags); + + return r; +} + + + + + +static int proc_main_control_open(struct inode *inode, struct file *file) { + if (!try_module_get(THIS_MODULE)) + return -ENXIO; + return 0; +} + +static int proc_main_control_close(struct inode *inode, struct file *file) { + module_put(THIS_MODULE); + return 0; +} + +static ssize_t proc_main_control_write(struct file *file, const char __user *buf, size_t buflen, loff_t *off) { + char b[30]; + unsigned long id; + char *endp; + struct mediaproxy_table *t; + int err; + + if (buflen < 6 || buflen > 20) + return -EINVAL; + + if (copy_from_user(&b, buf, buflen)) + return -EFAULT; + + if (!strncmp(b, "add ", 4)) { + id = simple_strtoul(b + 4, &endp, 10); + if (endp == b + 4) + return -EINVAL; + if (id >= MAX_ID) + return -EINVAL; + t = new_table_link((u_int32_t) id); + if (!t) + return -EEXIST; + table_push(t); + t = NULL; + } + else if (!strncmp(b, "del ", 4)) { + id = simple_strtoul(b + 4, &endp, 10); + if (endp == b + 4) + return -EINVAL; + if (id >= MAX_ID) + return -EINVAL; + t = get_table((u_int32_t) id); + if (!t) + return -ENOENT; + err = unlink_table(t); + table_push(t); + t = NULL; + if (err) + return err; + } + else + return -EINVAL; + + return buflen; +} + + + + + +static int proc_control_open(struct inode *inode, struct file *file) { + struct proc_dir_entry *pde; + u_int32_t id; + struct mediaproxy_table *t; + unsigned long flags; + + pde = PDE(inode); + id = (u_int32_t) (unsigned long) pde->data; + t = get_table(id); + if (!t) + return -ENOENT; + + write_lock_irqsave(&table_lock, flags); + if (t->pid) { + write_unlock_irqrestore(&table_lock, flags); + table_push(t); + return -EBUSY; + } + t->pid = current->tgid; + write_unlock_irqrestore(&table_lock, flags); + + table_push(t); + return 0; +} + +static int proc_control_close(struct inode *inode, struct file *file) { + struct proc_dir_entry *pde; + u_int32_t id; + struct mediaproxy_table *t; + unsigned long flags; + + pde = PDE(inode); + id = (u_int32_t) (unsigned long) pde->data; + t = get_table(id); + if (!t) + return 0; + + write_lock_irqsave(&table_lock, flags); + t->pid = 0; + write_unlock_irqrestore(&table_lock, flags); + + table_push(t); + + return 0; +} + +static ssize_t proc_control_write(struct file *file, const char __user *buf, size_t buflen, loff_t *off) { + struct inode *inode; + struct proc_dir_entry *pde; + u_int32_t id; + struct mediaproxy_table *t; + struct mediaproxy_message msg; + int err; + + if (buflen != sizeof(msg)) + return -EINVAL; + + inode = file->f_path.dentry->d_inode; + pde = PDE(inode); + id = (u_int32_t) (unsigned long) pde->data; + t = get_table(id); + if (!t) + return -ENOENT; + + err = -EFAULT; + if (copy_from_user(&msg, buf, sizeof(msg))) + goto err; + + switch (msg.cmd) { + case MMG_NOOP: + DBG(KERN_DEBUG "noop.\n"); + break; + + case MMG_ADD: + err = table_new_target(t, &msg.target, 0); + if (err) + goto err; + break; + + case MMG_DEL: + err = table_del_target(t, msg.target.target_port); + if (err) + goto err; + break; + + case MMG_UPDATE: + err = table_new_target(t, &msg.target, 1); + if (err) + goto err; + break; + + default: + printk(KERN_WARNING "ipt_MEDIAPROXY unimplemented op %u\n", msg.cmd); + err = -EINVAL; + goto err; + } + + table_push(t); + + return buflen; + +err: + table_push(t); + return err; +} + + + + + +static int send_proxy_packet(struct sk_buff *skb, u_int32_t sip, u_int16_t sport, u_int32_t dip, u_int16_t dport, unsigned char tos) { + long sum; + struct iphdr *ih; + struct udphdr *uh; + + ih = ip_hdr(skb); + uh = udp_hdr(skb); + + sum = uh->check; + if (sum) { + sum += ((u_int16_t *) &ih->saddr)[0]; + sum += ((u_int16_t *) &ih->saddr)[1]; + sum += ((u_int16_t *) &ih->daddr)[0]; + sum += ((u_int16_t *) &ih->daddr)[1]; + sum += uh->source; + sum += uh->dest; + } + + ih->saddr = sip; + ih->daddr = dip; + ih->tos = tos; + uh->source = htons(sport); + uh->dest = htons(dport); + + if (sum) { + sum -= ((u_int16_t *) &ih->saddr)[0]; + sum -= ((u_int16_t *) &ih->saddr)[1]; + sum -= ((u_int16_t *) &ih->daddr)[0]; + sum -= ((u_int16_t *) &ih->daddr)[1]; + sum -= uh->source; + sum -= uh->dest; + + if (sum < 0) { + sum = -sum; + sum = (sum >> 16) + (sum & 0xffff); + sum += sum >> 16; + uh->check = ~sum; + } + else { + sum = (sum >> 16) + (sum & 0xffff); + sum += sum >> 16; + uh->check = sum; + } + } + + __ip_select_ident(ih, skb_dst(skb), 0); + + if (ip_route_me_harder(skb, RTN_LOCAL)) + goto drop; + + __ip_select_ident(ih, skb_dst(skb), 0); + + ip_local_out(skb); + + return 0; + +drop: + kfree_skb(skb); + return -1; +} + + + + + + +static unsigned int mediaproxy(struct sk_buff *oskb, const struct xt_target_param *par) { + const struct ipt_mediaproxy_info *pinfo = par->targinfo; + struct sk_buff *skb; + struct sk_buff *skb2; + struct iphdr *ih; + struct udphdr *uh; + struct mediaproxy_table *t; + struct mediaproxy_target *g; + int err; + unsigned long flags; + + t = get_table(pinfo->id); + if (!t) + goto skip; + + skb = skb_copy(oskb, GFP_ATOMIC); + if (!skb) + goto skip3; + if (skb_dst(skb)) + dst_release(skb_dst(skb)); + skb_dst_set(skb, dst_clone(skb_dst(oskb))); + + skb_reset_network_header(skb); + ih = ip_hdr(skb); + if (ih->protocol != IPPROTO_UDP) + goto skip2; + + skb_set_transport_header(skb, (ih->ihl << 2)); + uh = udp_hdr(skb); + + g = get_target(t, ntohs(uh->dest)); + if (!g) + goto skip2; + + DBG(KERN_DEBUG "target found, src %08x -> dst %08x\n", g->target.src_ip, g->target.dst_ip); + + if (g->target.mirror_ip && g->target.mirror_port) { + DBG(KERN_DEBUG "sending mirror packet to dst %08x\n", g->target.mirror_ip); + skb2 = skb_copy(skb, GFP_ATOMIC); + if (skb_dst(skb2)) + dst_release(skb_dst(skb2)); + skb_dst_set(skb2, dst_clone(skb_dst(oskb))); + err = send_proxy_packet(skb2, g->target.src_ip, g->target.src_port, g->target.mirror_ip, g->target.mirror_port, g->target.tos); + if (err) { + spin_lock_irqsave(&g->lock, flags); + g->stats.errors++; + spin_unlock_irqrestore(&g->lock, flags); + } + } + + err = send_proxy_packet(skb, g->target.src_ip, g->target.src_port, g->target.dst_ip, g->target.dst_port, g->target.tos); + + spin_lock_irqsave(&g->lock, flags); + if (err) + g->stats.errors++; + else { + g->stats.packets++; + g->stats.bytes += skb->len; + } + spin_unlock_irqrestore(&g->lock, flags); + + target_push(g); + table_push(t); + + return NF_DROP; + +skip2: + kfree_skb(skb); +skip3: + table_push(t); +skip: + return XT_CONTINUE; +} + + + + + +static bool check(const struct xt_tgchk_param *par) { + const struct ipt_mediaproxy_info *pinfo = par->targinfo; + + if (!my_proc_root) { + printk(KERN_WARNING "ipt_MEDIAPROXY check() without proc_root\n"); + return 0; + } + if (pinfo->id >= MAX_ID) { + printk(KERN_WARNING "ipt_MEDIAPROXY ID too high (%u >= %u)\n", pinfo->id, MAX_ID); + return 0; + } + + return 1; +} + + + + +static struct xt_target ipt_mediaproxy_reg = { + .name = "MEDIAPROXY", + .family = NFPROTO_IPV4, + .target = mediaproxy, + .targetsize = sizeof(struct ipt_mediaproxy_info), + .table = "filter", + .hooks = (1 << NF_INET_LOCAL_IN), + .checkentry = check, + .me = THIS_MODULE, +}; + +static int __init init(void) { + int ret; + + printk(KERN_NOTICE "Registering ipt_MEDIAPROXY module\n"); + + rwlock_init(&table_lock); + + ret = -ENOMEM; + my_proc_root = proc_mkdir("mediaproxy", NULL); + if (!my_proc_root) + goto fail; + /* my_proc_root->owner = THIS_MODULE; */ + + proc_control = create_proc_entry("control", S_IFREG | S_IWUSR | S_IWGRP, my_proc_root); + if (!proc_control) + goto fail; + /* proc_control->owner = THIS_MODULE; */ + proc_control->proc_fops = &proc_main_control_ops; + + proc_list = create_proc_entry("list", S_IFREG | S_IRUGO, my_proc_root); + if (!proc_list) + goto fail; + /* proc_list->owner = THIS_MODULE; */ + proc_list->proc_fops = &proc_main_list_ops; + + ret = xt_register_target(&ipt_mediaproxy_reg); + if (ret) + goto fail; + + return 0; + +fail: + clear_proc(&proc_control); + clear_proc(&proc_list); + clear_proc(&my_proc_root); + + return ret; +} + +static void __exit fini(void) { + printk(KERN_NOTICE "Unregistering ipt_MEDIAPROXY module\n"); + xt_unregister_target(&ipt_mediaproxy_reg); + + clear_proc(&proc_control); + clear_proc(&proc_list); + clear_proc(&my_proc_root); +} + +module_init(init); +module_exit(fini); diff --git a/kernel-module/ipt_MEDIAPROXY.h b/kernel-module/ipt_MEDIAPROXY.h new file mode 100644 index 0000000..04542cf --- /dev/null +++ b/kernel-module/ipt_MEDIAPROXY.h @@ -0,0 +1,75 @@ +#ifndef IPT_RTPPROXY_H +#define IPT_RTPPROXY_H + +struct ipt_mediaproxy_info { + u_int32_t id; +}; + +struct mediaproxy_stats { + u_int64_t packets; + u_int64_t bytes; + u_int64_t errors; +}; + +struct mediaproxy_target_info { + u_int16_t target_port; + + u_int32_t src_ip; + u_int32_t dst_ip; + u_int16_t src_port; + u_int16_t dst_port; + + u_int32_t mirror_ip; + u_int16_t mirror_port; + + unsigned char tos; +}; + +struct mediaproxy_message { + enum { + MMG_NOOP = 1, + MMG_ADD, + MMG_DEL, + MMG_UPDATE, + } cmd; + + struct mediaproxy_target_info target; +}; + +struct mediaproxy_list_entry { + struct mediaproxy_target_info target; + struct mediaproxy_stats stats; +}; + +#ifdef __KERNEL__ + +struct mediaproxy_target { + atomic_t refcnt; + u_int32_t table; + struct mediaproxy_target_info target; + + spinlock_t lock; + struct mediaproxy_stats stats; +}; + +struct mediaproxy_table { + atomic_t refcnt; + rwlock_t target_lock; + pid_t pid; + + u_int32_t id; + struct proc_dir_entry *proc; + struct proc_dir_entry *status; + struct proc_dir_entry *control; + struct proc_dir_entry *list; + struct proc_dir_entry *blist; + + struct mediaproxy_target **target[256]; + + unsigned int buckets; + unsigned int targets; +}; + +#endif + +#endif diff --git a/tests/blist.pl b/tests/blist.pl new file mode 100755 index 0000000..3678de1 --- /dev/null +++ b/tests/blist.pl @@ -0,0 +1,20 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use Socket; + +my $t = $ARGV[0] || "0"; + +open(X, "<", "/proc/mediaproxy/$t/blist") or die; +my $buf; +while (sysread(X, $buf, 48)) { + my @b = unpack("Sa2 a4a4 SS a4 Sa2 LLLLLL", $buf); + for (2,3,6) { + $b[$_] = inet_ntoa($b[$_]); + } + for (9,11,13) { + $b[$_] += $b[$_ + 1] * 2**32; + } + printf("%5u %15s:%-5u -> %15s:%-5u (-> %15s:%-5u) [%llu %llu %llu]\n", @b[0,2,4,3,5,6,7,9,11,13]); +} diff --git a/tests/test.pl b/tests/test.pl new file mode 100755 index 0000000..032324b --- /dev/null +++ b/tests/test.pl @@ -0,0 +1,54 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use Socket; + +$| = 1; + +open(F, "> /proc/mediaproxy/1/control"); +{ + my $x = select(F); + $| = 1; + select($x); +} +#print F (pack("I SS LLSS LS S", 0, 0, -1, 0, 0, 0, 0, 0, 0, -1)); +#sleep(10); + +print("add 9876 -> 1234/6543\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 1, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, "", 0, -1)); +sleep(30); + +print("add fail\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 1, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, inet_aton("192.168.231.1"), 6789, -1)); +sleep(30); + +print("update 9876 -> 1234/6543 & 6789\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, inet_aton("192.168.231.1"), 6789, -1)); +sleep(30); + +print("update 9876 -> 2345/7890 & 4321\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 2345, 7890, inet_aton("192.168.231.1"), 4321, -1)); +sleep(30); + +print("add fail\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 1, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, inet_aton("192.168.231.1"), 6789, -1)); +sleep(30); + +print("update 9876 -> 1234/6543\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, "", 0, -1)); +sleep(30); + +print("delete\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 2, 9876, -1, "", "", 0, 0, "", 0, -1)); +sleep(30); + +print("delete fail\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 2, 9876, -1, "", "", 0, 0, "", 0, -1)); +sleep(30); + +print("update fail\n"); +syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, "", 0, -1)); +sleep(30); + +close(F);