TT#5566 checkin of external call recording daemon

Change-Id: I8102144ab1508fe815be84d727f6fa3234fd0994
changes/03/9503/6
Richard Fuchs 9 years ago
parent e3a7248b01
commit 411c3b3373

@ -1464,6 +1464,7 @@ static void __rtp_payload_types(struct call_media *media, GQueue *types) {
/* we steal the entire list to avoid duplicate allocs */
while ((pt = g_queue_pop_head(types))) {
/* but we must duplicate the contents */
call_str_cpy(call, &pt->encoding_with_params, &pt->encoding_with_params);
call_str_cpy(call, &pt->encoding, &pt->encoding);
call_str_cpy(call, &pt->encoding_parameters, &pt->encoding_parameters);
g_hash_table_replace(media->rtp_payload_types, &pt->payload_type, pt);
@ -1726,6 +1727,8 @@ init:
/* we are now ready to fire up ICE if so desired and requested */
ice_update(other_media->ice_agent, sp);
ice_update(media->ice_agent, NULL); /* this is in case rtcp-mux has changed */
recording_setup_media(other_media);
}
return 0;
@ -1999,13 +2002,9 @@ void call_destroy(struct call *c) {
md->protocol ? md->protocol->name : "(unknown)"
if (!rtp_pt)
ilog(LOG_INFO, MLL_PREFIX "unknown codec", MLL_COMMON);
else if (!rtp_pt->encoding_parameters.s)
ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u", MLL_COMMON,
STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate);
else
ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u/"STR_FORMAT"", MLL_COMMON,
STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate,
STR_FMT(&rtp_pt->encoding_parameters));
ilog(LOG_INFO, MLL_PREFIX STR_FORMAT, MLL_COMMON,
STR_FMT(&rtp_pt->encoding_with_params));
/* add PayloadType(codec) info in CDR logging */
if (_log_facility_cdr && rtp_pt) {

@ -44,6 +44,7 @@ static void finish_proc(struct call *);
static void dump_packet_proc(struct recording *recording, struct packet_stream *sink, const str *s);
static void init_stream_proc(struct packet_stream *);
static void setup_stream_proc(struct packet_stream *);
static void setup_media_proc(struct call_media *);
static void kernel_info_proc(struct packet_stream *, struct rtpengine_target_info *);
@ -71,6 +72,7 @@ static const struct recording_method methods[] = {
.finish = finish_proc,
.init_stream_struct = init_stream_proc,
.setup_stream = setup_stream_proc,
.setup_media = setup_media_proc,
.stream_kernel_info = kernel_info_proc,
},
};
@ -672,6 +674,26 @@ static void setup_stream_proc(struct packet_stream *stream) {
append_meta_chunk(recording, buf, len, "STREAM %u interface", stream->unique_id);
}
static void setup_media_proc(struct call_media *media) {
struct call *call = media->call;
struct recording *recording = call->recording;
if (!recording)
return;
GList *pltypes = g_hash_table_get_values(media->rtp_payload_types);
for (GList *l = pltypes; l; l = l->next) {
struct rtp_payload_type *pt = l->data;
append_meta_chunk(recording, pt->encoding_with_params.s, pt->encoding_with_params.len,
"MEDIA %u PAYLOAD TYPE %u", media->unique_id, pt->payload_type);
}
g_list_free(pltypes);
}
static void dump_packet_proc(struct recording *recording, struct packet_stream *stream, const str *s) {
if (stream->recording.proc.stream_idx == UNINIT_IDX)
return;

@ -22,6 +22,7 @@ struct call;
enum call_opmode;
struct rtpengine_target_info;
struct call_monologue;
struct call_media;
struct recording_pcap {
@ -77,6 +78,7 @@ struct recording_method {
void (*init_stream_struct)(struct packet_stream *);
void (*setup_stream)(struct packet_stream *);
void (*setup_media)(struct call_media *);
void (*stream_kernel_info)(struct packet_stream *, struct rtpengine_target_info *);
};
@ -185,6 +187,7 @@ void recording_finish(struct call *);
#define recording_setup_stream(args...) _rm(setup_stream, args)
#define recording_setup_media(args...) _rm(setup_media, args)
#define recording_init_stream(args...) _rm(init_stream_struct, args)
#define recording_stream_kernel_info(args...) _rm(stream_kernel_info, args)
#define recording_meta_chunk(args...) _rm(meta_chunk, args)

@ -21,6 +21,7 @@ struct rtp_header {
struct rtp_payload_type {
unsigned int payload_type;
str encoding_with_params;
str encoding;
unsigned int clock_rate;
str encoding_parameters;

@ -705,6 +705,8 @@ static int parse_attribute_rtpmap(struct sdp_attribute *output) {
a = &output->u.rtpmap;
pt = &a->rtp_pt;
pt->encoding_with_params = a->encoding_str;
pt->payload_type = strtoul(a->payload_type_str.s, &ep, 10);
if (ep == a->payload_type_str.s)
return -1;

@ -1 +1,2 @@
daemon/rtpengine /usr/sbin/
recording-daemon/rtpengine-recording /usr/sbin/

2
debian/rules vendored

@ -38,12 +38,14 @@ build-stamp:
dh_testdir
make -C iptables-extension
make -C daemon -j`nproc`
make -C recording-daemon -j`nproc`
touch $@
clean:
dh_testdir
dh_testroot
cd daemon && $(MAKE) clean && cd ..
cd recording-daemon && $(MAKE) clean && cd ..
rm -f build-stamp
rm -f iptables-extension/libxt_RTPENGINE.so
rm -f daemon/rtpengine daemon/build_time.h daemon/.depend kernel-module/.xt_RTPENGINE.o.d

@ -0,0 +1,5 @@
.depend
*.o
core
core.*
.ycm_extra_conf.pyc

@ -0,0 +1,102 @@
import os
import ycm_core
from clang_helpers import PrepareClangFlags
# Set this to the absolute path to the folder (NOT the file!) containing the
# compile_commands.json file to use that instead of 'flags'. See here for
# more details: http://clang.llvm.org/docs/JSONCompilationDatabase.html
# Most projects will NOT need to set this to anything; you can just change the
# 'flags' list of compilation flags. Notice that YCM itself uses that approach.
compilation_database_folder = ''
# These are the compilation flags that will be used in case there's no
# compilation database set.
flags = [
'-g',
'-Wall',
'-pthread',
'-fno-strict-aliasing',
'-I/usr/include/glib-2.0',
'-I/usr/lib/x86_64-linux-gnu/glib-2.0/include',
'-pthread',
'-D_GNU_SOURCE',
'-D__DEBUG=1',
'-D__YCM=1',
'-O2',
'-fstack-protector',
'--param=ssp-buffer-size=4',
'-Wformat',
'-Werror=format-security',
'-D_FORTIFY_SOURCE=2',
# THIS IS IMPORTANT! Without a "-std=<something>" flag, clang won't know which
# language to use when compiling headers. So it will guess. Badly. So C++
# headers will be compiled as C headers. You don't want that so ALWAYS specify
# a "-std=<something>".
# For a C project, you would set this to something like 'c99' instead of
# 'c++11'.
'-std=c99',
# ...and the same thing goes for the magic -x option which specifies the
# language that the files to be compiled are written in. This is mostly
# relevant for c++ headers.
# For a C project, you would set this to 'c' instead of 'c++'.
'-x',
'c',
]
if compilation_database_folder:
database = ycm_core.CompilationDatabase( compilation_database_folder )
else:
database = None
def DirectoryOfThisScript():
return os.path.dirname( os.path.abspath( __file__ ) )
def MakeRelativePathsInFlagsAbsolute( flags, working_directory ):
if not working_directory:
return flags
new_flags = []
make_next_absolute = False
path_flags = [ '-isystem', '-I', '-iquote', '--sysroot=' ]
for flag in flags:
new_flag = flag
if make_next_absolute:
make_next_absolute = False
if not flag.startswith( '/' ):
new_flag = os.path.join( working_directory, flag )
for path_flag in path_flags:
if flag == path_flag:
make_next_absolute = True
break
if flag.startswith( path_flag ):
path = flag[ len( path_flag ): ]
new_flag = path_flag + os.path.join( working_directory, path )
break
if new_flag:
new_flags.append( new_flag )
return new_flags
def FlagsForFile( filename ):
if database:
# Bear in mind that compilation_info.compiler_flags_ does NOT return a
# python list, but a "list-like" StringVec object
compilation_info = database.GetCompilationInfoForFile( filename )
final_flags = PrepareClangFlags(
MakeRelativePathsInFlagsAbsolute(
compilation_info.compiler_flags_,
compilation_info.compiler_working_dir_ ),
filename )
else:
relative_to = DirectoryOfThisScript()
final_flags = MakeRelativePathsInFlagsAbsolute( flags, relative_to )
return {
'flags': final_flags,
'do_cache': True
}

@ -0,0 +1,58 @@
TARGET= rtpengine-recording
CC?=gcc
CFLAGS= -g -Wall -pthread
CFLAGS+= -std=c99
CFLAGS+= -D_GNU_SOURCE -D_POSIX_SOURCE -D_POSIX_C_SOURCE
CFLAGS+= `pkg-config --cflags glib-2.0`
CFLAGS+= `pkg-config --cflags gthread-2.0`
#CFLAGS+= `pcre-config --cflags`
ifeq ($(DBG),yes)
CFLAGS+= -D__DEBUG=1
else
CFLAGS+= -O3
endif
LDFLAGS= -lm
LDFLAGS+= `pkg-config --libs glib-2.0`
LDFLAGS+= `pkg-config --libs gthread-2.0`
#LDFLAGS+= `pcre-config --libs`
ifneq ($(DBG),yes)
DPKG_BLDFLGS= $(shell which dpkg-buildflags 2>/dev/null)
ifneq ($(DPKG_BLDFLGS),)
# support http://wiki.debian.org/Hardening for >=wheezy
CFLAGS+= `dpkg-buildflags --get CFLAGS`
CPPFLAGS+= `dpkg-buildflags --get CPPFLAGS`
LDFLAGS+= `dpkg-buildflags --get LDFLAGS`
endif
endif
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c aux.c
OBJS= $(SRCS:.c=.o)
.PHONY: all dep clean tests debug
all:
$(MAKE) $(TARGET)
debug:
$(MAKE) DBG=yes all
dep: .depend
clean:
rm -f $(OBJS) $(TARGET) .depend core core.*
.depend: $(SRCS) Makefile
$(CC) $(CFLAGS) -M $(SRCS) | sed -e 's/:/ .depend:/' > .depend
$(TARGET): $(OBJS) .depend Makefile
$(CC) $(CFLAGS) -o $@ $(OBJS) $(LDFLAGS)
$(OBJS): Makefile
include .depend

@ -0,0 +1,21 @@
#include "aux.h"
#include <stdio.h>
#include <stdarg.h>
int __thread __sscanf_hack_var;
int __sscanf_match(const char *str, const char *fmt, ...) {
va_list ap;
__sscanf_hack_var = 0; // to make sure that sscanf consumes the entire string
va_start(ap, fmt);
int ret = vsscanf(str, fmt, ap);
va_end(ap);
if (__sscanf_hack_var == 0)
return 0;
return ret;
}

@ -0,0 +1,9 @@
#ifndef _AUX_H_
#define _AUX_H_
extern int __thread __sscanf_hack_var;
#define sscanf_match(str, format, ...) __sscanf_match(str, format "%n", ##__VA_ARGS__, &__sscanf_hack_var)
int __sscanf_match(const char *str, const char *fmt, ...) __attribute__ ((__format__ (__scanf__, 2, 3)));
#endif

@ -0,0 +1,64 @@
#include "epoll.h"
#include <sys/epoll.h>
#include <glib.h>
#include <pthread.h>
#include <unistd.h>
#include "log.h"
#include "main.h"
#include "garbage.h"
static int epoll_fd = -1;
void epoll_setup(void) {
epoll_fd = epoll_create1(0);
if (epoll_fd == -1)
die_errno("epoll_create1 failed");
}
int epoll_add(int fd, uint32_t events, handler_t *handler) {
struct epoll_event epev = { .events = events | EPOLLET, .data = { .ptr = handler } };
int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &epev);
return ret;
}
void epoll_del(int fd) {
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
}
void *poller_thread(void *ptr) {
struct epoll_event epev;
unsigned int me_num = GPOINTER_TO_UINT(ptr);
dbg("poller thread %u running", me_num);
while (!shutdown_flag) {
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
int ret = epoll_wait(epoll_fd, &epev, 1, 10000);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
if (ret == -1)
die_errno("epoll_wait failed");
if (ret > 0) {
dbg("thread %u handling event", me_num);
handler_t *handler = epev.data.ptr;
handler->func(handler);
}
garbage_collect(me_num);
}
return NULL;
}
void epoll_cleanup(void) {
close(epoll_fd);
}

@ -0,0 +1,20 @@
#ifndef _EPOLL_H_
#define _EPOLL_H_
#include <sys/types.h>
#include <sys/epoll.h>
#include <stdint.h>
#include "types.h"
void epoll_setup(void);
void epoll_cleanup(void);
int epoll_add(int fd, uint32_t events, handler_t *handler);
void epoll_del(int fd);
void *poller_thread(void *ptr);
#endif

@ -0,0 +1,96 @@
#include "garbage.h"
#include <glib.h>
#include <pthread.h>
#include "log.h"
typedef struct {
void *ptr;
void (*free_func)(void *);
int *wait_threads;
unsigned int array_len;
unsigned int threads_left;
} garbage_t;
static pthread_mutex_t garbage_lock = PTHREAD_MUTEX_INITIALIZER;
static GQueue garbage = G_QUEUE_INIT;
static volatile int garbage_thread_num;
unsigned int garbage_new_thread_num(void) {
return g_atomic_int_add(&garbage_thread_num, 1);
}
void garbage_add(void *ptr, free_func_t *free_func) {
// Each running poller thread has a unique number associated with it, starting
// with 0. A garbage entry uses an array of boolean flags, one for each running
// thread, to keep track of which threads have seen this entry. Once a garbage
// entry has been seen by all threads, the free function is finally called.
// This is to make sure that all poller threads have left epoll_wait() after
// an fd has been removed from the watch list.
garbage_t *garb = g_slice_alloc(sizeof(*garb));
garb->ptr = ptr;
garb->free_func = free_func;
pthread_mutex_lock(&garbage_lock);
garb->array_len = g_atomic_int_get(&garbage_thread_num);
garb->threads_left = garb->array_len;
garb->wait_threads = malloc(sizeof(int) * garb->array_len);
memset(garb->wait_threads, 0, sizeof(int) * garb->array_len);
g_queue_push_tail(&garbage, garb);
pthread_mutex_unlock(&garbage_lock);
}
static void garbage_collect1(garbage_t *garb) {
garb->free_func(garb->ptr);
free(garb->wait_threads);
g_slice_free1(sizeof(*garb), garb);
}
void garbage_collect(unsigned int num) {
dbg("running garbage collection thread %u", num);
restart:
pthread_mutex_lock(&garbage_lock);
for (GList *l = garbage.head; l; l = l->next) {
garbage_t *garb = l->data;
// has this been created before we were running?
if (garb->array_len <= num)
continue;
// have we processed this already?
if (garb->wait_threads[num])
continue;
dbg("marking garbage entry %p as seen by %u with %u threads left", garb, num,
garb->threads_left);
garb->wait_threads[num] = 1;
garb->threads_left--;
// anything left?
if (!garb->threads_left) {
// remove from list and process
g_queue_delete_link(&garbage, l);
pthread_mutex_unlock(&garbage_lock);
garbage_collect1(garb);
goto restart;
}
}
pthread_mutex_unlock(&garbage_lock);
}
void garbage_collect_all(void) {
garbage_t *garb;
while ((garb = g_queue_pop_head(&garbage)))
garbage_collect1(garb);
}

@ -0,0 +1,11 @@
#ifndef _GARBAGE_H_
#define _GARBAGE_H_
typedef void free_func_t(void *);
unsigned int garbage_new_thread_num(void);
void garbage_add(void *ptr, free_func_t *free_func);
void garbage_collect(unsigned int num);
void garbage_collect_all(void);
#endif

@ -0,0 +1,79 @@
#include "inotify.h"
#include <sys/inotify.h>
#include <limits.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include "log.h"
#include "main.h"
#include "epoll.h"
#include "metafile.h"
static int inotify_fd = -1;
static handler_func inotify_handler_func;
static handler_t inotify_handler = {
.func = inotify_handler_func,
};
static void inotify_close_write(struct inotify_event *inev) {
dbg("inotify close_write(%s)", inev->name);
metafile_change(inev->name);
}
static void inotify_delete(struct inotify_event *inev) {
dbg("inotify delete(%s)", inev->name);
metafile_delete(inev->name);
}
static void inotify_handler_func(handler_t *handler) {
char buf[4 * (sizeof(struct inotify_event) + NAME_MAX + 1)];
while (1) {
int ret = read(inotify_fd, buf, sizeof(buf));
if (ret == -1) {
if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
break;
die_errno("read on inotify fd failed");
}
if (ret == 0)
die("EOF on inotify fd");
char *bufend = buf + ret;
char *bufhead = buf;
while (bufhead < bufend) {
struct inotify_event *inev = (void *) bufhead;
if ((inev->mask & IN_DELETE))
inotify_delete(inev);
if ((inev->mask & IN_CLOSE_WRITE))
inotify_close_write(inev);
bufhead += sizeof(*inev) + inev->len;
}
}
}
void inotify_setup(void) {
inotify_fd = inotify_init1(IN_NONBLOCK);
if (inotify_fd == -1)
die_errno("inotify_init1 failed");
int ret = inotify_add_watch(inotify_fd, SPOOL_DIR, IN_CLOSE_WRITE | IN_DELETE);
if (ret == -1)
die_errno("inotify_add_watch failed");
if (epoll_add(inotify_fd, EPOLLIN, &inotify_handler))
die_errno("failed to add inotify_fd to epoll");
}
void inotify_cleanup(void) {
close(inotify_fd);
}

@ -0,0 +1,7 @@
#ifndef _INOTIFY_H_
#define _INOTIFY_H_
void inotify_setup(void);
void inotify_cleanup(void);
#endif

@ -0,0 +1,15 @@
#ifndef _LOG_H_
#define _LOG_H_
#include <stdio.h>
#include <syslog.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#define die(fmt, ...) do { ilog(LOG_CRIT, "Fatal error: " fmt, ##__VA_ARGS__); exit(-1); } while (0)
#define die_errno(msg) die("%s: %s", msg, strerror(errno))
#define ilog(fclt, fmt, ...) fprintf(stderr, fmt "\n", ##__VA_ARGS__)
#define dbg(fmt, ...) ilog(LOG_DEBUG, fmt, ##__VA_ARGS__)
#endif

@ -0,0 +1,105 @@
#include "main.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <glib.h>
#include <unistd.h>
#include <signal.h>
#include "log.h"
#include "epoll.h"
#include "inotify.h"
#include "metafile.h"
#include "garbage.h"
static GQueue threads = G_QUEUE_INIT; // only accessed from main thread
volatile int shutdown_flag;
static void signals(void) {
sigset_t ss;
sigfillset(&ss);
sigdelset(&ss, SIGABRT);
sigdelset(&ss, SIGSEGV);
sigdelset(&ss, SIGQUIT);
sigprocmask(SIG_SETMASK, &ss, NULL);
pthread_sigmask(SIG_SETMASK, &ss, NULL);
}
static void setup(void) {
signals();
metafile_setup();
epoll_setup();
inotify_setup();
}
static void start_poller_thread(void) {
pthread_t *thr = g_slice_alloc(sizeof(*thr));
int ret = pthread_create(thr, NULL, poller_thread,
GUINT_TO_POINTER(garbage_new_thread_num()));
if (ret)
die_errno("pthread_create failed");
g_queue_push_tail(&threads, thr);
}
static void wait_threads_finish(void) {
pthread_t *thr;
while ((thr = g_queue_pop_head(&threads))) {
pthread_cancel(*thr);
pthread_join(*thr, NULL);
g_slice_free1(sizeof(*thr), thr);
}
}
static void wait_for_signal(void) {
sigset_t ss;
int ret, sig;
sigemptyset(&ss);
sigaddset(&ss, SIGINT);
sigaddset(&ss, SIGTERM);
while (1) {
ret = sigwait(&ss, &sig);
if (ret == -1) {
if (errno == EAGAIN || errno == EINTR)
continue;
abort();
}
shutdown_flag = 1;
break;
}
}
static void cleanup(void) {
garbage_collect_all();
metafile_cleanup();
inotify_cleanup();
epoll_cleanup();
}
int main() {
setup();
for (int i = 0; i < NUM_THREADS; i++)
start_poller_thread();
wait_for_signal();
dbg("shutting down");
wait_threads_finish();
cleanup();
}

@ -0,0 +1,13 @@
#ifndef _MAIN_H_
#define _MAIN_H_
#define SPOOL_DIR "/var/spool/rtpengine"
#define PROC_DIR "/proc/rtpengine/0/calls"
#define NUM_THREADS 8
extern volatile int shutdown_flag;
#endif

@ -0,0 +1,241 @@
#include "metafile.h"
#include <glib.h>
#include <pthread.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
#include "log.h"
#include "stream.h"
#include "garbage.h"
#include "main.h"
#include "aux.h"
static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER;
static GHashTable *metafiles;
//static pcre_t stream_interface_re,
//stream_details_re;
static void meta_free(void *ptr) {
metafile_t *mf = ptr;
dbg("freeing metafile info for %s", mf->name);
g_string_chunk_free(mf->gsc);
for (int i = 0; i < mf->streams->len; i++) {
stream_t *stream = g_ptr_array_index(mf->streams, i);
stream_close(stream); // should be closed already
stream_free(stream);
}
g_ptr_array_free(mf->streams, TRUE);
g_slice_free1(sizeof(*mf), mf);
}
// mf is locked
static void meta_destroy(metafile_t *mf) {
// close all streams
for (int i = 0; i < mf->streams->len; i++) {
stream_t *stream = g_ptr_array_index(mf->streams, i);
pthread_mutex_lock(&stream->lock);
stream_close(stream);
pthread_mutex_unlock(&stream->lock);
}
}
// mf is locked
static void meta_stream_interface(metafile_t *mf, unsigned long snum, char *content) {
dbg("stream %lu interface %s", snum, content);
stream_open(mf, snum, content);
}
// mf is locked
static void meta_stream_details(metafile_t *mf, unsigned long snum, char *content) {
dbg("stream %lu details %s", snum, content);
}
// mf is locked
static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned int payload_num,
char *payload_type)
{
dbg("payload type in media %lu num %u is %s", mnum, payload_num, payload_type);
}
// mf is locked
static void meta_section(metafile_t *mf, char *section, char *content, unsigned long len) {
unsigned long lu;
unsigned int u;
if (!strcmp(section, "CALL-ID"))
mf->call_id = g_string_chunk_insert(mf->gsc, content);
else if (!strcmp(section, "PARENT"))
mf->parent = g_string_chunk_insert(mf->gsc, content);
else if (sscanf_match(section, "STREAM %lu interface", &lu) == 1)
meta_stream_interface(mf, lu, content);
else if (sscanf_match(section, "STREAM %lu details", &lu) == 1)
meta_stream_details(mf, lu, content);
else if (sscanf_match(section, "MEDIA %lu PAYLOAD TYPE %u", &lu, &u) == 2)
meta_rtp_payload_type(mf, lu, u, content);
}
void metafile_change(char *name) {
// get or create metafile metadata
pthread_mutex_lock(&metafiles_lock);
metafile_t *mf = g_hash_table_lookup(metafiles, name);
if (!mf) {
dbg("allocating metafile info for %s", name);
mf = g_slice_alloc0(sizeof(*mf));
mf->gsc = g_string_chunk_new(0);
mf->name = g_string_chunk_insert(mf->gsc, name);
pthread_mutex_init(&mf->lock, NULL);
mf->streams = g_ptr_array_new();
g_hash_table_insert(metafiles, mf->name, mf);
}
// switch locks
pthread_mutex_lock(&mf->lock);
pthread_mutex_unlock(&metafiles_lock);
char fnbuf[PATH_MAX];
snprintf(fnbuf, sizeof(fnbuf), "%s/%s", SPOOL_DIR, name);
// open file and seek to last known position
int fd = open(fnbuf, O_RDONLY);
if (fd == -1) {
ilog(LOG_ERR, "Failed to open %s: %s\n", fnbuf, strerror(errno));
goto out;
}
lseek(fd, mf->pos, SEEK_SET);
// read the entire file
GString *s = g_string_new(NULL);
char buf[1024];
while (1) {
int ret = read(fd, buf, sizeof(buf));
if (ret == 0)
break;
if (ret == -1)
die_errno("read on metadata file failed");
g_string_append_len(s, buf, ret);
}
// save read position and close file
mf->pos = lseek(fd, 0, SEEK_CUR);
close(fd);
// process contents of metadata file
char *head = s->str;
char *endp = s->str + s->len;
while (head < endp) {
// section header
char *nl = memchr(head, '\n', endp - head);
if (!nl || nl == head) {
ilog(LOG_WARN, "Missing section header in %s", name);
break;
}
if (memchr(head, '\0', nl - head)) {
ilog(LOG_WARN, "NUL character in section header in %s", name);
break;
}
*(nl++) = '\0';
char *section = head;
dbg("section %s", section);
head = nl;
// content length
nl = memchr(head, ':', endp - head);
if (!nl || nl == head) {
ilog(LOG_WARN, "Content length for section %s missing in %s", section, name);
break;
}
*(nl++) = '\0';
if (*(nl++) != '\n') {
ilog(LOG_WARN, "Unterminated content length for section %s in %s", section, name);
break;
}
char *errp;
unsigned long slen = strtoul(head, &errp, 10);
if (*errp != '\0') {
ilog(LOG_WARN, "Invalid content length for section %s in %s", section, name);
break;
}
dbg("content length %lu", slen);
head = nl;
// content
if (endp - head < slen) {
ilog(LOG_WARN, "Content truncated in section %s in %s", section, name);
break;
}
char *content = head;
if (memchr(content, '\0', slen)) {
ilog(LOG_WARN, "NUL character in content in section %s in %s", section, name);
break;
}
// double newline separator
head += slen;
if (*head != '\n' || *(head + 1) != '\n') {
ilog(LOG_WARN, "Separator missing after section %s in %s", section, name);
break;
}
*head = '\0';
head += 2;
meta_section(mf, section, content, slen);
}
g_string_free(s, TRUE);
out:
pthread_mutex_unlock(&mf->lock);
}
void metafile_delete(char *name) {
// get metafile metadata
pthread_mutex_lock(&metafiles_lock);
metafile_t *mf = g_hash_table_lookup(metafiles, name);
if (!mf) {
// nothing to do
pthread_mutex_unlock(&metafiles_lock);
return;
}
// switch locks and remove entry
pthread_mutex_lock(&mf->lock);
g_hash_table_remove(metafiles, name);
pthread_mutex_unlock(&metafiles_lock);
meta_destroy(mf);
// add to garbage
garbage_add(mf, meta_free);
pthread_mutex_unlock(&mf->lock);
}
void metafile_setup(void) {
metafiles = g_hash_table_new(g_str_hash, g_str_equal);
//pcre_build(&stream_interface_re, "^STREAM (\\d+) interface$");
//pcre_build(&stream_interface_re, "^STREAM (\\d+) details$");
}
void metafile_cleanup(void) {
GList *mflist = g_hash_table_get_values(metafiles);
for (GList *l = mflist; l; l = l->next) {
metafile_t *mf = l->data;
meta_destroy(mf);
meta_free(mf);
}
g_list_free(mflist);
g_hash_table_destroy(metafiles);
}

@ -0,0 +1,12 @@
#ifndef _METAFILE_H_
#define _METAFILE_H_
#include "types.h"
void metafile_setup(void);
void metafile_cleanup(void);
void metafile_change(char *name);
void metafile_delete(char *name);
#endif

@ -0,0 +1,14 @@
#include "pcre.h"
#include <pcre.h>
#include "log.h"
void pcre_build(pcre_t *out, const char *pattern) {
const char *errptr;
int erroff;
out->re = pcre_compile(pattern, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
if (!out->re)
die("Failed to compile PCRE '%s': %s (at %i)", pattern, errptr, erroff);
out->extra = pcre_study(out->re, 0, &errptr);
}

@ -0,0 +1,8 @@
#ifndef _PCRE_H_
#define _PCRE_H_
#include "types.h"
void pcre_build(pcre_t *out, const char *pattern);
#endif

@ -0,0 +1,95 @@
#include "stream.h"
#include <glib.h>
#include <pthread.h>
#include <unistd.h>
#include <limits.h>
#include <fcntl.h>
#include "metafile.h"
#include "epoll.h"
#include "log.h"
#include "main.h"
// stream is locked
void stream_close(stream_t *stream) {
if (stream->fd == -1)
return;
epoll_del(stream->fd);
close(stream->fd);
stream->fd = -1;
}
void stream_free(stream_t *stream) {
g_slice_free1(sizeof(*stream), stream);
}
static void stream_handler(handler_t *handler) {
stream_t *stream = handler->ptr;
//dbg("poll event for %s", stream->name);
pthread_mutex_lock(&stream->lock);
if (stream->fd == -1)
goto out;
char buf[65535];
int ret = read(stream->fd, buf, sizeof(buf));
if (ret == 0) {
ilog(LOG_INFO, "EOF on stream %s", stream->name);
stream_close(stream);
goto out;
}
else if (ret < 0) {
ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno));
stream_close(stream);
goto out;
}
out:
pthread_mutex_unlock(&stream->lock);
}
// mf is locked
static stream_t *stream_get(metafile_t *mf, unsigned long id) {
if (mf->streams->len <= id)
g_ptr_array_set_size(mf->streams, id + 1);
stream_t *ret = g_ptr_array_index(mf->streams, id);
if (ret)
goto out;
ret = g_slice_alloc0(sizeof(*ret));
g_ptr_array_index(mf->streams, id) = ret;
pthread_mutex_init(&ret->lock, NULL);
ret->fd = -1;
ret->id = id;
out:
return ret;
}
// mf is locked
void stream_open(metafile_t *mf, unsigned long id, char *name) {
dbg("opening stream %lu/%s", id, name);
stream_t *stream = stream_get(mf, id);
stream->name = g_string_chunk_insert(mf->gsc, name);
char fnbuf[PATH_MAX];
snprintf(fnbuf, sizeof(fnbuf), "%s/%s/%s", PROC_DIR, mf->parent, name);
stream->fd = open(fnbuf, O_RDONLY | O_NONBLOCK);
if (stream->fd == -1) {
ilog(LOG_ERR, "Failed to open kernel stream %s: %s", fnbuf, strerror(errno));
return;
}
// add to epoll
stream->handler.ptr = stream;
stream->handler.func = stream_handler;
epoll_add(stream->fd, EPOLLIN, &stream->handler);
}

@ -0,0 +1,10 @@
#ifndef _STREAM_H_
#define _STREAM_H_
#include "types.h"
void stream_open(metafile_t *mf, unsigned long id, char *name);
void stream_close(stream_t *stream);
void stream_free(stream_t *stream);
#endif

@ -0,0 +1,46 @@
#ifndef _TYPES_H_
#define _TYPES_H_
#include <pthread.h>
#include <sys/types.h>
#include <glib.h>
#include <pcre.h>
typedef struct handler_s handler_t;
typedef void handler_func(handler_t *);
struct handler_s {
handler_func *func;
void *ptr;
};
struct stream_s {
pthread_mutex_t lock;
char *name;
unsigned long id;
int fd;
handler_t handler;
};
typedef struct stream_s stream_t;
struct metafile_s {
pthread_mutex_t lock;
char *name;
char *parent;
char *call_id;
off_t pos;
GStringChunk *gsc; // XXX limit max size
GPtrArray *streams;
};
typedef struct metafile_s metafile_t;
// struct pcre_s {
// pcre *re;
// pcre_extra *extra;
// };
// typedef struct pcre_s pcre_t;
#endif
Loading…
Cancel
Save