Added trunk/tags dir structure.

Added 1.1.0 tag.
trunk@3605
Andreas Granig 16 years ago
commit e6326e7781

@ -0,0 +1,3 @@
2008-03-18 agranig:
- removed force-setting of "anonymous" as caller_cli if caller_clir is
active

@ -0,0 +1,30 @@
BIN=mediator
CC := gcc
#CFLAGS := -I. -I/usr/include/mysql -I/usr/local/include -g -Wall -DWITH_TIME_CALC
GLIB_CFLAGS := `pkg-config glib-2.0 --cflags`
CFLAGS := -I. -I/usr/include/mysql $(GLIB_CFLAGS) -g -Wall -O3
GLIB_LDFLAGS := `pkg-config glib-2.0 --libs`
LDFLAGS := -lmysqlclient -O3
CFILES := $(wildcard *.c)
OFILES := $(CFILES:.c=.o)
.PHONY: $(BIN) all
all: $(BIN)
$(BIN): $(OFILES)
$(CC) -o $@ $^ $(LDFLAGS) $(GLIB_LDFLAGS)
%.o: %.c
$(CC) $(CFLAGS) -c -o $@ $<
clean:
rm -f *.o
rm -f core*
rm -f $(BIN)

477
cdr.c

@ -0,0 +1,477 @@
#include <ctype.h>
#include "medmysql.h"
#include "config.h"
#include "cdr.h"
static char* cdr_map_status(const char *sip_status)
{
if(strncmp("200", sip_status, 3) == 0)
{
return CDR_STATUS_OK;
}
if(strncmp("480", sip_status, 3) == 0)
{
return CDR_STATUS_NA;
}
if(strncmp("487", sip_status, 3) == 0)
{
return CDR_STATUS_CANCEL;
}
if(strncmp("486", sip_status, 3) == 0)
{
return CDR_STATUS_BUSY;
}
if(strncmp("408", sip_status, 3) == 0)
{
return CDR_STATUS_TIMEOUT;
}
if(strncmp("404", sip_status, 3) == 0)
{
return CDR_STATUS_OFFLINE;
}
return CDR_STATUS_UNKNOWN;
}
int cdr_process_records(med_entry_t *records, u_int64_t count, u_int64_t *ext_count, struct medmysql_batches *batches)
{
int ret = 0;
u_int8_t trash = 0;
u_int64_t i;
u_int16_t msg_invites = 0;
u_int16_t msg_byes = 0;
u_int16_t msg_unknowns = 0;
u_int16_t invite_200 = 0;
char *callid = records[0].callid;
cdr_entry_t *cdrs;
u_int64_t cdr_count;
*ext_count = 0;
for(i = 0; i < count; ++i)
{
med_entry_t *e = &(records[i]);
if(strcmp(e->sip_method, MSG_INVITE) == 0)
{
++msg_invites;
e->method = MED_INVITE;
if(strncmp("200", e->sip_code, 3) == 0)
{
++invite_200;
}
}
else if(strcmp(e->sip_method, MSG_BYE) == 0)
{
++msg_byes;
e->method = MED_BYE;
}
else
{
++msg_unknowns;
e->method = MED_UNRECOGNIZED;
}
}
/*syslog(LOG_DEBUG, "%d INVITEs, %d BYEs, %d unrecognized", msg_invites, msg_byes, msg_unknowns);*/
if(msg_invites > 0)
{
if(msg_byes > 0 || invite_200 == 0)
{
if(/*msg_byes > 2*/ 0)
{
syslog(LOG_WARNING, "Multiple (%d) BYE messages for callid '%s' found, trashing...",
msg_byes, callid);
trash = 1;
ret = -1;
}
else
{
if(cdr_create_cdrs(records, count, &cdrs, &cdr_count, &trash) != 0)
{
/* try again next round */
ret = -1;
}
else
{
*ext_count = cdr_count;
if(cdr_count > 0)
{
if(config_dumpcdr)
{
/* cdr_log_records(cdrs, cdr_count); */
}
if(medmysql_insert_cdrs(cdrs, cdr_count, batches) != 0)
{
/* TODO: error handling */
}
else
{
if(medmysql_backup_entries(callid, batches) != 0)
{
// TODO: error handling
}
}
}
else
{
/* TODO: no CDRs created? */
}
free(cdrs);
}
}
}
else
{
/*
syslog(LOG_DEBUG, "No BYE message for callid '%s' found, skipping...",
callid);
*/
}
}
else
{
/*syslog(LOG_WARNING, "No INVITE message for callid '%s' found, trashing...", callid);*/
trash = 1;
ret = -1;
}
if(trash)
{
if(medmysql_trash_entries(callid, batches) != 0)
{
/* TODO: error handling */
}
}
return ret;
}
static int cdr_parse_srcleg(char *srcleg, cdr_entry_t *cdr)
{
char *tmp1, *tmp2;
int len;
len = strlen(srcleg);
tmp2 = srcleg;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated source user id, '%s'", cdr->call_id, tmp2);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->source_user_id, tmp2, sizeof(cdr->source_user_id));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated source user, '%s'", cdr->call_id, tmp2);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->source_user, tmp2, sizeof(cdr->source_user));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated source domain, '%s'", cdr->call_id, tmp2);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->source_domain, tmp2, sizeof(cdr->source_domain));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated source cli, '%s'", cdr->call_id, tmp2);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->source_cli, tmp2, sizeof(cdr->source_cli));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated source clir status, '%s'", cdr->call_id, tmp2);
return -1;
}
*tmp1 = '\0';
cdr->source_clir = atoi(tmp2);
tmp2 = ++tmp1;
if(len < tmp2 - srcleg + 1)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated call type, '%s'", cdr->call_id, tmp2);
return -1;
}
strncpy(cdr->call_type, tmp2, sizeof(cdr->call_type));
return 0;
}
static int cdr_parse_dstleg(char *dstleg, cdr_entry_t *cdr)
{
char *tmp1, *tmp2;
int len;
len = strlen(dstleg);
//syslog(LOG_INFO, "Call-Id '%s' has dstleg '%s' with length %d", cdr->call_id, dstleg, len);
tmp2 = dstleg;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated dialed digits", cdr->call_id);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->destination_dialed, tmp2, sizeof(cdr->destination_dialed));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated destination user id", cdr->call_id);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->destination_user_id, tmp2, sizeof(cdr->destination_user_id));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated destination user", cdr->call_id);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->destination_user, tmp2, sizeof(cdr->destination_user));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated destination domain", cdr->call_id);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->destination_domain, tmp2, sizeof(cdr->destination_domain));
tmp2 = ++tmp1;
tmp1 = strchr(tmp2, MED_SEP);
if(tmp1 == NULL)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated incoming destination user", cdr->call_id);
return -1;
}
*tmp1 = '\0';
strncpy(cdr->destination_user_in, tmp2, sizeof(cdr->destination_user_in));
tmp2 = ++tmp1;
//syslog(LOG_INFO, "Call-Id '%s' tmp calc: len=%d, rest=%d, tmp2='%s'", cdr->call_id, len, tmp2 - dstleg + 1, tmp2);
if(len < tmp2 - dstleg + 1)
{
syslog(LOG_WARNING, "Call-Id '%s' has no separated incoming destination domain", cdr->call_id);
return -1;
}
strncpy(cdr->destination_domain_in, tmp2, sizeof(cdr->destination_domain_in));
syslog(LOG_INFO, "Call-Id '%s' dst_domain_in='%s'", cdr->call_id, cdr->destination_domain_in);
return 0;
}
int cdr_create_cdrs(med_entry_t *records, u_int64_t count,
cdr_entry_t **cdrs, u_int64_t *cdr_count, u_int8_t *trash)
{
u_int64_t i = 0, cdr_index = 0;
u_int32_t invites = 0;
size_t cdr_size;
char *endtime = NULL, *tmp_endtime = NULL;
u_int64_t unix_endtime = 0, tmp_unix_endtime = 0;
char *call_status;
*cdr_count = 0;
/* get end time from BYE's timestamp */
for(i = 0; i < count; ++i)
{
med_entry_t *e = &(records[i]);
if(e->valid && e->method == MED_INVITE)
{
++invites;
}
else if(e->method == MED_BYE && endtime == NULL)
{
endtime = e->timestamp;
unix_endtime = e->unix_timestamp;
}
}
if(invites == 0)
{
syslog(LOG_CRIT, "No valid INVITEs for creating a cdr, internal error, callid='%s'",
records[0].callid);
return -1;
}
/* each INVITE maps to a CDR */
cdr_size = sizeof(cdr_entry_t) * invites;
*cdrs = (cdr_entry_t*)malloc(cdr_size);
if(*cdrs == NULL)
{
syslog(LOG_ERR, "Error allocating memory for cdrs: %s", strerror(errno));
return -1;
}
memset(*cdrs, 0, cdr_size);
for(i = 0; i < count; ++i)
{
med_entry_t *e = NULL;
cdr_entry_t *cdr = NULL;
cdr = &(*cdrs)[cdr_index];
e = &(records[i]);
call_status = cdr_map_status(e->sip_code);
if(e->valid && e->method == MED_INVITE && call_status != NULL)
{
++cdr_index;
if(strncmp("200", e->sip_code, 3))
{
/* missed calls have duration of 0 */
tmp_endtime = e->timestamp;
tmp_unix_endtime = e->unix_timestamp;
}
else
{
tmp_endtime = endtime;
tmp_unix_endtime = unix_endtime;
}
strncpy(cdr->call_id, e->callid, sizeof(cdr->call_id));
strncpy(cdr->start_time, e->timestamp, sizeof(cdr->start_time));
cdr->duration = tmp_unix_endtime - e->unix_timestamp > 0 ? tmp_unix_endtime - e->unix_timestamp : 0;
strncpy(cdr->call_status, call_status, sizeof(cdr->call_status));
strncpy(cdr->call_code, e->sip_code, sizeof(cdr->call_code));
cdr->carrier_cost = 0;
cdr->reseller_cost = 0;
cdr->customer_cost = 0;
if(cdr_parse_srcleg(e->src_leg, cdr) < 0)
{
*trash = 1;
return -1;
}
if(cdr_parse_dstleg(e->dst_leg, cdr) < 0)
{
*trash = 1;
return -1;
}
if(cdr_fill_record(cdr) != 0)
{
// TODO: error handling
}
}
}
*cdr_count = cdr_index;
/*syslog(LOG_DEBUG, "Created %llu CDRs:", *cdr_count);*/
return 0;
}
int cdr_fill_record(cdr_entry_t *cdr)
{
cdr_set_provider(cdr);
/*
if(cdr->source_clir)
{
strcpy(cdr->source_cli, "anonymous");
}
*/
return 0;
}
void cdr_set_provider(cdr_entry_t *cdr)
{
char *val;
if(strncmp("0", cdr->source_user_id, sizeof(cdr->source_user_id)) != 0)
{
if((val = g_hash_table_lookup(med_uuid_table, cdr->source_user_id)) != NULL)
{
strncpy(cdr->source_provider_id, val, sizeof(cdr->source_provider_id));
}
else
{
strncpy(cdr->source_provider_id, "0", sizeof(cdr->source_provider_id));
}
}
else if((val = g_hash_table_lookup(med_peer_ip_table, cdr->source_domain)) != NULL)
{
strncpy(cdr->source_provider_id, val, sizeof(cdr->source_provider_id));
}
else if((val = g_hash_table_lookup(med_peer_host_table, cdr->source_domain)) != NULL)
{
strncpy(cdr->source_provider_id, val, sizeof(cdr->source_provider_id));
}
else
{
strncpy(cdr->source_provider_id, "0", sizeof(cdr->source_provider_id));
}
if(strncmp("0", cdr->destination_user_id, sizeof(cdr->destination_user_id)) != 0)
{
if((val = g_hash_table_lookup(med_uuid_table, cdr->destination_user_id)) != NULL)
{
strncpy(cdr->destination_provider_id, val, sizeof(cdr->destination_provider_id));
}
else
{
strncpy(cdr->destination_provider_id, "0", sizeof(cdr->destination_provider_id));
}
}
else if((val = g_hash_table_lookup(med_peer_ip_table, cdr->destination_domain)) != NULL)
{
strncpy(cdr->destination_provider_id, val, sizeof(cdr->destination_provider_id));
}
else if((val = g_hash_table_lookup(med_peer_host_table, cdr->destination_domain)) != NULL)
{
strncpy(cdr->destination_provider_id, val, sizeof(cdr->destination_provider_id));
}
else
{
strncpy(cdr->destination_provider_id, "0", sizeof(cdr->destination_provider_id));
}
}

57
cdr.h

@ -0,0 +1,57 @@
#ifndef _CDR_H
#define _CDR_H
#include "mediator.h"
#define MSG_INVITE "INVITE"
#define MSG_BYE "BYE"
#define CDR_STATUS_OK "ok"
#define CDR_STATUS_BUSY "busy"
#define CDR_STATUS_NA "noanswer"
#define CDR_STATUS_CANCEL "cancel"
#define CDR_STATUS_OFFLINE "offline"
#define CDR_STATUS_TIMEOUT "timeout"
#define CDR_STATUS_UNKNOWN "other"
struct medmysql_batches;
typedef struct {
char call_id[128];
char source_user_id[37];
char source_provider_id[256];
char source_user[256];
char source_domain[256];
char source_cli[65];
u_int8_t source_clir;
char destination_user_id[37];
char destination_provider_id[256];
char destination_user[256];
char destination_domain[256];
char destination_user_in[256];
char destination_domain_in[256];
char destination_dialed[256];
char call_type[8];
char call_status[16];
char call_code[4];
char start_time[32];
u_int32_t duration;
u_int32_t carrier_cost;
u_int32_t reseller_cost;
u_int32_t customer_cost;
} cdr_entry_t;
int cdr_process_records(med_entry_t *records, u_int64_t count, u_int64_t *cdr_count, struct medmysql_batches *);
void cdr_fix_accids(med_entry_t *records, u_int64_t count);
int cdr_create_cdrs(med_entry_t *records, u_int64_t count, cdr_entry_t **cdrs, u_int64_t *cdr_count, u_int8_t *trash);
int cdr_fill_record(cdr_entry_t *cdr);
void cdr_set_provider(cdr_entry_t *cdr);
#endif /* _CDR_H */

@ -0,0 +1,179 @@
#include <getopt.h>
#include "mediator.h"
#include "config.h"
unsigned int config_interval = MEDIATOR_DEFAULT_INTERVAL;
u_int8_t config_dumpcdr = MEDIATOR_DEFAULT_DUMPCDR;
u_int8_t config_daemonize = MEDIATOR_DEFAULT_DAEMONIZE;
char *config_pid_path = MEDIATOR_DEFAULT_PIDPATH;
char *config_med_host = MEDIATOR_DEFAULT_ACCHOST;
char *config_med_user = MEDIATOR_DEFAULT_ACCUSER;
char *config_med_pass = MEDIATOR_DEFAULT_ACCPASS;
char *config_med_db = MEDIATOR_DEFAULT_ACCDB;
unsigned int config_med_port = MEDIATOR_DEFAULT_ACCPORT;
char *config_cdr_host = MEDIATOR_DEFAULT_CDRHOST;
char *config_cdr_user = MEDIATOR_DEFAULT_CDRUSER;
char *config_cdr_pass = MEDIATOR_DEFAULT_CDRPASS;
char *config_cdr_db = MEDIATOR_DEFAULT_CDRDB;
unsigned int config_cdr_port = MEDIATOR_DEFAULT_CDRPORT;
static u_int8_t config_pid_path_free = 0;
static u_int8_t config_med_host_free = 0;
static u_int8_t config_med_user_free = 0;
static u_int8_t config_med_pass_free = 0;
static u_int8_t config_med_db_free = 0;
static u_int8_t config_cdr_host_free = 0;
static u_int8_t config_cdr_user_free = 0;
static u_int8_t config_cdr_pass_free = 0;
static u_int8_t config_cdr_db_free = 0;
static void config_help(const char *self)
{
printf("mediator 1.0 - Creates call detail records from SER accounting.\n" \
"Usage: %s [-?] [-d] [-D pidpath]\n" \
" -D\tThe path of the PID file (default = '%s').\n" \
" -d\tEnables daemonization of the process (default = disabled).\n" \
" -l\tEnables dumping of CDRs to syslog (default = disabled).\n" \
" -i\tThe creation interval (default = %d).\n" \
" -h\tThe ACC db host (default = '%s').\n" \
" -o\tThe ACC db port (default = '%d').\n" \
" -u\tThe ACC db user (default = '%s').\n" \
" -p\tThe ACC db pass (default = '%s').\n" \
" -b\tThe ACC db name (default = '%s').\n" \
" -H\tThe CDR db host (default = '%s').\n" \
" -O\tThe CDR db port (default = '%d').\n" \
" -U\tThe CDR db user (default = '%s').\n" \
" -P\tThe CDR db pass (default = '%s').\n" \
" -B\tThe CDR db name (default = '%s').\n" \
" -?\tDisplays this message.\n",
self, config_pid_path, config_interval,
config_med_host, config_med_port, config_med_user, config_med_pass, config_med_db,
config_cdr_host, config_cdr_port, config_cdr_user, config_cdr_pass, config_cdr_db);
}
int config_parse_cmdopts(int argc, char **argv)
{
int c;
while((c = getopt(argc, argv, "D:i:dl?h:u:p:b:o:H:U:P:B:O:")) != -1)
{
if(c == '?')
{
config_help(argv[0]);
exit(0);
}
else if(c == 'd')
{
config_daemonize = 1;
}
else if(c == 'l')
{
config_dumpcdr = 1;
}
else if(c == 'D')
{
config_pid_path = (char*)strdup(optarg);
config_pid_path_free = 1;
}
else if(c == 'i')
{
config_interval = atoi(optarg);
}
else if(c == 'h')
{
config_med_host = (char*)strdup(optarg);
config_med_host_free = 1;
}
else if(c == 'u')
{
config_med_user = (char*)strdup(optarg);
config_med_user_free = 1;
}
else if(c == 'p')
{
config_med_pass = (char*)strdup(optarg);
config_med_pass_free = 1;
}
else if(c == 'b')
{
config_med_db = (char*)strdup(optarg);
config_med_db_free = 1;
}
else if(c == 'o')
{
config_med_port = atoi(optarg);
}
else if(c == 'H')
{
config_cdr_host = (char*)strdup(optarg);
config_cdr_host_free = 1;
}
else if(c == 'U')
{
config_cdr_user = (char*)strdup(optarg);
config_cdr_user_free = 1;
}
else if(c == 'P')
{
config_cdr_pass = (char*)strdup(optarg);
config_cdr_pass_free = 1;
}
else if(c == 'B')
{
config_cdr_db = (char*)strdup(optarg);
config_cdr_db_free = 1;
}
else if(c == 'O')
{
config_cdr_port = atoi(optarg);
}
}
return 0;
}
void config_cleanup()
{
if(config_pid_path_free)
{
free(config_pid_path);
}
if(config_cdr_host_free)
{
free(config_cdr_host);
}
if(config_cdr_user_free)
{
free(config_cdr_user);
}
if(config_cdr_pass_free)
{
free(config_cdr_pass);
}
if(config_cdr_db_free)
{
free(config_cdr_db);
}
if(config_med_host_free)
{
free(config_med_host);
}
if(config_med_user_free)
{
free(config_med_user);
}
if(config_med_pass_free)
{
free(config_med_pass);
}
if(config_med_db_free)
{
free(config_med_db);
}
}

@ -0,0 +1,29 @@
#ifndef _CONFIG_H
#define _CONFIG_H
#include <sys/types.h>
#include "mediator.h"
extern u_int8_t config_daemonize;
extern char *config_pid_path;
extern char *config_hostname;
extern char *config_med_host;
extern unsigned int config_med_port;
extern char *config_med_user;
extern char *config_med_pass;
extern char *config_med_db;
extern char *config_cdr_host;
extern unsigned int config_cdr_port;
extern char *config_cdr_user;
extern char *config_cdr_pass;
extern char *config_cdr_db;
extern unsigned int config_interval;
extern u_int8_t config_dumpcdr;
int config_parse_cmdopts(int argc, char **argv);
void config_cleanup();
#endif /* _CONFIG_H */

@ -0,0 +1,49 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "daemonizer.h"
int daemonize()
{
pid_t pid = fork();
if(pid < 0)
{
return -1;
}
else if(pid > 0)
{
_exit(0);
}
else if(pid == 0)
{
int fds;
setsid();
for(fds = getdtablesize(); fds >= 0; --fds)
{
if(fds != mediator_lockfd)
close(fds);
}
fds = open("/dev/null", O_RDWR); /* stdin */
dup(fds); /* stdout */
dup(fds); /* stderr */
umask(027);
chdir("/");
}
return 0;
}
int write_pid(const char *pidfile)
{
FILE *pfile = fopen(pidfile, "w");
if(pfile == NULL)
{
syslog(LOG_CRIT, "Error opening pid file '%s': %s", pidfile, strerror(errno));
return -1;
}
fprintf(pfile, "%d", getpid());
fclose(pfile);
return 0;
}

@ -0,0 +1,9 @@
#ifndef _DAEMONIZER_H
#define _DAEMONIZER_H
#include "mediator.h"
int daemonize();
int write_pid(const char *pidfile);
#endif /* _DAEMONIZER_H */

6
debian/changelog vendored

@ -0,0 +1,6 @@
sipwise-mediator (0.1.1) unstable; urgency=low
* Initial release.
-- Michael Prokop <mprokop@sipwise.com> Wed, 16 Jul 2008 18:36:20 +0200

1
debian/compat vendored

@ -0,0 +1 @@
5

15
debian/control vendored

@ -0,0 +1,15 @@
Source: sipwise-mediator
Section: utils
Priority: extra
Maintainer: Michael Prokop <mprokop@sipwise.com>
Build-Depends: debhelper (>= 5), libmysqlclient15-dev, libglib2.0-dev
Standards-Version: 3.8.0
Homepage: http://sipwise.com/
Vcs-svn: https://dev.sipwise.com/svn/voip-tools/mediator
Vcs-Browser: https://dev.sipwise.com/svn/voip-tools/mediator
Package: sipwise-mediator
Architecture: any
Depends: ${shlibs:Depends}, ${misc:Depends}
Description: a mediator software package
TODO

17
debian/copyright vendored

@ -0,0 +1,17 @@
This package was debianized by Michael Prokop <mprokop@sipwise.com> on
Wed, 16 Jul 2008 18:36:20 +0200.
It was downloaded from https://dev.sipwise.com/svn
Upstream Author: The Sipwise Team - http://sipwise.com/
Copyright:
Copyright (C) 2008++ Sipwise / http://sipwise.com/
License:
TODO
The Debian packaging is (C) 2008, Michael Prokop <mprokop@sipwise.com> and
is licensed under the GPL, see `/usr/share/common-licenses/GPL'.

2
debian/overrides vendored

@ -0,0 +1,2 @@
mediator: binary-without-manpage usr/bin/mediator
mediator: new-package-should-close-itp-bug

75
debian/rules vendored

@ -0,0 +1,75 @@
#!/usr/bin/make -f
# -*- makefile -*-
# Sample debian/rules that uses debhelper.
# This file was originally written by Joey Hess and Craig Small.
# As a special exception, when this file is copied by dh-make into a
# dh-make output file, you may use that output file without restriction.
# This special exception was added by Craig Small in version 0.37 of dh-make.
# Uncomment this to turn on verbose mode.
#export DH_VERBOSE=1
CFLAGS = -Wall -g
ifneq (,$(findstring noopt,$(DEB_BUILD_OPTIONS)))
CFLAGS += -O0
else
CFLAGS += -O2
endif
build: build-stamp
build-stamp:
dh_testdir
# Add here commands to compile the package.
$(MAKE)
touch $@
clean:
dh_testdir
dh_testroot
rm -f build-stamp
# Add here commands to clean up after the build process.
-$(MAKE) clean
dh_clean
install: build
dh_testdir
dh_testroot
dh_clean -k
dh_installdirs usr/bin
# Add here commands to install the package into debian/sipwise-mediator.
install -m 755 mediator $(CURDIR)/debian/sipwise-mediator/usr/bin/mediator
# Build architecture-independent files here.
binary-indep: build install
# We have nothing to do by default.
# Build architecture-dependent files here.
binary-arch: build install
dh_testdir
dh_testroot
dh_installchangelogs ChangeLog
dh_installdocs
dh_installexamples
dh_installman
dh_link
dh_strip
mkdir -p $(CURDIR)/debian/sipwise-mediator/usr/share/lintian/overrides/
cp -av debian/overrides $(CURDIR)/debian/sipwise-mediator/usr/share/lintian/overrides/sipwise-mediator
dh_compress
dh_fixperms
dh_installdeb
dh_shlibdeps
dh_gencontrol
dh_md5sums
dh_builddeb
binary: binary-indep binary-arch
.PHONY: build clean binary-indep binary-arch binary install

@ -0,0 +1,271 @@
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <time.h>
#include <sys/time.h>
#include <sys/file.h>
#include "mediator.h"
#include "config.h"
#include "daemonizer.h"
#include "medmysql.h"
#include "cdr.h"
static sig_atomic_t mediator_shutdown = 0;
int mediator_lockfd = -1;
u_int64_t mediator_count = 0;
GHashTable *med_peer_host_table = NULL;
GHashTable *med_peer_ip_table = NULL;
GHashTable *med_uuid_table = NULL;
/**********************************************************************/
static int mediator_load_maps()
{
med_peer_host_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
med_peer_ip_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
med_uuid_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
if(medmysql_load_maps(med_peer_host_table, med_peer_ip_table))
return -1;
if(medmysql_load_uuids(med_uuid_table))
return -1;
return 0;
}
/**********************************************************************/
static void mediator_print_mapentry(gpointer key, gpointer value, gpointer d)
{
syslog(LOG_DEBUG, "\t'%s' -> %s", (char*)key, (char*)value);
}
/**********************************************************************/
static void mediator_destroy_maps()
{
if(med_peer_host_table)
g_hash_table_destroy(med_peer_host_table);
if(med_peer_ip_table)
g_hash_table_destroy(med_peer_ip_table);
if(med_uuid_table)
g_hash_table_destroy(med_uuid_table);
}
/**********************************************************************/
static void mediator_print_maps()
{
syslog(LOG_DEBUG, "Peer host map:");
g_hash_table_foreach(med_peer_host_table, mediator_print_mapentry, NULL);
syslog(LOG_DEBUG, "Peer IP map:");
g_hash_table_foreach(med_peer_ip_table, mediator_print_mapentry, NULL);
syslog(LOG_DEBUG, "UUID map:");
g_hash_table_foreach(med_uuid_table, mediator_print_mapentry, NULL);
}
/**********************************************************************/
static void mediator_unlock()
{
syslog(LOG_DEBUG, "Unlocking mediator.");
if(mediator_lockfd != -1)
{
flock(mediator_lockfd, LOCK_UN);
}
}
/**********************************************************************/
static void mediator_exit()
{
mediator_unlock();
config_cleanup();
closelog();
}
/**********************************************************************/
static void mediator_signal(int signal)
{
if(signal == SIGTERM || signal == SIGINT)
mediator_shutdown = 1;
}
/**********************************************************************/
static int mediator_lock()
{
mediator_lockfd = open(MEDIATOR_LOCK_FILE, O_CREAT|O_RDWR);
if(mediator_lockfd == -1)
{
syslog(LOG_CRIT, "Error creating lock file: %s", strerror(errno));
return -1;
}
if(flock(mediator_lockfd, LOCK_EX|LOCK_NB) == -1)
{
syslog(LOG_CRIT, "Error locking lock file: %s", strerror(errno));
return -1;
}
return 0;
}
#ifdef WITH_TIME_CALC
/**********************************************************************/
static u_int64_t mediator_calc_runtime(struct timeval *tv_start, struct timeval *tv_stop)
{
return ((u_int64_t)((tv_stop->tv_sec * 1000000 + tv_stop->tv_usec) -
(tv_start->tv_sec * 1000000 + tv_start->tv_usec)) / 1000);
}
#endif
/**********************************************************************/
int main(int argc, char **argv)
{
med_callid_t *callids;
med_entry_t *records;
u_int64_t id_count, rec_count, i;
u_int64_t cdr_count, last_count;
int maprefresh;
struct medmysql_batches batches;
#ifdef WITH_TIME_CALC
struct timeval tv_start, tv_stop;
u_int64_t runtime;
#endif
openlog(MEDIATOR_SYSLOG_NAME, LOG_PID|LOG_NDELAY, LOG_DAEMON);
atexit(mediator_exit);
signal(SIGCHLD, SIG_IGN);
signal(SIGTERM, mediator_signal);
signal(SIGINT, mediator_signal);
if(config_parse_cmdopts(argc, argv) == -1)
{
return -1;
}
syslog(LOG_DEBUG, "Locking process.");
if(mediator_lock() != 0)
{
return -1;
}
if(config_daemonize)
{
syslog(LOG_DEBUG, "Daemonizing process.");
if(daemonize() != 0)
{
return -1;
}
}
syslog(LOG_DEBUG, "Writing pid file.");
if(write_pid(config_pid_path) != 0)
{
return -1;
}
syslog(LOG_INFO, "ACC acc database host='%s', port='%d', user='%s', name='%s'",
config_med_host, config_med_port, config_med_user, config_med_db);
syslog(LOG_INFO, "CDR acc database host='%s', port='%d', user='%s', name='%s'",
config_cdr_host, config_cdr_port, config_cdr_user, config_cdr_db);
syslog(LOG_DEBUG, "Setting up mysql connections.");
if(medmysql_init() != 0)
{
return -1;
}
syslog(LOG_INFO, "Up and running, daemonized=%d, pid-path='%s', interval=%d",
config_daemonize, config_pid_path, config_interval);
maprefresh = 0;
while(!mediator_shutdown)
{
if(maprefresh == 0)
{
mediator_destroy_maps();
if(mediator_load_maps() != 0)
{
break;
}
maprefresh = 10;
}
--maprefresh;
/*
mediator_print_maps();
*/
id_count = 0, rec_count = 0, cdr_count = 0;
last_count = mediator_count;
if(medmysql_fetch_callids(&callids, &id_count) != 0)
{
/* TODO: error processing? */
sleep(config_interval);
continue;
}
if(id_count > 0)
{
medmysql_batch_start(&batches);
/*syslog(LOG_DEBUG, "Processing %"PRIu64" accounting record group(s).", id_count);*/
for(i = 0; i < id_count && !mediator_shutdown; ++i)
{
#ifdef WITH_TIME_CALC
gettimeofday(&tv_start, NULL);
#endif
if(medmysql_fetch_records(&(callids[i]), &records, &rec_count) != 0)
{
/* TODO: error processing? */
continue;
}
if(cdr_process_records(records, rec_count, &cdr_count, &batches) != 0)
{
/* TODO: error processing? */
}
if(rec_count > 0)
{
free(records);
}
mediator_count += cdr_count;
#ifdef WITH_TIME_CALC
gettimeofday(&tv_stop, NULL);
runtime = mediator_calc_runtime(&tv_start, &tv_stop);
syslog(LOG_DEBUG, "Runtime for record group was %"PRIu64" ms.", runtime);
#endif
}
free(callids);
medmysql_batch_end(&batches);
}
if(mediator_count > last_count)
{
syslog(LOG_DEBUG, "Overall %"PRIu64" CDRs created so far.", mediator_count);
sleep(3);
}
else
{
/* sleep if no cdrs have been created */
sleep(config_interval);
}
}
mediator_destroy_maps();
syslog(LOG_INFO, "Shutting down.");
medmysql_cleanup();
syslog(LOG_INFO, "Successfully shut down.");
return 0;
}

@ -0,0 +1,74 @@
#ifndef _MEDIATOR_H
#define _MEDIATOR_H
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <errno.h>
#include <syslog.h>
#include <unistd.h>
#include <stdlib.h>
#include <glib.h>
#include <inttypes.h>
#define MEDIATOR_DEFAULT_INTERVAL 10
#define MEDIATOR_SYSLOG_NAME "mediator"
#define MEDIATOR_LOCK_FILE "/var/lock/mediator.lock"
#define MEDIATOR_DEFAULT_DAEMONIZE 0
#define MEDIATOR_DEFAULT_DUMPCDR 0
#define MEDIATOR_DEFAULT_PIDPATH "/var/run/mediator.pid"
#define MEDIATOR_DEFAULT_ACCHOST "localhost"
#define MEDIATOR_DEFAULT_ACCUSER "mediator"
#define MEDIATOR_DEFAULT_ACCPASS "GimmeAllUr$$$"
#define MEDIATOR_DEFAULT_ACCDB "accounting"
#define MEDIATOR_DEFAULT_ACCPORT 0
#define MEDIATOR_DEFAULT_CDRHOST "localhost"
#define MEDIATOR_DEFAULT_CDRUSER "mediator"
#define MEDIATOR_DEFAULT_CDRPASS "GimmeAllUr$$$"
#define MEDIATOR_DEFAULT_CDRDB "accounting"
#define MEDIATOR_DEFAULT_CDRPORT 0
#define MED_GW_STRING "gw"
#define MED_AS_STRING "as"
#define MED_PEER_STRING "peer"
#define MED_MIN_BASELEN 6
#define MED_SEP '|'
extern int mediator_lockfd;
typedef enum {
MED_INVITE = 1,
MED_BYE = 2,
MED_UNRECOGNIZED = 3
} med_method_t;
typedef struct {
char src_leg[256];
char dst_leg[256];
char sip_code[3];
char sip_reason[32];
char callid[128];
char timestamp[24];
u_int64_t unix_timestamp;
u_int64_t med_id;
u_int8_t valid;
med_method_t method;
char sip_method[32];
} med_entry_t;
typedef struct {
char value[128];
} med_callid_t;
extern GHashTable *med_peer_host_table;
extern GHashTable *med_peer_ip_table;
extern GHashTable *med_uuid_table;
#endif /* _MEDIATOR_H */

@ -0,0 +1,536 @@
#include <my_global.h>
#include <m_string.h>
#include <mysql.h>
#include "medmysql.h"
#include "config.h"
/*#define MED_CALLID_QUERY "(select a.callid, a.time from acc a, acc b where a.callid = b.callid and a.method = 'INVITE' and b.method = 'BYE' group by callid) union (select callid, time from acc where method = 'INVITE' and sip_code != '200') order by time asc limit 0,200000"*/
#define MED_CALLID_QUERY "select a.callid from acc a left join acc b on a.callid = b.callid and b.method = 'BYE' where a.method = 'INVITE' and (a.sip_code != '200' or b.id is not null) group by a.callid limit 0,200000"
#define MED_FETCH_QUERY "select sip_code, sip_reason, method, callid, time, unix_timestamp(time), " \
"src_leg, dst_leg, id " \
"from acc where callid = '%s' order by time asc"
#define MED_LOAD_PEER_QUERY "select domain, ip, provider_id from provisioning.voip_peer_hosts"
#define MED_LOAD_UUID_QUERY "select vs.uuid, r.contract_id from billing.voip_subscribers vs, " \
"billing.contracts c, billing.resellers r where c.id = vs.contract_id and " \
"c.reseller_id = r.id"
static MYSQL *cdr_handler = NULL;
static MYSQL *med_handler = NULL;
/**********************************************************************/
int medmysql_init()
{
my_bool recon = 1;
cdr_handler = mysql_init(NULL);
if(!mysql_real_connect(cdr_handler,
config_cdr_host, config_cdr_user, config_cdr_pass,
config_cdr_db, config_cdr_port, NULL, 0))
{
syslog(LOG_CRIT, "Error connecting to CDR db: %s", mysql_error(cdr_handler));
goto err;
}
if(mysql_options(cdr_handler, MYSQL_OPT_RECONNECT, &recon) != 0)
{
syslog(LOG_CRIT, "Error setting reconnect-option for CDR db: %s", mysql_error(cdr_handler));
goto err;
}
med_handler = mysql_init(NULL);
if(!mysql_real_connect(med_handler,
config_med_host, config_med_user, config_med_pass,
config_med_db, config_med_port, NULL, 0))
{
syslog(LOG_CRIT, "Error connecting to ACC db: %s", mysql_error(med_handler));
goto err;
}
if(mysql_options(med_handler, MYSQL_OPT_RECONNECT, &recon) != 0)
{
syslog(LOG_CRIT, "Error setting reconnect-option for ACC db: %s", mysql_error(med_handler));
goto err;
}
return 0;
err:
medmysql_cleanup();
return -1;
}
/**********************************************************************/
void medmysql_cleanup()
{
if(cdr_handler != NULL)
{
mysql_close(cdr_handler);
cdr_handler = NULL;
}
if(med_handler != NULL)
{
mysql_close(med_handler);
med_handler = NULL;
}
}
/**********************************************************************/
int medmysql_fetch_callids(med_callid_t **callids, u_int64_t *count)
{
MYSQL_RES *res;
MYSQL_ROW row;
char query[1024] = "";
size_t callid_size;
u_int64_t i = 0;
int ret = 0;
*count = 0;
strncpy(query, MED_CALLID_QUERY, sizeof(query));
/*syslog(LOG_DEBUG, "q='%s'", query);*/
if(mysql_real_query(med_handler, query, strlen(query)) != 0)
{
syslog(LOG_CRIT, "Error getting acc callids: %s",
mysql_error(med_handler));
return -1;
}
res = mysql_store_result(med_handler);
*count = mysql_num_rows(res);
if(*count == 0)
{
goto out;
}
callid_size = sizeof(med_callid_t) * (*count);
*callids = (med_callid_t*)malloc(callid_size);
memset(*callids, 0, callid_size);
if(*callids == NULL)
{
syslog(LOG_CRIT, "Error allocating callid memory: %s", strerror(errno));
ret = -1;
goto out;
}
while((row = mysql_fetch_row(res)) != NULL)
{
med_callid_t *c = &(*callids)[i++];
if(row == NULL || row[0] == NULL)
{
strcpy(c->value, "0");
}
strcpy(c->value, row[0]);
/*syslog(LOG_DEBUG, "callid[%"PRIu64"]='%s'", i, c->value);*/
}
out:
mysql_free_result(res);
return ret;
}
/**********************************************************************/
int medmysql_fetch_records(med_callid_t *callid,
med_entry_t **entries, u_int64_t *count)
{
MYSQL_RES *res;
MYSQL_ROW row;
char query[1024] = "";
size_t entry_size;
u_int64_t i = 0;
int ret = 0;
*count = 0;
snprintf(query, sizeof(query), MED_FETCH_QUERY, callid->value);
/*syslog(LOG_DEBUG, "q='%s'", query);*/
if(mysql_real_query(med_handler, query, strlen(query)) != 0)
{
syslog(LOG_CRIT, "Error getting acc records for callid '%s': %s",
callid->value, mysql_error(med_handler));
return -1;
}
res = mysql_store_result(med_handler);
*count = mysql_num_rows(res);
if(*count == 0)
{
syslog(LOG_CRIT, "No records found for callid '%s'!",
callid->value);
ret = -1;
goto out;
}
entry_size = (*count) * sizeof(med_entry_t);
*entries = (med_entry_t*)malloc(entry_size);
if(*entries == NULL)
{
syslog(LOG_CRIT, "Error allocating memory for record entries: %s",
strerror(errno));
ret = -1;
goto out;
}
memset(*entries, 0, entry_size);
while((row = mysql_fetch_row(res)) != NULL)
{
med_entry_t *e = &(*entries)[i++];
strncpy(e->sip_code, row[0], sizeof(e->sip_code));
strncpy(e->sip_reason, row[1], sizeof(e->sip_reason));
strncpy(e->sip_method, row[2], sizeof(e->sip_method));
strncpy(e->callid, row[3], sizeof(e->callid));
strncpy(e->timestamp, row[4], sizeof(e->timestamp));
e->unix_timestamp = atoll(row[5]);
strncpy(e->src_leg, row[6], sizeof(e->src_leg));
strncpy(e->dst_leg, row[7], sizeof(e->dst_leg));
e->med_id = atoll(row[8]);
e->valid = 1;
}
out:
mysql_free_result(res);
return ret;
}
/**********************************************************************/
int medmysql_trash_entries(const char *callid, struct medmysql_batches *batches)
{
if (batches->acc_trash.len > (PACKET_SIZE - 1024)) {
if (medmysql_flush_medlist(&batches->acc_trash))
return -1;
}
if (batches->acc_trash.len == 0)
batches->acc_trash.len = sprintf(batches->acc_trash.str, "insert into acc_trash select * from acc where callid in (");
batches->acc_trash.len += sprintf(batches->acc_trash.str + batches->acc_trash.len, "'%s',", callid);
return medmysql_delete_entries(callid, batches);
}
/**********************************************************************/
int medmysql_backup_entries(const char *callid, struct medmysql_batches *batches)
{
if (batches->acc_backup.len > (PACKET_SIZE - 1024)) {
if (medmysql_flush_medlist(&batches->acc_backup))
return -1;
}
if (batches->acc_backup.len == 0)
batches->acc_backup.len = sprintf(batches->acc_backup.str, "insert into acc_backup select * from acc where callid in (");
batches->acc_backup.len += sprintf(batches->acc_backup.str + batches->acc_backup.len, "'%s',", callid);
return medmysql_delete_entries(callid, batches);
}
/**********************************************************************/
int medmysql_delete_entries(const char *callid, struct medmysql_batches *batches)
{
if (batches->to_delete.len > (PACKET_SIZE - 1024)) {
if (medmysql_flush_medlist(&batches->acc_backup))
return -1;
if (medmysql_flush_medlist(&batches->acc_trash))
return -1;
if (medmysql_flush_medlist(&batches->to_delete))
return -1;
}
if (batches->to_delete.len == 0)
batches->to_delete.len = sprintf(batches->to_delete.str, "delete from acc where callid in (");
batches->to_delete.len += sprintf(batches->to_delete.str + batches->to_delete.len, "'%s',", callid);
return 0;
}
#define CDRPRINT(x) batches->cdrs.len += sprintf(batches->cdrs.str + batches->cdrs.len, x)
#define CDRESCAPE(x) batches->cdrs.len += mysql_real_escape_string(med_handler, batches->cdrs.str + batches->cdrs.len, x, strlen(x))
/**********************************************************************/
int medmysql_insert_cdrs(cdr_entry_t *entries, u_int64_t count, struct medmysql_batches *batches)
{
u_int64_t i;
for(i = 0; i < count; ++i)
{
if (batches->cdrs.len > (PACKET_SIZE - 6000)) {
if (medmysql_flush_cdr(batches))
return -1;
}
if (batches->cdrs.len == 0) {
batches->cdrs.len = sprintf(batches->cdrs.str, "insert into cdr (id, update_time, " \
"source_user_id, source_provider_id, source_user, source_domain, " \
"source_cli, source_clir, "\
"destination_user_id, destination_provider_id, destination_user, destination_domain, " \
"destination_user_in, destination_domain_in, destination_user_dialed, " \
"call_type, call_status, call_code, start_time, duration, call_id, " \
"carrier_cost, reseller_cost, customer_cost) values ");
}
cdr_entry_t *e = &(entries[i]);
char str_source_clir[2] = "";
char str_duration[32] = "";
char str_carrier_cost[32] = "";
char str_reseller_cost[32] = "";
char str_customer_cost[32] = "";
snprintf(str_source_clir, sizeof(str_source_clir), "%d", e->source_clir);
snprintf(str_duration, sizeof(str_duration), "%d", e->duration);
snprintf(str_carrier_cost, sizeof(str_carrier_cost), "%d", e->carrier_cost);
snprintf(str_reseller_cost, sizeof(str_reseller_cost), "%d", e->reseller_cost);
snprintf(str_customer_cost, sizeof(str_customer_cost), "%d", e->customer_cost);
CDRPRINT("(NULL, now(), '");
CDRESCAPE(e->source_user_id);
CDRPRINT("','");
CDRESCAPE(e->source_provider_id);
CDRPRINT("','");
CDRESCAPE(e->source_user);
CDRPRINT("','");
CDRESCAPE(e->source_domain);
CDRPRINT("','");
CDRESCAPE(e->source_cli);
CDRPRINT("',");
CDRESCAPE(str_source_clir);
CDRPRINT(",'");
CDRESCAPE(e->destination_user_id);
CDRPRINT("','");
CDRESCAPE(e->destination_provider_id);
CDRPRINT("','");
CDRESCAPE(e->destination_user);
CDRPRINT("','");
CDRESCAPE(e->destination_domain);
CDRPRINT("','");
CDRESCAPE(e->destination_user_in);
CDRPRINT("','");
CDRESCAPE(e->destination_domain_in);
CDRPRINT("','");
CDRESCAPE(e->destination_dialed);
CDRPRINT("','");
CDRESCAPE(e->call_type);
CDRPRINT("','");
CDRESCAPE(e->call_status);
CDRPRINT("','");
CDRESCAPE(e->call_code);
CDRPRINT("','");
CDRESCAPE(e->start_time);
CDRPRINT("',");
CDRESCAPE(str_duration);
CDRPRINT(",'");
CDRESCAPE(e->call_id);
CDRPRINT("',");
CDRESCAPE(str_carrier_cost);
CDRPRINT(",");
CDRESCAPE(str_reseller_cost);
CDRPRINT(",");
CDRESCAPE(str_customer_cost);
CDRPRINT("),");
}
/*syslog(LOG_DEBUG, "q='%s'", query);*/
return 0;
}
/**********************************************************************/
int medmysql_load_maps(GHashTable *host_table, GHashTable *ip_table)
{
MYSQL_RES *res;
MYSQL_ROW row;
int ret = 0;
char query[1024] = "";
gpointer key;
char* host_id;
char* ip_id;
snprintf(query, sizeof(query), MED_LOAD_PEER_QUERY);
/* syslog(LOG_DEBUG, "q='%s'", query); */
if(mysql_real_query(med_handler, query, strlen(query)) != 0)
{
syslog(LOG_CRIT, "Error loading peer hosts: %s",
mysql_error(med_handler));
return -1;
}
res = mysql_store_result(med_handler);
while((row = mysql_fetch_row(res)) != NULL)
{
host_id = NULL;
ip_id = NULL;
if(row[0] == NULL || row[1] == NULL || row[2] == NULL)
{
syslog(LOG_CRIT, "Error loading peer hosts, a column is NULL");
ret = -1;
goto out;
}
host_id = strdup(row[2]);
if(host_id == NULL)
{
syslog(LOG_CRIT, "Error allocating host id memory: %s", strerror(errno));
ret = -1;
goto out;
}
ip_id = strdup(row[2]);
if(ip_id == NULL)
{
syslog(LOG_CRIT, "Error allocating ip id memory: %s", strerror(errno));
free(host_id);
ret = -1;
goto out;
}
if(g_hash_table_lookup(host_table, row[0]) != NULL)
{
syslog(LOG_WARNING, "Skipping duplicate hostname '%s'", row[0]);
}
else
{
key = (gpointer)g_strdup(row[0]);
g_hash_table_insert(host_table, key, host_id);
}
if(ip_table != NULL)
{
if(g_hash_table_lookup(ip_table, row[1]) != NULL)
{
syslog(LOG_WARNING, "Skipping duplicate IP '%s'", row[1]);
}
else
{
key = (gpointer)g_strdup(row[1]);
g_hash_table_insert(ip_table, key, ip_id);
}
}
}
out:
mysql_free_result(res);
return ret;
}
/**********************************************************************/
int medmysql_load_uuids(GHashTable *uuid_table)
{
MYSQL_RES *res;
MYSQL_ROW row;
int ret = 0;
char query[1024] = "";
gpointer key;
char *provider_id;
snprintf(query, sizeof(query), MED_LOAD_UUID_QUERY);
/* syslog(LOG_DEBUG, "q='%s'", query); */
if(mysql_real_query(med_handler, query, strlen(query)) != 0)
{
syslog(LOG_CRIT, "Error loading uuids: %s",
mysql_error(med_handler));
return -1;
}
res = mysql_store_result(med_handler);
while((row = mysql_fetch_row(res)) != NULL)
{
if(row[0] == NULL || row[1] == NULL)
{
syslog(LOG_CRIT, "Error loading uuids, a column is NULL");
ret = -1;
goto out;
}
provider_id = strdup(row[1]);
if(provider_id == NULL)
{
syslog(LOG_CRIT, "Error allocating provider id memory: %s", strerror(errno));
ret = -1;
goto out;
}
key = (gpointer)g_strdup(row[0]);
g_hash_table_insert(uuid_table, key, provider_id);
}
out:
mysql_free_result(res);
return ret;
}
void medmysql_batch_start(struct medmysql_batches *batches) {
mysql_real_query(cdr_handler, "start transaction", 17);
mysql_real_query(med_handler, "start transaction", 17);
batches->cdrs.len = 0;
batches->acc_backup.len = 0;
batches->acc_trash.len = 0;
batches->to_delete.len = 0;
}
int medmysql_flush_cdr(struct medmysql_batches *batches) {
if (batches->cdrs.len == 0)
return 0;
if (batches->cdrs.str[batches->cdrs.len - 1] != ',')
return 0;
batches->cdrs.len--;
batches->cdrs.str[batches->cdrs.len] = '\0';
if(mysql_real_query(cdr_handler, batches->cdrs.str, batches->cdrs.len) != 0)
{
batches->cdrs.len = 0;
syslog(LOG_CRIT, "Error inserting cdrs: %s",
mysql_error(cdr_handler));
return -1;
}
batches->cdrs.len = 0;
return 0;
}
int medmysql_flush_medlist(struct medmysql_str *str) {
if (str->len == 0)
return 0;
if (str->str[str->len - 1] != ',')
return 0;
str->str[str->len - 1] = ')';
if(mysql_real_query(med_handler, str->str, str->len) != 0)
{
str->len = 0;
syslog(LOG_CRIT, "Error executing query: %s",
mysql_error(cdr_handler));
return -1;
}
str->len = 0;
return 0;
}
void medmysql_batch_end(struct medmysql_batches *batches) {
medmysql_flush_cdr(batches);
medmysql_flush_medlist(&batches->acc_trash);
medmysql_flush_medlist(&batches->acc_backup);
medmysql_flush_medlist(&batches->to_delete);
mysql_real_query(cdr_handler, "commit", 6);
mysql_real_query(med_handler, "commit", 6);
}

@ -0,0 +1,38 @@
#ifndef _MED_MYSQL_H
#define _MED_MYSQL_H
#include <glib.h>
#include "mediator.h"
#include "cdr.h"
#define PACKET_SIZE (1024*1024)
struct medmysql_str {
char str[PACKET_SIZE];
unsigned int len;
};
struct medmysql_batches {
struct medmysql_str cdrs;
struct medmysql_str acc_backup;
struct medmysql_str acc_trash;
struct medmysql_str to_delete;
};
int medmysql_init();
void medmysql_cleanup();
int medmysql_fetch_callids(med_callid_t **callids, u_int64_t *count);
int medmysql_fetch_records(med_callid_t *callid, med_entry_t **entries, u_int64_t *count);
int medmysql_trash_entries(const char *callid, struct medmysql_batches *);
int medmysql_backup_entries(const char *callid, struct medmysql_batches *);
int medmysql_delete_entries(const char *callid, struct medmysql_batches *);
int medmysql_insert_cdrs(cdr_entry_t *records, u_int64_t count, struct medmysql_batches *);
int medmysql_load_maps(GHashTable *host_table, GHashTable *ip_table);
int medmysql_load_uuids(GHashTable *uuid_table);
void medmysql_batch_start(struct medmysql_batches *);
int medmysql_flush_cdr(struct medmysql_batches *);
int medmysql_flush_medlist(struct medmysql_str *);
void medmysql_batch_end(struct medmysql_batches *);
#endif /* _MED_MYSQL_H */
Loading…
Cancel
Save