create tags and branches

remotes/origin/2.0
Richard Fuchs 14 years ago
commit 593121d551

@ -0,0 +1,40 @@
CC= gcc
CFLAGS= -g -Wall `pkg-config --cflags glib-2.0` `pcre-config --cflags` -I/lib/modules/`uname -r`/build/include/ -I../kernel-module/
#CFLAGS+= -O2
LDFLAGS= `pkg-config --libs glib-2.0` `pcre-config --libs`
SRCS= main.c kernel.c poller.c aux.c control.c streambuf.c call.c
OBJS= $(SRCS:.c=.o)
.PHONY: all dep clean tests
all:
make mediaproxy-ng
tests:
make aux-test poller-test
dep: .depend
clean:
rm -f $(OBJS) mediaproxy-ng aux-test poller-test aux-test.o poller-test.o .depend build_time.h core
.depend: $(SRCS) Makefile build_time.h
$(CC) $(CFLAGS) -M $(SRCS) | sed -e 's/:/ .depend:/' > .depend
build_time.h: $(SRCS) Makefile
date +"#define BUILD_TIME \"%Y-%m-%d %H:%M:%S\"" > build_time.h
mediaproxy-ng: $(OBJS) .depend build_time.h
$(CC) $(CFLAGS) -o $@ $(OBJS) $(LDFLAGS)
aux-test: aux.o aux-test.o .depend build_time.h
$(CC) $(CFLAGS) -o $@ aux-test.o aux.o $(LDFLAGS)
poller-test: poller.o poller-test.o aux.o .depend build_time.h
$(CC) $(CFLAGS) -o $@ poller-test.o poller.o aux.o $(LDFLAGS)
include .depend

@ -0,0 +1,305 @@
#include <stdio.h>
#include "aux.h"
int test[32] = {
0x10, 0x20, 0x30, 0x40, 0x50, 0x60, 0x70, 0x80,
0x90, 0xa0, 0xb0, 0xc0, 0xd0, 0xe0, 0xf0, 0xff,
0x10ff, 0x20ff, 0x30ff, 0x40ff, 0x50ff, 0x60ff, 0x70ff, 0x80ff,
0x90ff, 0xa0ff, 0xb0ff, 0xc0ff, 0xd0ff, 0xe0ff, 0xf0ff, 0xffff,
};
void exsrx(int x, int exp, unsigned int s, int ex) {
int ret = mybsearch(test, s, sizeof(int), &x, 0, sizeof(x), ex);
if (ret != exp)
fprintf(stderr, "TEST FAILED! params=%u %i; result=%i, expected=%i\n", s, ex, ret, exp);
}
void exsr1(int x, int exp) {
exsrx(x, exp, 16, 1);
}
void exsr2(int x, int exp) {
exsrx(x, exp, 15, 1);
}
void exsr3(int x, int exp) {
exsrx(x, exp, 2, 1);
}
void exsr4(int x, int exp) {
exsrx(x, exp, 1, 1);
}
void exsr5(int x, int exp) {
exsrx(x, exp, 32, 1);
}
void exsr6(int x, int exp) {
exsrx(x, exp, 31, 1);
}
void exsr7(int x, int exp) {
exsrx(x, exp, 16, 0);
}
void exsr8(int x, int exp) {
exsrx(x, exp, 15, 0);
}
void exsr9(int x, int exp) {
exsrx(x, exp, 2, 0);
}
void exsr10(int x, int exp) {
exsrx(x, exp, 1, 0);
}
void exsr11(int x, int exp) {
exsrx(x, exp, 32, 0);
}
void exsr12(int x, int exp) {
exsrx(x, exp, 31, 0);
}
int main() {
exsr1(0x10, 0);
exsr1(0x20, 1);
exsr1(0x30, 2);
exsr1(0x40, 3);
exsr1(0x50, 4);
exsr1(0x60, 5);
exsr1(0x70, 6);
exsr1(0x80, 7);
exsr1(0x90, 8);
exsr1(0xa0, 9);
exsr1(0xb0, 10);
exsr1(0xc0, 11);
exsr1(0xd0, 12);
exsr1(0xe0, 13);
exsr1(0xf0, 14);
exsr1(0xff, 15);
exsr1(0xffff, -1);
exsr1(0xfe, -1);
exsr2(0x10, 0);
exsr2(0x20, 1);
exsr2(0x30, 2);
exsr2(0x40, 3);
exsr2(0x50, 4);
exsr2(0x60, 5);
exsr2(0x70, 6);
exsr2(0x80, 7);
exsr2(0x90, 8);
exsr2(0xa0, 9);
exsr2(0xb0, 10);
exsr2(0xc0, 11);
exsr2(0xd0, 12);
exsr2(0xe0, 13);
exsr2(0xf0, 14);
exsr2(0xff, -1);
exsr3(0x10, 0);
exsr3(0x20, 1);
exsr3(0x30, -1);
exsr4(0x10, 0);
exsr4(0x20, -1);
exsr5(0x10, 0);
exsr5(0x20, 1);
exsr5(0x30, 2);
exsr5(0x40, 3);
exsr5(0x50, 4);
exsr5(0x60, 5);
exsr5(0x70, 6);
exsr5(0x80, 7);
exsr5(0x90, 8);
exsr5(0xa0, 9);
exsr5(0xb0, 10);
exsr5(0xc0, 11);
exsr5(0xd0, 12);
exsr5(0xe0, 13);
exsr5(0xf0, 14);
exsr5(0xff, 15);
exsr5(0x10ff, 16);
exsr5(0x20ff, 17);
exsr5(0x30ff, 18);
exsr5(0x40ff, 19);
exsr5(0x50ff, 20);
exsr5(0x60ff, 21);
exsr5(0x70ff, 22);
exsr5(0x80ff, 23);
exsr5(0x90ff, 24);
exsr5(0xa0ff, 25);
exsr5(0xb0ff, 26);
exsr5(0xc0ff, 27);
exsr5(0xd0ff, 28);
exsr5(0xe0ff, 29);
exsr5(0xf0ff, 30);
exsr5(0xffff, 31);
exsr5(0xfff3, -1);
exsr5(0xffffff, -1);
exsr6(0x10, 0);
exsr6(0x20, 1);
exsr6(0x30, 2);
exsr6(0x40, 3);
exsr6(0x50, 4);
exsr6(0x60, 5);
exsr6(0x70, 6);
exsr6(0x80, 7);
exsr6(0x90, 8);
exsr6(0xa0, 9);
exsr6(0xb0, 10);
exsr6(0xc0, 11);
exsr6(0xd0, 12);
exsr6(0xe0, 13);
exsr6(0xf0, 14);
exsr6(0xff, 15);
exsr6(0x10ff, 16);
exsr6(0x20ff, 17);
exsr6(0x30ff, 18);
exsr6(0x40ff, 19);
exsr6(0x50ff, 20);
exsr6(0x60ff, 21);
exsr6(0x70ff, 22);
exsr6(0x80ff, 23);
exsr6(0x90ff, 24);
exsr6(0xa0ff, 25);
exsr6(0xb0ff, 26);
exsr6(0xc0ff, 27);
exsr6(0xd0ff, 28);
exsr6(0xe0ff, 29);
exsr6(0xf0ff, 30);
exsr6(0xffff, -1);
exsr7(0x10, 0);
exsr7(0x20, 1);
exsr7(0x30, 2);
exsr7(0x40, 3);
exsr7(0x50, 4);
exsr7(0x60, 5);
exsr7(0x70, 6);
exsr7(0x80, 7);
exsr7(0x90, 8);
exsr7(0xa0, 9);
exsr7(0xb0, 10);
exsr7(0xc0, 11);
exsr7(0xd0, 12);
exsr7(0xe0, 13);
exsr7(0xf0, 14);
exsr7(0xff, 15);
exsr7(0xffff, -17);
exsr7(0xfe, -16);
exsr7(0x00, -1);
exsr8(0x05, -1);
exsr8(0x15, -2);
exsr8(0x10, 0);
exsr8(0x20, 1);
exsr8(0x30, 2);
exsr8(0x40, 3);
exsr8(0x50, 4);
exsr8(0x60, 5);
exsr8(0x70, 6);
exsr8(0x80, 7);
exsr8(0x90, 8);
exsr8(0xa0, 9);
exsr8(0xb0, 10);
exsr8(0xc0, 11);
exsr8(0xd0, 12);
exsr8(0xe0, 13);
exsr8(0xf0, 14);
exsr8(0xff, -16);
exsr8(0xffff, -16);
exsr8(0xef, -15);
exsr8(0x00, -1);
exsr8(0x05, -1);
exsr8(0x15, -2);
exsr9(0x10, 0);
exsr9(0x20, 1);
exsr9(0x30, -3);
exsr10(0x10, 0);
exsr10(0x20, -2);
exsr11(0x10, 0);
exsr11(0x20, 1);
exsr11(0x30, 2);
exsr11(0x40, 3);
exsr11(0x50, 4);
exsr11(0x60, 5);
exsr11(0x70, 6);
exsr11(0x80, 7);
exsr11(0x90, 8);
exsr11(0xa0, 9);
exsr11(0xb0, 10);
exsr11(0xc0, 11);
exsr11(0xd0, 12);
exsr11(0xe0, 13);
exsr11(0xf0, 14);
exsr11(0xff, 15);
exsr11(0x10ff, 16);
exsr11(0x20ff, 17);
exsr11(0x30ff, 18);
exsr11(0x40ff, 19);
exsr11(0x50ff, 20);
exsr11(0x60ff, 21);
exsr11(0x70ff, 22);
exsr11(0x80ff, 23);
exsr11(0x90ff, 24);
exsr11(0xa0ff, 25);
exsr11(0xb0ff, 26);
exsr11(0xc0ff, 27);
exsr11(0xd0ff, 28);
exsr11(0xe0ff, 29);
exsr11(0xf0ff, 30);
exsr11(0xffff, 31);
exsr11(0xfff3, -16);
exsr11(0xffffff, -33);
exsr12(0x10, 0);
exsr12(0x20, 1);
exsr12(0x30, 2);
exsr12(0x40, 3);
exsr12(0x50, 4);
exsr12(0x60, 5);
exsr12(0x70, 6);
exsr12(0x80, 7);
exsr12(0x90, 8);
exsr12(0xa0, 9);
exsr12(0xb0, 10);
exsr12(0xc0, 11);
exsr12(0xd0, 12);
exsr12(0xe0, 13);
exsr12(0xf0, 14);
exsr12(0xff, 15);
exsr12(0x10ff, 16);
exsr12(0x20ff, 17);
exsr12(0x30ff, 18);
exsr12(0x40ff, 19);
exsr12(0x50ff, 20);
exsr12(0x60ff, 21);
exsr12(0x70ff, 22);
exsr12(0x80ff, 23);
exsr12(0x90ff, 24);
exsr12(0xa0ff, 25);
exsr12(0xb0ff, 26);
exsr12(0xc0ff, 27);
exsr12(0xd0ff, 28);
exsr12(0xe0ff, 29);
exsr12(0xf0ff, 30);
exsr12(0xffff, -32);
printf("all done\n");
return 0;
}

@ -0,0 +1,161 @@
#define _GNU_SOURCE
#include <string.h>
#include <stdio.h>
#include <glib.h>
#include <pcre.h>
#include "aux.h"
#if 0
#define BSDB(x...) fprintf(stderr, x)
#else
#define BSDB(x...) ((void)0)
#endif
int mybsearch(void *base, unsigned int len, unsigned int size, void *key, unsigned int key_off, unsigned int key_size, int exact) {
unsigned char *cbase = base;
int pos;
unsigned char *cur;
int res;
unsigned int num;
if (!len) {
BSDB("zero length array\n");
return -1;
}
pos = len / 2;
num = pos;
num += 3;
num /= 2;
pos--;
if (pos < 0)
pos = 0;
BSDB("starting pos=%u, num=%u\n", pos, num);
for (;;) {
cur = cbase + (pos * size);
res = memcmp(cur + key_off, key, key_size);
BSDB("compare=%i\n", res);
if (!res)
return pos;
if (!num) {
BSDB("nothing found\n");
if (exact)
return -1;
if (res > 0) /* cur > key */
return -1 * pos - 1;
return -1 * pos - 2;
}
if (res < 0) { /* cur < key */
pos += num;
if (pos >= len)
pos = len - 1;
}
else {
pos -= num;
if (pos < 0)
pos = 0;
}
BSDB("new pos=%u\n", pos);
if (num == 1)
num = 0;
else {
num++;
num /= 2;
}
BSDB("new num=%u\n", num);
}
}
GList *g_list_link(GList *list, GList *el) {
el->prev = NULL;
el->next = list;
if (list)
list->prev = el;
return el;
}
GQueue *pcre_multi_match(pcre **re, pcre_extra **ree, const char *rex, const char *s, unsigned int num, parse_func f, void *p) {
GQueue *q;
const char *errptr;
int erroff;
unsigned int start, len;
int ovec[60];
int *ov;
char **el;
unsigned int i;
void *ins;
if (!*re) {
*re = pcre_compile(rex, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
if (!*re)
return NULL;
*ree = pcre_study(*re, 0, &errptr);
}
q = g_queue_new();
el = malloc(sizeof(*el) * num);
for (start = 0, len = strlen(s); pcre_exec(*re, *ree, s + start, len - start, 0, 0, ovec, ARRAY_SIZE(ovec)) > 0; start += ovec[1]) {
for (i = 0; i < num; i++) {
ov = ovec + 2 + i*2;
el[i] = (ov[0] == -1) ? NULL : g_strndup(s + start + ov[0], ov[1] - ov[0]);
}
if (!f(el, &ins, p))
g_queue_push_tail(q, ins);
for (i = 0; i < num; i++) {
if (el[i])
free(el[i]);
}
}
free(el);
return q;
}
void strmove(char **d, char **s) {
if (*d)
free(*d);
*d = *s;
*s = strdup("");
}
void strdupfree(char **d, const char *s) {
if (*d)
free(*d);
*d = strdup(s);
}
#if !GLIB_CHECK_VERSION(2,14,0)
void g_string_vprintf(GString *string, const gchar *format, va_list args) {
char *s;
int r;
r = vasprintf(&s, format, args);
if (r < 0)
return;
g_string_assign(string, s);
free(s);
}
#endif

@ -0,0 +1,48 @@
#ifndef __AUX_H__
#define __AUX_H__
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <fcntl.h>
#include <glib.h>
#include <pcre.h>
#include <stdarg.h>
#define OFFSET_OF(t,e) ((unsigned int) (unsigned long) &(((t *) 0)->e))
#define ARRAY_SIZE(x) (sizeof(x) / sizeof(*(x)))
#define ZERO(x) memset(&(x), 0, sizeof(x))
#define IPF "%u.%u.%u.%u"
#define IPP(x) ((unsigned char *) (&(x)))[0], ((unsigned char *) (&(x)))[1], ((unsigned char *) (&(x)))[2], ((unsigned char *) (&(x)))[3]
#define DF IPF ":%u"
#define DP(x) IPP((x).sin_addr.s_addr), ntohs((x).sin_port)
#define NONBLOCK(x) fcntl(x, F_SETFL, O_NONBLOCK)
#define REUSEADDR(x) do { int ONE = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &ONE, sizeof(ONE)); } while (0)
typedef int (*parse_func)(char **, void **, void *);
int mybsearch(void *, unsigned int, unsigned int, void *, unsigned int, unsigned int, int);
GList *g_list_link(GList *, GList *);
GQueue *pcre_multi_match(pcre **, pcre_extra **, const char *, const char *, unsigned int, parse_func, void *);
void strmove(char **, char **);
void strdupfree(char **, const char *);
#if !GLIB_CHECK_VERSION(2,14,0)
void g_string_vprintf(GString *string, const gchar *format, va_list args);
#endif
#endif

@ -0,0 +1,989 @@
#include <stdio.h>
#include <unistd.h>
#include <glib.h>
#include <stdlib.h>
#include <pcre.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include "call.h"
#include "poller.h"
#include "aux.h"
#include "log.h"
#include "kernel.h"
#include "control.h"
#include "streambuf.h"
#if 0
#define DBG(x...) mylog(LOG_DEBUG, x)
#else
#define DBG(x...) ((void)0)
#endif
static pcre *info_re;
static pcre_extra *info_ree;
static pcre *streams_re;
static pcre_extra *streams_ree;
static char *rtp_codecs[] = {
[0] = "G711u",
[1] = "1016",
[2] = "G721",
[3] = "GSM",
[4] = "G723",
[5] = "DVI4",
[6] = "DVI4",
[7] = "LPC",
[8] = "G711a",
[9] = "G722",
[10] = "L16",
[11] = "L16",
[14] = "MPA",
[15] = "G728",
[18] = "G729",
[25] = "CelB",
[26] = "JPEG",
[28] = "nv",
[31] = "H261",
[32] = "MPV",
[33] = "MP2T",
[34] = "H263",
};
static void call_destroy(struct call *);
static void unkernelize(struct peer *);
static void stream_closed(int fd, void *p) {
struct streamrelay *r = p;
struct call *c;
c = r->up->up->call;
mylog(LOG_WARNING, "[%s] Read error on RTP socket", c->callid);
call_destroy(c);
}
static void kernelize(struct callstream *c) {
int i, j;
struct peer *p, *pp;
struct streamrelay *r, *rp;
struct kernel_stream ks;
mylog(LOG_DEBUG, "[%s] Kernelizing RTP streams", c->call->callid);
ZERO(ks);
for (i = 0; i < 2; i++) {
p = &c->peers[i];
pp = &c->peers[i ^ 1];
if (p->kernelized)
continue;
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
rp = &pp->rtps[j];
if (!r->peer.ip || !r->peer.port)
continue;
ZERO(r->kstats);
ks.local_port = r->localport;
ks.src.ip = c->call->callmaster->ip;
ks.src.port = rp->localport;
ks.dest.ip = r->peer.ip;
ks.dest.port = r->peer.port;
ks.tos = c->call->callmaster->tos;
kernel_add_stream(c->call->callmaster->kernelfd, &ks, 0);
}
p->kernelized = 1;
}
}
static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_in *fsin) {
struct streamrelay *p, *p2;
struct peer *pe, *pe2;
struct callstream *cs;
int ret;
struct sockaddr_in sin;
struct msghdr mh;
struct iovec iov;
unsigned char buf[256];
struct cmsghdr *ch;
struct in_pktinfo *pi;
struct call *c;
struct callmaster *m;
unsigned char cc;
pe = r->up;
cs = pe->up;
pe2 = &cs->peers[pe->idx ^ 1];
p = &pe2->rtps[r->idx];
c = cs->call;
m = c->callmaster;
if (p->fd == -1) {
mylog(LOG_WARNING, "[%s] RTP packet discarded from " DF, c->callid, DP(*fsin));
r->stats.errors++;
m->statsps.errors++;
return 0;
}
if (!pe->confirmed && pe->filled && r->idx == 0) {
if (l < 2)
goto skip;
if (!pe->codec) {
cc = b[1];
cc &= 0x7f;
if (cc < ARRAY_SIZE(rtp_codecs))
pe->codec = rtp_codecs[cc] ? : "unknown";
else
pe->codec = "unknown";
}
mylog(LOG_DEBUG, "[%s] Confirmed peer information - " DF, c->callid, DP(*fsin));
p->peer.ip = fsin->sin_addr.s_addr;
p->peer.port = ntohs(fsin->sin_port);
p2 = &p->up->rtps[p->idx ^ 1];
p2->peer.ip = fsin->sin_addr.s_addr;
p2->peer.port = p->peer.port + ((int) (p2->idx * 2) - 1);
pe->confirmed = 1;
if (pe2->confirmed && pe2->filled)
kernelize(cs);
}
skip:
if (!r->peer.ip || !r->peer.port)
goto drop;
ZERO(sin);
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = r->peer.ip;
sin.sin_port = htons(r->peer.port);
ZERO(iov);
iov.iov_base = b;
iov.iov_len = l;
ZERO(mh);
mh.msg_name = &sin;
mh.msg_namelen = sizeof(sin);
mh.msg_iov = &iov;
mh.msg_iovlen = 1;
mh.msg_control = buf;
mh.msg_controllen = sizeof(buf);
ch = CMSG_FIRSTHDR(&mh);
ZERO(*ch);
ch->cmsg_len = CMSG_LEN(sizeof(*pi));
ch->cmsg_level = IPPROTO_IP;
ch->cmsg_type = IP_PKTINFO;
pi = (void *) CMSG_DATA(ch);
ZERO(*pi);
pi->ipi_spec_dst.s_addr = m->ip;
mh.msg_controllen = CMSG_SPACE(sizeof(*pi));
ret = sendmsg(p->fd, &mh, 0);
if (ret == -1 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
r->stats.errors++;
m->statsps.errors++;
return -1;
}
drop:
r->stats.packets++;
r->stats.bytes += l;
m->statsps.packets++;
m->statsps.bytes += l;
r->last = m->poller->now;
return 0;
}
static void stream_readable(int fd, void *p) {
struct streamrelay *r = p;
char buf[1024];
int ret;
struct sockaddr_in sin;
unsigned int sinlen;
for (;;) {
sinlen = sizeof(sin);
ret = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *) &sin, &sinlen);
if (ret == 0)
goto err;
else if (ret < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
break;
err:
stream_closed(fd, r);
break;
}
if (stream_packet(r, buf, ret, &sin)) {
mylog(LOG_WARNING, "Write error on RTP socket");
call_destroy(r->up->up->call);
return;
}
}
}
static int info_parse_func(char **a, void **ret, void *p) {
GHashTable *h = p;
g_hash_table_replace(h, strdup(a[0]), a[1] ? strdup(a[1]) : NULL);
return -1;
}
static GHashTable *info_parse(const char *s, GHashTable **h) {
GQueue *q;
if (!*h)
*h = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
q = pcre_multi_match(&info_re, &info_ree, "^([^:,]+)(?::(.*?))?(?:$|,)", s, 2, info_parse_func, *h);
g_queue_free(q);
return *h;
}
static int streams_parse_func(char **a, void **ret, void *p) {
struct stream *st;
st = malloc(sizeof(*st));
ZERO(*st);
st->ip = inet_addr(a[0]);
st->port = atoi(a[1]);
st->mediatype = strdup(a[2] ? : "");
if (st->ip == -1)
goto fail;
if (!st->port)
goto fail;
*ret = st;
return 0;
fail:
free(st);
return -1;
}
static GQueue *streams_parse(const char *s) {
return pcre_multi_match(&streams_re, &streams_ree, "^([\\d.]+):(\\d+)(?::(.*?))?(?:$|,)", s, 3, streams_parse_func, NULL);
}
static void streams_free(GQueue *q) {
struct stream *s;
while (q->head) {
s = g_queue_pop_head(q);
free(s->mediatype);
free(s);
}
g_queue_free(q);
}
struct iterator_helper {
GList *del;
struct streamrelay *ports[0x10000];
};
static void call_timer_iterator(void *key, void *val, void *ptr) {
struct call *c = val;
struct iterator_helper *hlp = ptr;
GList *it;
struct callstream *cs;
int i;
struct peer *p;
struct poller *po;
struct callmaster *cm;
unsigned int check;
if (!c->callstreams->head)
goto drop;
cm = c->callmaster;
po = cm->poller;
for (it = c->callstreams->head; it; it = it->next) {
cs = it->data;
for (i = 0; i < 2; i++) {
p = &cs->peers[i];
hlp->ports[p->rtps[0].localport] = &p->rtps[0];
hlp->ports[p->rtps[1].localport] = &p->rtps[1];
check = cm->timeout;
if (!p->rtps[0].peer.ip || !p->rtps[0].peer.port)
check = cm->silent_timeout;
if (po->now - p->rtps[0].last < check)
goto good;
}
}
mylog(LOG_INFO, "[%s] Closing call due to timeout", c->callid);
drop:
hlp->del = g_list_prepend(hlp->del, c);
return;
good:
;
}
#define DS(x) do { \
if (ke->stats.x < sr->kstats.x) \
d = 0; \
else \
d = ke->stats.x - sr->kstats.x; \
sr->stats.x += d; \
m->statsps.x += d; \
} while (0)
static void callmaster_timer(void *ptr) {
struct callmaster *m = ptr;
struct iterator_helper hlp;
GList *i;
struct call *c;
struct mediaproxy_list_entry *ke;
struct streamrelay *sr;
struct poller *po;
u_int64_t d;
ZERO(hlp);
po = m->poller;
g_hash_table_foreach(m->callhash, call_timer_iterator, &hlp);
memcpy(&m->stats, &m->statsps, sizeof(m->stats));
ZERO(m->statsps);
i = kernel_list(m->kernelid);
while (i) {
ke = i->data;
sr = hlp.ports[ke->target.target_port];
if (!sr)
goto next;
DS(packets);
DS(bytes);
DS(errors);
if (ke->stats.packets != sr->kstats.packets)
sr->last = po->now;
memcpy(&sr->kstats, &ke->stats, sizeof(sr->kstats));
next:
free(ke);
i = g_list_delete_link(i, i);
}
for (i = hlp.del; i; i = i->next) {
c = i->data;
call_destroy(c);
}
g_list_free(i);
}
#undef DS
struct callmaster *callmaster_new(struct poller *p) {
struct callmaster *c;
c = malloc(sizeof(*c));
ZERO(*c);
c->callhash = g_hash_table_new(g_str_hash, g_str_equal);
if (!c->callhash)
goto fail;
c->poller = p;
poller_timer(p, callmaster_timer, c);
return c;
fail:
free(c);
return NULL;
}
static int get_port(struct streamrelay *r, u_int16_t p) {
int fd;
struct sockaddr_in sin;
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0)
return -1;
NONBLOCK(fd);
REUSEADDR(fd);
setsockopt(fd, SOL_IP, IP_TOS, &r->up->up->call->callmaster->tos, sizeof(r->up->up->call->callmaster->tos));
ZERO(sin);
sin.sin_family = AF_INET;
sin.sin_port = htons(p);
if (bind(fd, (struct sockaddr *) &sin, sizeof(sin)))
goto fail;
r->fd = fd;
r->localport = p;
return 0;
fail:
close(fd);
return -1;
}
static void get_port_pair(struct peer *p) {
struct call *c;
struct callmaster *m;
struct streamrelay *a, *b;
u_int16_t port;
c = p->up->call;
m = c->callmaster;
a = &p->rtps[0];
b = &p->rtps[1];
assert(a->fd == -1 && b->fd == -1);
port = m->lastport + 1;
for (;;) {
if (port < 1024)
port = 1024;
if (port == m->lastport)
goto fail;
if ((port & 1))
goto next;
if (get_port(a, port))
goto next;
port++;
if (get_port(b, port))
goto tryagain;
break;
tryagain:
close(a->fd);
next:
port++;
}
m->lastport = port;
mylog(LOG_DEBUG, "[%s] Opened ports %u/%u for RTP", c->callid, a->localport, b->localport);
return;
fail:
mylog(LOG_ERR, "[%s] Failed to get RTP port pair", c->callid);
if (a->fd != -1)
close(a->fd);
if (b->fd != -1)
close(b->fd);
a->fd = b->fd = -1;
}
static int setup_peer(struct peer *p, struct stream *s, char *tag) {
struct streamrelay *a, *b;
a = &p->rtps[0];
b = &p->rtps[1];
if (a->peer.ip != s->ip || a->peer.port != b->peer.port) {
p->confirmed = 0;
if (p->kernelized)
unkernelize(p);
}
a->peer.ip = b->peer.ip = s->ip;
a->peer.port = b->peer.port = s->port;
if (b->peer.port)
b->peer.port++;
strdupfree(&p->mediatype, s->mediatype);
strdupfree(&p->tag, tag);
p->filled = 1;
return 0;
}
static void steal_peer(struct peer *p, struct streamrelay *r) {
struct peer *s = r->up;
int i;
struct poller_item pi;
struct streamrelay *sr, *srs;
struct call *c;
struct poller *po;
ZERO(pi);
c = s->up->call;
po = c->callmaster->poller;
mylog(LOG_DEBUG, "[%s] Re-using existing open RTP ports", c->callid);
if (s->kernelized)
unkernelize(s);
p->filled = 1;
strmove(&p->mediatype, &s->mediatype);
strmove(&p->tag, &s->tag);
//p->kernelized = s->kernelized;
//s->kernelized = 0;
for (i = 0; i < 2; i++) {
sr = &p->rtps[i];
srs = &s->rtps[i];
if (sr->fd != -1) {
close(sr->fd);
poller_del_item(po, sr->fd);
}
sr->fd = srs->fd;
sr->peer.ip = srs->peer.ip;
sr->peer.port = srs->peer.port;
sr->localport = srs->localport;
srs->fd = -1;
srs->peer.ip = 0;
srs->peer.port = 0;
srs->localport = 0;
pi.fd = sr->fd;
pi.ptr = sr;
pi.readable = stream_readable;
pi.closed = stream_closed;
poller_update_item(po, &pi);
}
}
static void callstream_init(struct callstream *s, struct call *ca) {
int i, j;
struct peer *p;
struct streamrelay *r;
struct poller_item pi;
struct poller *po;
po = ca->callmaster->poller;
ZERO(*s);
ZERO(pi);
s->call = ca;
for (i = 0; i < 2; i++) {
p = &s->peers[i];
p->idx = i;
p->up = s;
p->tag = strdup("");
p->mediatype = strdup("");
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
r->fd = -1;
r->idx = j;
r->up = p;
r->last = po->now;
}
get_port_pair(p);
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
pi.fd = r->fd;
pi.ptr = r;
pi.readable = stream_readable;
pi.closed = stream_closed;
poller_add_item(po, &pi);
}
}
}
static unsigned int call_streams(struct call *c, GQueue *s, char *tag, int opmode) {
GQueue *q;
GList *i, *l;
struct stream *t;
int x;
struct streamrelay *r;
struct callstream *cs;
struct peer *p;
unsigned int ret;
q = g_queue_new(); /* new callstreams list */
if (!tag)
tag = "";
for (i = s->head; i; i = i->next) {
t = i->data;
p = NULL;
if (!opmode) {
DBG("creating new callstream");
cs = malloc(sizeof(*cs));
callstream_init(cs, c);
p = &cs->peers[0];
}
else {
l = c->callstreams->head;
if (!l) {
mylog(LOG_WARNING, "[%s] Got LOOKUP, but no callstreams found", c->callid);
break;
}
cs = l->data;
#if 0
if (cs->peers[1].filled) {
mylog(LOG_WARNING, "[%s] Got LOOKUP, but no incomplete callstreams found", c->callid);
break;
}
#endif
g_queue_delete_link(c->callstreams, l);
p = &cs->peers[1];
}
for (l = c->callstreams->head; l; l = l->next) {
cs = l->data;
for (x = 0; x < 2; x++) {
r = &cs->peers[x].rtps[0];
if (r->peer.ip != t->ip)
continue;
if (r->peer.port != t->port)
continue;
if (strcmp(cs->peers[x].tag, tag))
continue;
DBG("found existing call stream to steal");
goto found;
}
}
/* not found */
setup_peer(p, t, tag);
g_queue_push_tail(q, p->up);
continue;
found:
steal_peer(p, r);
g_queue_push_tail(q, p->up);
}
ret = q->length;
if (!q->head)
g_queue_free(q);
else {
if (c->callstreams->head) {
q->tail->next = c->callstreams->head;
c->callstreams->head->prev = q->tail;
q->tail = c->callstreams->tail;
q->length += c->callstreams->length;
c->callstreams->head = c->callstreams->tail = NULL;
c->callstreams->length = 0;
}
g_queue_free(c->callstreams);
c->callstreams = q;
}
return ret;
}
static void unkernelize(struct peer *p) {
struct streamrelay *r;
int i;
if (!p->kernelized)
return;
for (i = 0; i < 2; i++) {
r = &p->rtps[i];
kernel_del_stream(p->up->call->callmaster->kernelfd, r->localport);
}
p->kernelized = 0;
}
static void kill_callstream(struct callstream *s) {
int i, j;
struct peer *p;
struct streamrelay *r;
for (i = 0; i < 2; i++) {
p = &s->peers[i];
unkernelize(p);
free(p->tag);
free(p->mediatype);
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
if (r->fd != -1)
close(r->fd);
poller_del_item(s->call->callmaster->poller, r->fd);
}
}
free(s);
}
static void call_destroy(struct call *c) {
struct callmaster *m = c->callmaster;
struct callstream *s;
g_hash_table_remove(m->callhash, c->callid);
free(c->callid);
g_hash_table_destroy(c->infohash);
if (c->calling_agent)
free(c->calling_agent);
if (c->called_agent)
free(c->called_agent);
while (c->callstreams->head) {
s = g_queue_pop_head(c->callstreams);
kill_callstream(s);
}
g_queue_free(c->callstreams);
free(c);
}
static char *streams_print(GQueue *s, unsigned int num, unsigned int off) {
GString *o;
int i;
GList *l;
struct callstream *t;
struct streamrelay *x;
o = g_string_new("");
if (!s->head)
goto out;
t = s->head->data;
g_string_append_printf(o, IPF, IPP(t->call->callmaster->ip));
for (i = 0, l = s->head; i < num && l; i++, l = l->next) {
t = l->data;
x = &t->peers[off].rtps[0];
g_string_append_printf(o, " %u", x->localport);
}
out:
g_string_append(o, "\n");
return g_string_free(o, FALSE);
}
char *call_request(const char **o, struct callmaster *m) {
struct call *c;
GQueue *s;
unsigned int num;
c = g_hash_table_lookup(m->callhash, o[2]);
if (!c) {
mylog(LOG_NOTICE, "[%s] Creating new call", o[2]);
c = malloc(sizeof(*c));
ZERO(*c);
c->callmaster = m;
c->callid = strdup(o[2]);
c->callstreams = g_queue_new();
c->created = m->poller->now;
g_hash_table_insert(m->callhash, c->callid, c);
}
strdupfree(&c->calling_agent, o[9] ? : "UNKNOWN");
info_parse(o[10], &c->infohash);
s = streams_parse(o[3]);
num = call_streams(c, s, g_hash_table_lookup(c->infohash, "fromtag"), 0);
streams_free(s);
return streams_print(c->callstreams, num, 0);
}
char *call_lookup(const char **o, struct callmaster *m) {
struct call *c;
GQueue *s;
unsigned int num;
c = g_hash_table_lookup(m->callhash, o[2]);
if (!c) {
mylog(LOG_WARNING, "[%s] Got LOOKUP for unknown call-id", o[2]);
return NULL;
}
strdupfree(&c->called_agent, o[9] ? : "UNKNOWN");
info_parse(o[10], &c->infohash);
s = streams_parse(o[3]);
num = call_streams(c, s, g_hash_table_lookup(c->infohash, "totag"), 1);
streams_free(s);
return streams_print(c->callstreams, num, 1);
}
void call_delete(const char **o, struct callmaster *m) {
struct call *c;
c = g_hash_table_lookup(m->callhash, o[12]);
if (!c)
return;
call_destroy(c);
}
static void call_status_iterator(void *key, void *val, void *ptr) {
struct call *c = val;
struct control_stream *s = ptr;
GList *l;
struct callstream *cs;
struct peer *p;
struct streamrelay *r1, *r2;
struct streamrelay *rx1, *rx2;
struct callmaster *m;
m = c->callmaster;
streambuf_printf(s->outbuf, "session %s %s %s %s %s %i\n",
c->callid,
(char *) g_hash_table_lookup(c->infohash, "from"),
(char *) g_hash_table_lookup(c->infohash, "to"),
c->calling_agent, c->called_agent,
(int) (m->poller->now - c->created));
for (l = c->callstreams->head; l; l = l->next) {
cs = l->data;
p = &cs->peers[0];
r1 = &p->rtps[0];
r2 = &cs->peers[1].rtps[0];
rx1 = &p->rtps[1];
rx2 = &cs->peers[1].rtps[1];
if (r1->fd == -1 || r2->fd == -1)
continue;
streambuf_printf(s->outbuf, "stream " IPF ":%u " IPF ":%u " IPF ":%u %llu/%llu/%llu %s %s %s %i\n",
IPP(r1->peer.ip), r1->peer.port,
IPP(r2->peer.ip), r2->peer.port,
IPP(m->ip), r1->localport,
(long long unsigned int) r1->stats.bytes + rx1->stats.bytes,
(long long unsigned int) r2->stats.bytes + rx2->stats.bytes,
(long long unsigned int) r1->stats.bytes + rx1->stats.bytes + r2->stats.bytes + rx2->stats.bytes,
"active",
p->codec ? : "unknown",
p->mediatype, (int) (m->poller->now - r1->last));
}
}
void calls_status(struct callmaster *m, struct control_stream *s) {
streambuf_printf(s->outbuf, "proxy %u %llu/%llu/%llu\n",
g_hash_table_size(m->callhash),
(long long unsigned int) m->stats.bytes,
(long long unsigned int) m->stats.bytes - m->stats.errors,
(long long unsigned int) m->stats.bytes * 2 - m->stats.errors);
g_hash_table_foreach(m->callhash, call_status_iterator, s);
}

@ -0,0 +1,98 @@
#ifndef __CALL_H__
#define __CALL_H__
#include <sys/types.h>
#include "ipt_MEDIAPROXY.h"
struct poller;
struct control_stream;
struct peer;
struct callstream;
struct call;
struct callmaster;
struct stream {
u_int32_t ip;
u_int16_t port;
char *mediatype;
};
struct streamrelay {
int fd;
struct stream peer;
u_int16_t localport;
unsigned char idx;
struct peer *up;
struct mediaproxy_stats stats;
struct mediaproxy_stats kstats;
time_t last;
};
struct peer {
struct streamrelay rtps[2];
char *tag;
char *mediatype;
char *codec;
unsigned char idx;
struct callstream *up;
int kernelized:1;
int filled:1;
int confirmed:1;
};
struct callstream {
struct peer peers[2];
struct call *call;
};
struct call {
struct callmaster *callmaster;
GQueue *callstreams;
char *callid;
time_t created;
char *calling_agent;
char *called_agent;
GHashTable *infohash;
};
struct callmaster {
GHashTable *callhash;
u_int16_t lastport;
struct mediaproxy_stats statsps;
struct mediaproxy_stats stats;
struct poller *poller;
int kernelfd;
unsigned int kernelid;
u_int32_t ip;
unsigned int timeout;
unsigned int silent_timeout;
unsigned char tos;
};
struct callmaster *callmaster_new(struct poller *);
char *call_request(const char **, struct callmaster *);
char *call_lookup(const char **, struct callmaster *);
void call_delete(const char **, struct callmaster *);
void calls_status(struct callmaster *, struct control_stream *);
#endif

@ -0,0 +1,250 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pcre.h>
#include "control.h"
#include "poller.h"
#include "aux.h"
#include "streambuf.h"
#include "log.h"
#include "build_time.h"
#include "call.h"
static pcre *parse_re;
static pcre_extra *parse_ree;
static void control_stream_closed(int fd, void *p) {
struct control_stream *s = p;
struct control *c;
mylog(LOG_INFO, "Control connection from " DF " closed", DP(s->inaddr));
c = s->control;
c->stream_head = g_list_remove_link(c->stream_head, &s->link);
close(fd);
if (poller_del_item(s->poller, fd))
abort();
streambuf_destroy(s->inbuf);
streambuf_destroy(s->outbuf);
free(s);
}
static void control_list(struct control *c, struct control_stream *s) {
struct control_stream *i;
for (i = (void *) c->stream_head; i; i = (void *) i->link.next)
streambuf_printf(s->outbuf, DF "\n", DP(s->inaddr));
streambuf_printf(s->outbuf, "End.\n");
}
static int control_stream_parse(struct control_stream *s, char *line) {
const char *errptr;
int erroff;
int ovec[60];
int ret;
const char **out;
struct control *c = s->control;
char *output = NULL;
if (!parse_re) {
parse_re = pcre_compile(
/* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */
"^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$",
PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
parse_ree = pcre_study(parse_re, 0, &errptr);
}
ret = pcre_exec(parse_re, parse_ree, line, strlen(line), 0, 0, ovec, ARRAY_SIZE(ovec));
if (ret <= 0) {
mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line);
return -1;
}
mylog(LOG_INFO, "Got valid command from " DF ": %s", DP(s->inaddr), line);
pcre_get_substring_list(line, ovec, ret, &out);
if (!strcmp(out[1], "request"))
output = call_request(out, c->callmaster);
else if (!strcmp(out[1], "lookup"))
output = call_lookup(out, c->callmaster);
else if (!strcmp(out[11], "delete"))
call_delete(out, c->callmaster);
else if (!strcmp(out[14], "status"))
calls_status(c->callmaster, s);
else if (!strcmp(out[14], "build") | !strcmp(out[14], "version"))
streambuf_printf(s->outbuf, "Build: %s\n", BUILD_TIME);
else if (!strcmp(out[14], "controls"))
control_list(c, s);
else if (!strcmp(out[14], "quit") || !strcmp(out[14], "exit"))
;
if (output) {
streambuf_write(s->outbuf, output, strlen(output));
free(output);
}
pcre_free(out);
return -1;
}
static void control_stream_timer(int fd, void *p) {
struct control_stream *s = p;
struct poller *o = s->poller;
if ((o->now - s->inbuf->active) >= 60 || (o->now - s->outbuf->active) >= 60)
control_stream_closed(s->fd, s);
}
static void control_stream_readable(int fd, void *p) {
struct control_stream *s = p;
char *line;
int ret;
if (streambuf_readable(s->inbuf))
goto close;
while ((line = streambuf_getline(s->inbuf))) {
mylog(LOG_DEBUG, "Got control line from " DF ": %s", DP(s->inaddr), line);
ret = control_stream_parse(s, line);
free(line);
if (ret)
goto close;
}
if (streambuf_bufsize(s->inbuf) > 1024) {
mylog(LOG_WARNING, "Buffer length exceeded in control connection from " DF, DP(s->inaddr));
goto close;
}
return;
close:
control_stream_closed(fd, s);
}
static void control_stream_writeable(int fd, void *p) {
struct control_stream *s = p;
if (streambuf_writeable(s->outbuf))
control_stream_closed(fd, s);
}
static void control_closed(int fd, void *p) {
abort();
}
static void control_incoming(int fd, void *p) {
int nfd;
struct control *c = p;
struct control_stream *s;
struct poller_item i;
struct sockaddr_in sin;
socklen_t sinl;
sinl = sizeof(sin);
nfd = accept(fd, (struct sockaddr *) &sin, &sinl);
if (nfd == -1)
return;
NONBLOCK(nfd);
mylog(LOG_INFO, "New control connection from " DF, DP(sin));
s = malloc(sizeof(*s));
ZERO(*s);
ZERO(i);
i.fd = nfd;
i.closed = control_stream_closed;
i.readable = control_stream_readable;
i.writeable = control_stream_writeable;
i.timer = control_stream_timer;
i.ptr = s;
if (poller_add_item(c->poller, &i))
goto fail;
s->fd = nfd;
s->control = c;
s->poller = c->poller;
s->inbuf = streambuf_new(c->poller, nfd);
s->outbuf = streambuf_new(c->poller, nfd);
memcpy(&s->inaddr, &sin, sizeof(s->inaddr));
c->stream_head = g_list_link(c->stream_head, &s->link);
return;
fail:
free(s);
close(nfd);
}
struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, struct callmaster *m) {
int fd;
struct control *c;
struct poller_item i;
struct sockaddr_in sin;
if (!p)
return NULL;
if (!m)
return NULL;
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1)
return NULL;
NONBLOCK(fd);
REUSEADDR(fd);
ZERO(sin);
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = ip;
sin.sin_port = htons(port);
if (bind(fd, (struct sockaddr *) &sin, sizeof(sin)))
goto fail;
if (listen(fd, 5))
goto fail;
c = malloc(sizeof(*c));
ZERO(*c);
c->fd = fd;
c->poller = p;
c->callmaster = m;
ZERO(i);
i.fd = fd;
i.closed = control_closed;
i.readable = control_incoming;
i.ptr = c;
if (poller_add_item(p, &i))
goto fail2;
return c;
fail2:
free(c);
fail:
close(fd);
return NULL;
}

@ -0,0 +1,48 @@
#ifndef __CONTROL_H__
#define __CONTROL_H__
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <glib.h>
struct poller;
struct control;
struct streambuf;
struct callmaster;
struct control_stream {
GList link; /* must be first */
int fd;
struct streambuf *inbuf;
struct streambuf *outbuf;
struct sockaddr_in inaddr;
struct control *control;
struct poller *poller;
};
struct control {
int fd;
GList *stream_head;
struct poller *poller;
struct callmaster *callmaster;
};
struct control *control_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *);
#endif

@ -0,0 +1,128 @@
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <glib.h>
#include "ipt_MEDIAPROXY.h"
#include "aux.h"
#include "kernel.h"
#if 1
#define PREFIX "/proc/mediaproxy"
#else
#define PREFIX "/tmp/mediaproxy"
#endif
int kernel_create_table(unsigned int id) {
char str[64];
int fd;
int i;
fd = open(PREFIX "/control", O_WRONLY | O_TRUNC);
if (fd == -1)
return -1;
sprintf(str, "add %u\n", id);
i = write(fd, str, strlen(str));
if (i == -1)
goto fail;
close(fd);
return 0;
fail:
close(fd);
return -1;
}
int kernel_open_table(unsigned int id) {
char str[64];
int fd;
struct mediaproxy_message msg;
int i;
sprintf(str, PREFIX "/%u/control", id);
fd = open(str, O_WRONLY | O_TRUNC);
if (fd == -1)
return -1;
ZERO(msg);
msg.cmd = MMG_NOOP;
i = write(fd, &msg, sizeof(msg));
if (i <= 0)
goto fail;
return fd;
fail:
close(fd);
return -1;
}
int kernel_add_stream(int fd, struct kernel_stream *info, int update) {
struct mediaproxy_message msg;
ZERO(msg);
msg.cmd = update ? MMG_UPDATE : MMG_ADD;
msg.target.target_port = info->local_port;
msg.target.src_ip = info->src.ip;
msg.target.dst_ip = info->dest.ip;
msg.target.src_port = info->src.port;
msg.target.dst_port = info->dest.port;
msg.target.mirror_ip = info->mirror.ip;
msg.target.mirror_port = info->mirror.port;
msg.target.tos = info->tos;
return write(fd, &msg, sizeof(msg)) <= 0 ? -1 : 0;
}
int kernel_del_stream(int fd, u_int16_t p) {
struct mediaproxy_message msg;
ZERO(msg);
msg.cmd = MMG_DEL;
msg.target.target_port = p;
return write(fd, &msg, sizeof(msg)) <= 0 ? -1 : 0;
}
GList *kernel_list(unsigned int id) {
char str[64];
int fd;
struct mediaproxy_list_entry *buf;
GList *li = NULL;
int ret;
sprintf(str, PREFIX "/%u/blist", id);
fd = open(str, O_RDONLY);
if (fd == -1)
return NULL;
for (;;) {
buf = malloc(sizeof(*buf));
ret = read(fd, buf, sizeof(*buf));
if (ret != sizeof(*buf))
break;
li = g_list_prepend(li, buf);
}
free(buf);
close(fd);
return li;
}

@ -0,0 +1,38 @@
#ifndef __KERNEL_H__
#define __KERNEL_H__
#include <sys/types.h>
#include <glib.h>
struct ip_port {
u_int32_t ip;
u_int16_t port;
};
struct kernel_stream {
u_int16_t local_port;
struct ip_port src;
struct ip_port dest;
struct ip_port mirror;
unsigned char tos;
};
int kernel_create_table(unsigned int);
int kernel_open_table(unsigned int);
int kernel_add_stream(int, struct kernel_stream *, int);
int kernel_del_stream(int, u_int16_t);
GList *kernel_list(unsigned int);
#endif

@ -0,0 +1,12 @@
#ifndef __LOG_H__
#define __LOG_H__
#include <syslog.h>
#define mylog(x,y...) syslog(x,y)
#endif

@ -0,0 +1,199 @@
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <sys/resource.h>
#include <glib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "poller.h"
#include "control.h"
#include "aux.h"
#include "log.h"
#include "call.h"
#include "kernel.h"
#define die(x...) do { fprintf(stderr, x); exit(-1); } while(0)
static char *pidfile;
static gboolean foreground;
static u_int32_t ip;
static u_int32_t listenp;
static u_int16_t listenport;
static int tos;
static int table;
static int timeout;
static int silent_timeout;
static void signals(void) {
signal(SIGPIPE, SIG_IGN);
}
static int rlim(int res, rlim_t val) {
struct rlimit rlim;
ZERO(rlim);
rlim.rlim_cur = rlim.rlim_max = val;
return setrlimit(res, &rlim);
}
static void resources(void) {
int tryv;
rlim(RLIMIT_CORE, RLIM_INFINITY);
for (tryv = ((1<<16) - 1); tryv && rlim(RLIMIT_NOFILE, tryv) == -1; tryv >>= 1)
;
rlim(RLIMIT_DATA, RLIM_INFINITY);
rlim(RLIMIT_RSS, RLIM_INFINITY);
rlim(RLIMIT_AS, RLIM_INFINITY);
}
static void options(int *argc, char ***argv) {
static char *ips;
static char *listenps;
static GOptionEntry e[] = {
{ "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" },
{ "ip", 'i', 0, G_OPTION_ARG_STRING, &ips, "Local IP address", "IP" },
{ "listen", 'l', 0, G_OPTION_ARG_STRING, &listenps, "Port to listen on", "[IP:]PORT" },
{ "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "TOS value to set on streams", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" },
{ "pidfile", 'p', 0, G_OPTION_ARG_STRING, &pidfile, "Write PID to file", "FILE" },
{ "foreground", 'f', 0, G_OPTION_ARG_NONE, &foreground, "Don't fork to background", NULL },
{ NULL, }
};
GOptionContext *c;
GError *er = NULL;
char *p;
c = g_option_context_new(" - next-generation media proxy");
g_option_context_add_main_entries(c, e, NULL);
if (!g_option_context_parse(c, argc, argv, &er))
die("Bad command line: %s\n", er->message);
if (!ips)
die("Missing option IP\n");
if (!listenps)
die("Missing option LISTEN\n");
ip = inet_addr(ips);
if (ip == -1)
die("Invalid IP\n");
p = strchr(listenps, ':');
if (p) {
*p++ = 0;
listenp = inet_addr(listenps);
if (listenp == -1)
die("Invalid IP\n");
listenport = atoi(p);
}
else {
if (strchr(listenps, '.'))
die("Invalid port\n");
listenport = atoi(listenps);
}
if (!listenport)
die("Invalid port\n");
if (p)
*--p = ':';
if (tos < 0 || tos > 255)
die("Invalid TOS value");
if (timeout <= 0)
timeout = 60;
if (silent_timeout <= 0)
silent_timeout = 3600;
}
static void daemonize(void) {
printf("Going to background...\n");
if (fork())
_exit(0);
freopen("/dev/null", "r", stdin);
freopen("/dev/null", "w", stdout);
freopen("/dev/null", "w", stderr);
setpgrp();
}
static void wpidfile(void) {
FILE *fp;
if (!pidfile)
return;
fp = fopen(pidfile, "w");
if (fp) {
fprintf(fp, "%u\n", getpid());
fclose(fp);
}
}
int main(int argc, char **argv) {
struct poller *p;
struct callmaster *m;
struct control *c;
int kfd;
int ret;
options(&argc, &argv);
signals();
resources();
if (kernel_create_table(table))
die("Failed to create kernel table %i\n", table);
kfd = kernel_open_table(table);
if (kfd == -1)
die("Failed to open kernel table %i\n", table);
p = poller_new();
if (!p)
die("poller creation failed\n");
m = callmaster_new(p);
if (!m)
return -1;
m->kernelfd = kfd;
m->kernelid = table;
m->ip = ip;
m->timeout = timeout;
m->silent_timeout = silent_timeout;
m->tos = tos;
c = control_new(p, listenp, listenport, m);
if (!c)
die("Failed to open control connection port\n");
mylog(LOG_INFO, "Startup complete");
if (!foreground)
daemonize();
wpidfile();
for (;;) {
ret = poller_poll(p, 100);
if (ret == -1)
break;
}
return 0;
}

@ -0,0 +1,60 @@
#include <stdio.h>
#include <assert.h>
#include "poller.h"
void dummy(int a, void *b) {
}
int main() {
struct poller *p;
struct poller_item i;
p = poller_new();
if (!p) {
fprintf(stderr, "poller creation failed\n");
return -1;
}
assert(p->items_size == 0);
assert(p->pollfds_size == 0);
i.readable = dummy;
i.writeable = dummy;
i.closed = dummy;
i.fd = 3;
assert(poller_add_item(p, &i) == 0);
i.fd = 4;
assert(poller_add_item(p, &i) == 0);
i.fd = 2;
assert(poller_add_item(p, &i) == 0);
i.fd = 6;
assert(poller_add_item(p, &i) == 0);
i.fd = 0;
assert(poller_add_item(p, &i) == 0);
i.fd = 1;
assert(poller_add_item(p, &i) == 0);
i.fd = 5;
assert(poller_add_item(p, &i) == 0);
i.fd = 7;
assert(poller_add_item(p, &i) == 0);
i.fd = 9;
assert(poller_add_item(p, &i) == 0);
assert(poller_del_item(p, 10) == -1);
assert(poller_del_item(p, 6) == 0);
assert(poller_del_item(p, 8) == -1);
assert(poller_del_item(p, 0) == 0);
assert(poller_del_item(p, 3) == 0);
assert(poller_del_item(p, 11) == -1);
assert(poller_del_item(p, 9) == 0);
assert(poller_del_item(p, 11) == -1);
assert(poller_del_item(p, 4) == 0);
return 0;
}

@ -0,0 +1,311 @@
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
#include "poller.h"
#include "aux.h"
#define POLLER_BSEARCH(b,l,k,e) mybsearch(b, l, sizeof(struct pollfd), k, OFFSET_OF(struct pollfd, fd), sizeof(*(k)), e)
struct timer_item {
void (*func)(void *);
void *ptr;
};
struct poller *poller_new(void) {
struct poller *p;
p = malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
p->now = time(NULL);
return p;
}
int poller_add_item(struct poller *p, struct poller_item *i) {
struct poller_item *ip;
struct pollfd *pf;
int idx;
unsigned int u;
if (!p || !i)
return -1;
if (i->fd < 0)
return -1;
if (!i->readable && !i->writeable)
return -1;
if (!i->closed)
return -1;
if (i->fd < p->items_size && p->items[i->fd])
return -1;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &i->fd, 0);
assert(idx < 0);
idx *= -1;
idx--;
p->pollfds_size++;
p->pollfds = realloc(p->pollfds, p->pollfds_size * sizeof(*p->pollfds));
memmove(p->pollfds + idx + 1, p->pollfds + idx, (p->pollfds_size - idx - 1) * sizeof(*p->pollfds));
pf = &p->pollfds[idx];
pf->fd = i->fd;
pf->events = POLLHUP | POLLERR | ((i->writeable && i->blocked) ? POLLOUT : 0) | (i->readable ? POLLIN : 0);
pf->revents = 0;
if (i->fd >= p->items_size) {
u = p->items_size;
p->items_size = i->fd + 1;
p->items = realloc(p->items, sizeof(*p->items) * p->items_size);
memset(p->items + u, 0, sizeof(*p->items) * (p->items_size - u - 1));
}
ip = malloc(sizeof(*ip));
memcpy(ip, i, sizeof(*ip));
p->items[i->fd] = ip;
return 0;
}
int poller_del_item(struct poller *p, int fd) {
int idx;
if (!p || fd < 0)
return -1;
if (fd >= p->items_size)
return -1;
if (!p->items || !p->items[fd])
return -1;
if (!p->pollfds || !p->pollfds_size)
return -1;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &fd, 1);
assert(idx != -1);
memmove(p->pollfds + idx, p->pollfds + idx + 1, (p->pollfds_size - idx - 1) * sizeof(*p->pollfds));
p->pollfds_size--;
p->pollfds = realloc(p->pollfds, p->pollfds_size * sizeof(*p->pollfds));
if (p->pollfds_work) {
idx = POLLER_BSEARCH(p->pollfds_work, p->pollfds_work_size, &fd, 1);
if (idx != -1)
p->pollfds_work[idx].fd = -1;
}
free(p->items[fd]);
p->items[fd] = NULL;
return 0;
}
int poller_update_item(struct poller *p, struct poller_item *i) {
struct poller_item *np;
if (!p || !i)
return -1;
if (i->fd < 0)
return -1;
if (!i->readable && !i->writeable)
return -1;
if (!i->closed)
return -1;
if (i->fd >= p->items_size || !(np = p->items[i->fd]))
return poller_add_item(p, i);
np->ptr = i->ptr;
np->readable = i->readable;
np->writeable = i->writeable;
np->closed = i->closed;
np->timer = i->timer;
return 0;
}
int poller_poll(struct poller *p, int timeout) {
struct pollfd *pfd, *pf;
int ret, i;
struct poller_item *it;
int idx;
time_t last;
int do_timer;
GList *li;
struct timer_item *ti;
if (!p)
return -1;
if (!p->pollfds || !p->pollfds_size)
return -1;
if (!p->items || !p->items_size)
return -1;
p->pollfds_work_size = i = p->pollfds_size;
p->pollfds_work = pfd = malloc(sizeof(*pfd) * i);
memcpy(pfd, p->pollfds, sizeof(*pfd) * i);
do_timer = 0;
last = p->now;
p->now = time(NULL);
if (last != p->now) {
do_timer = 1;
ret = i;
for (li = p->timers; li; li = li->next) {
ti = li->data;
ti->func(ti->ptr);
}
}
else {
ret = poll(pfd, i, timeout);
if (errno == EINTR)
ret = 0;
if (ret < 0)
goto out;
}
pf = pfd;
for (pf = pfd; i; pf++) {
i--;
if (pf->fd < 0)
continue;
it = (pf->fd < p->items_size) ? p->items[pf->fd] : NULL;
if (!it)
continue;
if (do_timer) {
if (it->timer)
it->timer(it->fd, it->ptr);
continue;
}
if (it->error) {
it->closed(it->fd, it->ptr);
continue;
}
if ((pf->revents & (POLLERR | POLLHUP)))
it->closed(it->fd, it->ptr);
else if ((pf->revents & POLLOUT)) {
it->blocked = 0;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &it->fd, 1);
assert(idx != -1);
p->pollfds[idx].events &= ~POLLOUT;
it->writeable(it->fd, it->ptr);
}
else if ((pf->revents & POLLIN))
it->readable(it->fd, it->ptr);
else if (!pf->revents)
continue;
else
abort();
}
out:
free(pfd);
p->pollfds_work = NULL;
p->pollfds_work_size = 0;
return ret;
}
void poller_blocked(struct poller *p, int fd) {
int idx;
if (!p || fd < 0)
return;
if (fd >= p->items_size)
return;
if (!p->items || !p->items[fd])
return;
if (!p->pollfds || !p->pollfds_size)
return;
if (!p->items[fd]->writeable)
return;
p->items[fd]->blocked = 1;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &fd, 1);
assert(idx != -1);
p->pollfds[idx].events |= POLLOUT;
}
void poller_error(struct poller *p, int fd) {
if (!p || fd < 0)
return;
if (fd >= p->items_size)
return;
if (!p->items || !p->items[fd])
return;
if (!p->pollfds || !p->pollfds_size)
return;
if (!p->items[fd]->writeable)
return;
p->items[fd]->error = 1;
p->items[fd]->blocked = 1;
}
int poller_isblocked(struct poller *p, int fd) {
if (!p || fd < 0)
return -1;
if (fd >= p->items_size)
return -1;
if (!p->items || !p->items[fd])
return -1;
if (!p->pollfds || !p->pollfds_size)
return -1;
if (!p->items[fd]->writeable)
return -1;
return p->items[fd]->blocked;
}
int poller_timer(struct poller *p, void (*f)(void *), void *ptr) {
struct timer_item *i;
if (!p || !f)
return -1;
i = malloc(sizeof(*i));
ZERO(*i);
i->func = f;
i->ptr = ptr;
p->timers = g_list_prepend(p->timers, i);
return 0;
}

@ -0,0 +1,51 @@
#ifndef __POLLER_H__
#define __POLLER_H__
#include <sys/types.h>
#include <time.h>
#include <glib.h>
struct poller_item {
int fd;
void *ptr;
void (*readable)(int, void *);
void (*writeable)(int, void *);
void (*closed)(int, void *);
void (*timer)(int, void *);
int blocked:1;
int error:1;
};
struct poller {
struct poller_item **items;
unsigned int items_size;
struct pollfd *pollfds;
unsigned int pollfds_size;
GList *timers;
time_t now;
struct pollfd *pollfds_work;
unsigned int pollfds_work_size;
};
struct poller *poller_new(void);
int poller_add_item(struct poller *, struct poller_item *);
int poller_update_item(struct poller *, struct poller_item *);
int poller_del_item(struct poller *, int);
int poller_poll(struct poller *, int);
void poller_blocked(struct poller *, int);
int poller_isblocked(struct poller *, int);
void poller_error(struct poller *, int);
int poller_timer(struct poller *, void (*)(void *), void *);
#endif

@ -0,0 +1,172 @@
#include <stdio.h>
#include <glib.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <stdarg.h>
#include <time.h>
#include "streambuf.h"
#include "poller.h"
#include "aux.h"
struct streambuf *streambuf_new(struct poller *p, int fd) {
struct streambuf *b;
b = malloc(sizeof(*b));
ZERO(*b);
b->buf = g_string_new("");
b->fd = fd;
b->poller = p;
b->active = p->now;
return b;
}
void streambuf_destroy(struct streambuf *b) {
g_string_free(b->buf, TRUE);
free(b);
}
int streambuf_writeable(struct streambuf *b) {
int ret;
unsigned int out;
for (;;) {
if (!b->buf->len)
break;
out = (b->buf->len > 1024) ? 1024 : b->buf->len;
ret = write(b->fd, b->buf->str, out);
if (ret < 0) {
if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)
return -1;
ret = 0;
}
if (ret > 0) {
g_string_erase(b->buf, 0, ret);
b->active = b->poller->now;
}
if (ret != out) {
poller_blocked(b->poller, b->fd);
break;
}
}
return 0;
}
int streambuf_readable(struct streambuf *b) {
int ret;
char buf[1024];
for (;;) {
ret = read(b->fd, buf, 1024);
if (ret == 0)
return -1;
if (ret < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
break;
return -1;
}
g_string_append_len(b->buf, buf, ret);
b->active = b->poller->now;
}
return 0;
}
char *streambuf_getline(struct streambuf *b) {
char *p;
int len;
char *s = NULL;
for (;;) {
if (s)
free(s);
p = memchr(b->buf->str, '\n', b->buf->len);
if (!p)
return NULL;
len = p - b->buf->str;
if (len == 0) {
g_string_erase(b->buf, 0, 1);
continue;
}
s = malloc(len + 1);
memcpy(s, b->buf->str, len);
s[len] = '\0';
g_string_erase(b->buf, 0, len + 1);
if (s[--len] == '\r') {
if (len == 0)
continue;
s[len] = '\0';
}
break;
}
return s;
}
unsigned int streambuf_bufsize(struct streambuf *b) {
return b->buf->len;
}
void streambuf_printf(struct streambuf *b, char *f, ...) {
va_list va;
GString *gs;
va_start(va, f);
gs = g_string_new("");
g_string_vprintf(gs, f, va);
va_end(va);
streambuf_write(b, gs->str, gs->len);
g_string_free(gs, TRUE);
}
void streambuf_write(struct streambuf *b, char *s, unsigned int len) {
unsigned int out;
int ret;
while (len && !poller_isblocked(b->poller, b->fd)) {
out = (len > 1024) ? 1024 : len;
ret = write(b->fd, s, out);
if (ret < 0) {
if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
poller_error(b->poller, b->fd);
break;
}
poller_blocked(b->poller, b->fd);
break;
}
if (ret == 0)
break;
s += ret;
len -= ret;
b->active = b->poller->now;
}
if (b->buf->len > 5242880)
poller_error(b->poller, b->fd);
else if (len)
g_string_append_len(b->buf, s, len);
}

@ -0,0 +1,35 @@
#ifndef __BUFFER_H__
#define __BUFFER_H__
#include <sys/types.h>
#include <time.h>
#include <glib.h>
struct poller;
struct streambuf {
GString *buf;
int fd;
struct poller *poller;
time_t active;
};
struct streambuf *streambuf_new(struct poller *, int);
void streambuf_destroy(struct streambuf *);
int streambuf_writeable(struct streambuf *);
int streambuf_readable(struct streambuf *);
char *streambuf_getline(struct streambuf *);
unsigned int streambuf_bufsize(struct streambuf *);
void streambuf_printf(struct streambuf *, char *, ...) __attribute__ ((format (printf, 2, 3)));
void streambuf_write(struct streambuf *, char *, unsigned int);
#endif

5
debian/changelog vendored

@ -0,0 +1,5 @@
mediaproxy-ng (0.1) unstable; urgency=low
* Initial release.
-- Michael Prokop <mprokop@sipwise.com> Thu, 11 Feb 2010 23:06:08 +0100

1
debian/compat vendored

@ -0,0 +1 @@
5

19
debian/control vendored

@ -0,0 +1,19 @@
Source: mediaproxy-ng
Section: net
Priority: extra
Maintainer: Michael Prokop <mprokop@sipwise.com>
Build-Depends: debhelper (>= 5), iptables-dev, libglib2.0-dev, libpcre3-dev
Standards-Version: 3.8.4
Homepage: http://sipwise.com/
Package: mediaproxy-ng-daemon
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}
Description: TODO
TODO
Package: mediaproxy-ng-iptables
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}
Description: TODO
TODO

7
debian/copyright vendored

@ -0,0 +1,7 @@
Upstream Author: The Sipwise Team - http://sipwise.com/
Copyright: Copyright (c) 2007-2010 Sipwise GmbH, Austria
License: All software included in this package is
Copyright (c) Sipwise GmbH, Austria.
All rights reserved. You may not copy, distribute
or modify without prior written permission from
Sipwise GmbH, Austria.

1
debian/dirs vendored

@ -0,0 +1 @@
usr/sbin

@ -0,0 +1 @@
daemon/mediaproxy-ng /usr/sbin/

@ -0,0 +1 @@
iptables-extension/libipt_MEDIAPROXY.so /lib/xtables/

81
debian/rules vendored

@ -0,0 +1,81 @@
#!/usr/bin/make -f
# -*- makefile -*-
# Sample debian/rules that uses debhelper.
# This file was originally written by Joey Hess and Craig Small.
# As a special exception, when this file is copied by dh-make into a
# dh-make output file, you may use that output file without restriction.
# This special exception was added by Craig Small in version 0.37 of dh-make.
# Uncomment this to turn on verbose mode.
#export DH_VERBOSE=1
b=$(CURDIR)/debian/build
build: build-stamp
build-stamp:
dh_testdir
cd iptables-extension/ && gcc -O2 -Wall -shared -o libipt_MEDIAPROXY.so libipt_MEDIAPROXY-1.4.c && cd ..
cd daemon && $(MAKE) && cd ..
touch $@
clean:
dh_testdir
dh_testroot
rm -f build-stamp
rm -f iptables-extension/libipt_MEDIAPROXY.so
rm -f daemon/mediaproxy-ng daemon/build_time.h daemon/.depend kernel-module/.ipt_MEDIAPROXY.o.d
rm -rf kernel-module/.tmp_versions
dh_clean
-rm -rf debian/build
install: build
dh_testdir
dh_testroot
dh_clean -k
dh_installdirs
mediaproxy-ng-daemon: install
@echo "--- Building: $@"
dh_installdirs -p$@ -P$(b)/$@
dh_link -p$@ -P$(b)/$@
dh_installdocs -p$@ -P$(b)/$@
dh_installchangelogs -p$@ -P$(b)/$@
dh_install -p$@ -P$(b)/$@
dh_strip -p$@ -P$(b)/$@
dh_compress -p$@ -P$(b)/$@
dh_fixperms -p$@ -P$(b)/$@
dh_makeshlibs -p$@ -P$(b)/$@ -V
dh_installdeb -p$@ -P$(b)/$@
dh_shlibdeps -p$@ -P$(b)/$@
dh_installdebconf -p$@ -P$(b)/$@
dh_gencontrol -p$@ -P$(b)/$@
dh_md5sums -p$@ -P$(b)/$@
dh_builddeb -p$@ -P$(b)/$@
mediaproxy-ng-iptables: install
@echo "--- Building: $@"
dh_installdirs -p$@ -P$(b)/$@
dh_link -p$@ -P$(b)/$@
dh_installdocs -p$@ -P$(b)/$@
dh_installchangelogs -p$@ -P$(b)/$@
dh_install -p$@ -P$(b)/$@
dh_strip -p$@ -P$(b)/$@
dh_compress -p$@ -P$(b)/$@
dh_fixperms -p$@ -P$(b)/$@
dh_makeshlibs -p$@ -P$(b)/$@ -V
dh_installdeb -p$@ -P$(b)/$@
dh_shlibdeps -p$@ -P$(b)/$@
dh_installdebconf -p$@ -P$(b)/$@
dh_gencontrol -p$@ -P$(b)/$@
dh_md5sums -p$@ -P$(b)/$@
dh_builddeb -p$@ -P$(b)/$@
# Build architecture-dependent files here.
binary-all: build install
# Build architecture-independent files here.
binary-indep: build install mediaproxy-ng-daemon mediaproxy-ng-iptables
binary: binary-indep binary-arch
.PHONY: build clean binary-indep binary-arch binary install

@ -0,0 +1,81 @@
/* gcc -O2 -Wall -shared -o libipt_MEDIAPROXY.so libipt_MEDIAPROXY.c */
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <xtables.h>
#include <linux/netfilter_ipv4/ip_tables.h>
#include "../kernel-module/ipt_MEDIAPROXY.h"
static void help(void) {
printf(
"MEDIAPROXY target options:\n"
" --id <id>\n"
" Unique ID for this instance\n"
);
}
static int parse(int c,
char **argv,
int invert,
unsigned int *flags,
const void *entry,
struct xt_entry_target **target) {
struct ipt_mediaproxy_info *info = (void *) *target;
if (c == '1') {
info->id = atoi(optarg);
if (flags)
*flags = 1;
}
else
return 0;
return 1;
}
static void final_check(unsigned int flags) {
if (!flags)
exit_error(PARAMETER_PROBLEM, "You must specify --id");
}
static void print(const void *ip, const struct xt_entry_target *target, int numeric) {
struct ipt_mediaproxy_info *info = (void *) target;
printf("id %u", info->id);
}
static void save(const void *ip, const struct xt_entry_target *target) {
struct ipt_mediaproxy_info *info = (void *) target;
printf("--id %u", info->id);
}
static struct option opts[] = {
{ "id", 1, NULL, '1' },
{ NULL, },
};
static struct xtables_target mediaproxy = {
.name = "MEDIAPROXY",
.family = AF_INET,
.version = "1.4.2",
.size = XT_ALIGN(sizeof(struct ipt_mediaproxy_info)),
.userspacesize = XT_ALIGN(sizeof(struct ipt_mediaproxy_info)),
.help = help,
.parse = parse,
.final_check = final_check,
.print = print,
.save = save,
.extra_opts = opts,
};
void _init(void) {
xtables_register_target(&mediaproxy);
}

@ -0,0 +1,80 @@
/* gcc -O2 -Wall -shared -o libipt_MEDIAPROXY.so libipt_MEDIAPROXY.c */
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iptables.h>
#include <linux/netfilter_ipv4/ip_tables.h>
#include "../kernel-module/ipt_MEDIAPROXY.h"
static void help(void) {
printf(
"MEDIAPROXY target options:\n"
" --id <id>\n"
" Unique ID for this instance\n"
);
}
static int parse(int c,
char **argv,
int invert,
unsigned int *flags,
const struct ipt_entry *entry,
struct ipt_entry_target **target) {
struct ipt_mediaproxy_info *info = (void *) (*target)->data;
if (c == '1') {
info->id = atoi(optarg);
if (flags)
*flags = 1;
}
else
return 0;
return 1;
}
static void final_check(unsigned int flags) {
if (!flags)
exit_error(PARAMETER_PROBLEM, "You must specify --id");
}
static void print(const struct ipt_ip *ip, const struct ipt_entry_target *target, int numeric) {
struct ipt_mediaproxy_info *info = (void *) target->data;
printf("id %u", info->id);
}
static void save(const struct ipt_ip *ip, const struct ipt_entry_target *target) {
struct ipt_mediaproxy_info *info = (void *) target->data;
printf("--id %u", info->id);
}
static struct option opts[] = {
{ "id", 1, NULL, '1' },
{ NULL, },
};
static struct iptables_target mediaproxy = {
.name = "MEDIAPROXY",
.version = "1.3.6",
.size = IPT_ALIGN(sizeof(struct ipt_mediaproxy_info)),
.userspacesize = IPT_ALIGN(sizeof(struct ipt_mediaproxy_info)),
.help = help,
.parse = parse,
.final_check = final_check,
.print = print,
.save = save,
.extra_opts = opts,
};
void _init(void) {
register_target(&mediaproxy);
}

@ -0,0 +1 @@
obj-m += ipt_MEDIAPROXY.o

File diff suppressed because it is too large Load Diff

@ -0,0 +1,75 @@
#ifndef IPT_RTPPROXY_H
#define IPT_RTPPROXY_H
struct ipt_mediaproxy_info {
u_int32_t id;
};
struct mediaproxy_stats {
u_int64_t packets;
u_int64_t bytes;
u_int64_t errors;
};
struct mediaproxy_target_info {
u_int16_t target_port;
u_int32_t src_ip;
u_int32_t dst_ip;
u_int16_t src_port;
u_int16_t dst_port;
u_int32_t mirror_ip;
u_int16_t mirror_port;
unsigned char tos;
};
struct mediaproxy_message {
enum {
MMG_NOOP = 1,
MMG_ADD,
MMG_DEL,
MMG_UPDATE,
} cmd;
struct mediaproxy_target_info target;
};
struct mediaproxy_list_entry {
struct mediaproxy_target_info target;
struct mediaproxy_stats stats;
};
#ifdef __KERNEL__
struct mediaproxy_target {
atomic_t refcnt;
u_int32_t table;
struct mediaproxy_target_info target;
spinlock_t lock;
struct mediaproxy_stats stats;
};
struct mediaproxy_table {
atomic_t refcnt;
rwlock_t target_lock;
pid_t pid;
u_int32_t id;
struct proc_dir_entry *proc;
struct proc_dir_entry *status;
struct proc_dir_entry *control;
struct proc_dir_entry *list;
struct proc_dir_entry *blist;
struct mediaproxy_target **target[256];
unsigned int buckets;
unsigned int targets;
};
#endif
#endif

@ -0,0 +1,20 @@
#!/usr/bin/perl
use strict;
use warnings;
use Socket;
my $t = $ARGV[0] || "0";
open(X, "<", "/proc/mediaproxy/$t/blist") or die;
my $buf;
while (sysread(X, $buf, 48)) {
my @b = unpack("Sa2 a4a4 SS a4 Sa2 LLLLLL", $buf);
for (2,3,6) {
$b[$_] = inet_ntoa($b[$_]);
}
for (9,11,13) {
$b[$_] += $b[$_ + 1] * 2**32;
}
printf("%5u %15s:%-5u -> %15s:%-5u (-> %15s:%-5u) [%llu %llu %llu]\n", @b[0,2,4,3,5,6,7,9,11,13]);
}

@ -0,0 +1,54 @@
#!/usr/bin/perl
use strict;
use warnings;
use Socket;
$| = 1;
open(F, "> /proc/mediaproxy/1/control");
{
my $x = select(F);
$| = 1;
select($x);
}
#print F (pack("I SS LLSS LS S", 0, 0, -1, 0, 0, 0, 0, 0, 0, -1));
#sleep(10);
print("add 9876 -> 1234/6543\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 1, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, "", 0, -1));
sleep(30);
print("add fail\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 1, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, inet_aton("192.168.231.1"), 6789, -1));
sleep(30);
print("update 9876 -> 1234/6543 & 6789\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, inet_aton("192.168.231.1"), 6789, -1));
sleep(30);
print("update 9876 -> 2345/7890 & 4321\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 2345, 7890, inet_aton("192.168.231.1"), 4321, -1));
sleep(30);
print("add fail\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 1, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, inet_aton("192.168.231.1"), 6789, -1));
sleep(30);
print("update 9876 -> 1234/6543\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, "", 0, -1));
sleep(30);
print("delete\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 2, 9876, -1, "", "", 0, 0, "", 0, -1));
sleep(30);
print("delete fail\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 2, 9876, -1, "", "", 0, 0, "", 0, -1));
sleep(30);
print("update fail\n");
syswrite(F, pack("I SS a4a4 SS a4 S S", 3, 9876, -1, inet_aton("192.168.231.132"), inet_aton("192.168.231.1"), 1234, 6543, "", 0, -1));
sleep(30);
close(F);
Loading…
Cancel
Save