paving the way to multi-interface support

pull/26/head
Richard Fuchs 11 years ago
parent fc6aa23dcb
commit 0cbb4665a5

@ -141,6 +141,9 @@ INLINE void in4_to_6(struct in6_addr *o, u_int32_t ip) {
o->s6_addr32[2] = htonl(0xffff);
o->s6_addr32[3] = ip;
}
INLINE u_int32_t in6_to_4(const struct in6_addr *a) {
return a->s6_addr32[3];
}
INLINE void smart_ntop(char *o, struct in6_addr *a, size_t len) {
const char *r;
@ -202,6 +205,18 @@ INLINE int smart_pton(int af, char *src, void *dst) {
return inet_pton(af, src, dst);
}
INLINE int pton_46(struct in6_addr *dst, const char *src) {
u_int32_t in4;
if (inet_pton(AF_INET6, src, dst) == 1)
return 0;
in4 = inet_addr(src);
if (in4 == INADDR_NONE)
return -1;
in4_to_6(dst, in4);
return 0;
}
INLINE int strmemcmp(const void *mem, int len, const char *str) {
int l = strlen(str);
if (l < len)

@ -334,7 +334,7 @@ static void stream_fd_closed(int fd, void *p, uintptr_t u) {
INLINE void __mp_address_translate(struct mp_address *o, const struct endpoint *ep) {
if (IN6_IS_ADDR_V4MAPPED(&ep->ip46)) {
o->family = AF_INET;
o->u.ipv4 = ep->ip46.s6_addr32[3];
o->u.ipv4 = in6_to_4(&ep->ip46);
}
else {
o->family = AF_INET6;
@ -349,6 +349,7 @@ void kernelize(struct packet_stream *stream) {
struct call *call = stream->call;
struct callmaster *cm = call->callmaster;
struct packet_stream *sink = NULL;
struct interface_address *ifa;
if (PS_ISSET(stream, KERNELIZED))
return;
@ -403,10 +404,11 @@ void kernelize(struct packet_stream *stream) {
mpt.src_addr.family = mpt.dst_addr.family;
mpt.src_addr.port = sink->sfd->fd.localport;
ifa = get_first_interface_address(cm, NULL, mpt.src_addr.family); /* XXX */
if (mpt.src_addr.family == AF_INET)
mpt.src_addr.u.ipv4 = cm->conf.ipv4;
mpt.src_addr.u.ipv4 = in6_to_4(&ifa->addr);
else
memcpy(mpt.src_addr.u.ipv6, &cm->conf.ipv6, sizeof(mpt.src_addr.u.ipv6));
memcpy(mpt.src_addr.u.ipv6, &ifa->addr, sizeof(mpt.src_addr.u.ipv6));
stream->handler->in->kernel(&mpt.decrypt, stream);
stream->handler->out->kernel(&mpt.encrypt, sink);
@ -537,6 +539,8 @@ void callmaster_msg_mh_src(struct callmaster *cm, struct msghdr *mh) {
struct in_pktinfo *pi;
struct in6_pktinfo *pi6;
struct sockaddr_in6 *sin6;
struct interface_address *ifa;
sin6 = mh->msg_name;
@ -548,9 +552,10 @@ void callmaster_msg_mh_src(struct callmaster *cm, struct msghdr *mh) {
ch->cmsg_level = IPPROTO_IP;
ch->cmsg_type = IP_PKTINFO;
ifa = get_first_interface_address(cm, NULL, AF_INET);
pi = (void *) CMSG_DATA(ch);
ZERO(*pi);
pi->ipi_spec_dst.s_addr = cm->conf.ipv4;
pi->ipi_spec_dst.s_addr = in6_to_4(&ifa->addr);
mh->msg_controllen = CMSG_SPACE(sizeof(*pi));
}
@ -559,9 +564,10 @@ void callmaster_msg_mh_src(struct callmaster *cm, struct msghdr *mh) {
ch->cmsg_level = IPPROTO_IPV6;
ch->cmsg_type = IPV6_PKTINFO;
ifa = get_first_interface_address(cm, NULL, AF_INET);
pi6 = (void *) CMSG_DATA(ch);
ZERO(*pi6);
pi6->ipi6_addr = cm->conf.ipv6;
pi6->ipi6_addr = ifa->addr;
mh->msg_controllen = CMSG_SPACE(sizeof(*pi6));
}
@ -2273,21 +2279,23 @@ static int call_stream_address4(char *o, struct packet_stream *ps, enum stream_a
u_int32_t ip4;
struct callmaster *m = ps->call->callmaster;
int l = 0;
struct interface_address *ifa;
ifa = get_first_interface_address(m, NULL, AF_INET);
if (format == SAF_NG) {
strcpy(o + l, "IP4 ");
l = 4;
}
ip4 = ps->advertised_endpoint.ip46.s6_addr32[3];
if (!ip4) {
if (!in6_to_4(&ps->advertised_endpoint.ip46)) {
strcpy(o + l, "0.0.0.0");
l += 7;
}
else if (m->conf.adv_ipv4)
l += sprintf(o + l, IPF, IPP(m->conf.adv_ipv4));
else
l += sprintf(o + l, IPF, IPP(m->conf.ipv4));
else {
ip4 = in6_to_4(&ifa->advertised);
l += sprintf(o + l, IPF, IPP(ip4));
}
*len = l;
return AF_INET;
@ -2296,6 +2304,9 @@ static int call_stream_address4(char *o, struct packet_stream *ps, enum stream_a
static int call_stream_address6(char *o, struct packet_stream *ps, enum stream_address_format format, int *len) {
struct callmaster *m = ps->call->callmaster;
int l = 0;
struct interface_address *ifa;
ifa = get_first_interface_address(m, NULL, AF_INET6);
if (format == SAF_NG) {
strcpy(o + l, "IP6 ");
@ -2307,10 +2318,7 @@ static int call_stream_address6(char *o, struct packet_stream *ps, enum stream_a
l += 2;
}
else {
if (!is_addr_unspecified(&m->conf.adv_ipv6))
inet_ntop(AF_INET6, &m->conf.adv_ipv6, o + l, 45); /* lies... */
else
inet_ntop(AF_INET6, &m->conf.ipv6, o + l, 45);
inet_ntop(AF_INET6, &ifa->advertised, o + l, 45); /* lies ... */
l += strlen(o + l);
}
@ -2323,6 +2331,7 @@ static csa_func __call_stream_address(struct packet_stream *ps, int variant) {
struct packet_stream *sink;
struct call_media *sink_media;
csa_func variants[2];
struct interface_address *ifa;
assert(variant >= 0);
assert(variant < G_N_ELEMENTS(variants));
@ -2330,11 +2339,12 @@ static csa_func __call_stream_address(struct packet_stream *ps, int variant) {
m = ps->call->callmaster;
sink = packet_stream_sink(ps);
sink_media = sink->media;
ifa = get_first_interface_address(m, NULL, AF_INET6);
variants[0] = call_stream_address4;
variants[1] = call_stream_address6;
if (is_addr_unspecified(&m->conf.ipv6)) {
if (!ifa) {
variants[1] = NULL;
goto done;
}
@ -2787,3 +2797,63 @@ const struct transport_protocol *transport_protocol(const str *s) {
out:
return NULL;
}
void callmaster_config_init(struct callmaster *m) {
GList *l;
struct interface_address *ifa;
struct local_interface *lif;
m->interfaces = g_hash_table_new(str_hash, str_equal);
for (l = m->conf.interfaces->head; l; l = l->next) {
ifa = l->data;
lif = g_hash_table_lookup(m->interfaces, &ifa->interface_name);
if (!lif) {
lif = g_slice_alloc0(sizeof(*lif));
lif->name = ifa->interface_name;
g_hash_table_insert(m->interfaces, &lif->name, lif);
g_queue_push_tail(&m->interface_list, lif);
}
if (IN6_IS_ADDR_V4MAPPED(&ifa->addr))
g_queue_push_tail(&lif->ipv4, ifa);
else
g_queue_push_tail(&lif->ipv6, ifa);
}
}
struct local_interface *get_local_interface(struct callmaster *m, str *name) {
struct local_interface *lif;
if (!name)
return m->interface_list.head->data;
lif = g_hash_table_lookup(m->interfaces, name);
return lif;
}
struct interface_address *get_first_interface_address(struct callmaster *m, str *name, int family) {
struct local_interface *lif;
GQueue *q;
lif = get_local_interface(m, name);
if (!lif)
return NULL;
switch (family) {
case AF_INET:
q = &lif->ipv4;
break;
case AF_INET6:
q = &lif->ipv6;
break;
default:
return NULL;
}
if (!q->head)
return NULL;
return q->head->data;
}

@ -328,13 +328,21 @@ struct call {
unsigned char tos;
};
struct local_interface {
str name;
GQueue ipv4; /* struct interface_address */
GQueue ipv6; /* struct interface_address */
};
struct interface_address {
str interface_name;
struct in6_addr addr;
struct in6_addr advertised;
};
struct callmaster_config {
int kernelfd;
int kernelid;
u_int32_t ipv4;
u_int32_t adv_ipv4;
struct in6_addr ipv6;
struct in6_addr adv_ipv6;
GQueue *interfaces; /* struct interface_address */
int port_min;
int port_max;
unsigned int timeout;
@ -342,7 +350,7 @@ struct callmaster_config {
struct redis *redis;
char *b2b_url;
unsigned char default_tos;
enum xmlrpc_format fmt;
enum xmlrpc_format fmt;
};
struct callmaster {
@ -351,6 +359,9 @@ struct callmaster {
rwlock_t hashlock;
GHashTable *callhash;
GHashTable *interfaces; /* struct local_interface */
GQueue interface_list; /* ditto */
mutex_t portlock;
u_int16_t lastport;
BIT_ARRAY_DECLARE(ports_used, 0x10000);
@ -378,6 +389,7 @@ struct call_stats {
struct callmaster *callmaster_new(struct poller *);
void callmaster_config_init(struct callmaster *);
void callmaster_msg_mh_src(struct callmaster *, struct msghdr *);
void callmaster_get_all_calls(struct callmaster *m, GQueue *q);
@ -418,6 +430,8 @@ void call_destroy(struct call *);
void kernelize(struct packet_stream *);
int call_stream_address_alt(char *, struct packet_stream *, enum stream_address_format, int *);
int call_stream_address(char *, struct packet_stream *, enum stream_address_format, int *);
struct local_interface *get_local_interface(struct callmaster *m, str *name);
struct interface_address *get_first_interface_address(struct callmaster *m, str *name, int family);
const struct transport_protocol *transport_protocol(const str *s);
@ -472,7 +486,9 @@ INLINE str *call_str_init_dup(struct call *c, char *s) {
return call_str_dup(c, &t);
}
INLINE int callmaster_has_ipv6(struct callmaster *m) {
return is_addr_unspecified(&m->conf.ipv6) ? 0 : 1;
if (get_first_interface_address(m, NULL, AF_INET6))
return 1;
return 0;
}
INLINE void callmaster_exclude_port(struct callmaster *m, u_int16_t p) {
/* XXX atomic bit field? */

@ -146,7 +146,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o
if (!c) {
ilog(LOG_WARNING, "["STR_FORMAT"] Got UDP LOOKUP for unknown call-id",
STR_FMT(&callid));
return str_sprintf("%s 0 " IPF "\n", out[RE_UDP_COOKIE], IPP(m->conf.ipv4));
return str_sprintf("%s 0 0.0.0.0\n", out[RE_UDP_COOKIE]);
}
monologue = call_get_mono_dialogue(c, &fromtag, &totag);
if (!monologue)

@ -33,34 +33,39 @@
#define die(x...) do { fprintf(stderr, x); exit(-1); } while(0)
#define die(x...) do { \
fprintf(stderr, x); \
fprintf(stderr, "\n"); \
ilog(LOG_CRIT, x); \
exit(-1); \
} while(0)
#define dlresolve(n) do { \
n ## _mod = dlsym(dlh, "mod_" #n); \
if (!n ## _mod) \
die("Failed to resolve symbol from plugin: %s\n", "mod_" #n); \
die("Failed to resolve symbol from plugin: %s", "mod_" #n); \
} while(0)
#define check_struct_size(x) do { \
unsigned long *uip; \
uip = dlsym(dlh, "__size_struct_" #x); \
if (!uip) \
die("Failed to resolve symbol from plugin: %s\n", "__size_struct_" #x); \
die("Failed to resolve symbol from plugin: %s", "__size_struct_" #x); \
if (*uip != sizeof(struct x)) \
die("Struct size mismatch in plugin: %s\n", #x); \
die("Struct size mismatch in plugin: %s", #x); \
} while(0)
#define check_struct_offset(x,y) do { \
unsigned long *uip; \
uip = dlsym(dlh, "__offset_struct_" #x "_" #y); \
if (!uip) \
die("Failed to resolve symbol from plugin: %s\n", \
die("Failed to resolve symbol from plugin: %s", \
"__offset_struct_" #x "_" #y); \
if (*uip != (unsigned long) &(((struct x *) 0)->y)) \
die("Struct offset mismatch in plugin: %s->%s\n", #x, #y); \
die("Struct offset mismatch in plugin: %s->%s", #x, #y); \
uip = dlsym(dlh, "__size_struct_" #x "_" #y); \
if (!uip) \
die("Failed to resolve symbol from plugin: %s\n", \
die("Failed to resolve symbol from plugin: %s", \
"__size_struct_" #x "_" #y); \
if (*uip != sizeof(((struct x *) 0)->y)) \
die("Struct member size mismatch in plugin: %s->%s\n", #x, #y); \
die("Struct member size mismatch in plugin: %s->%s", #x, #y); \
} while(0)
@ -77,10 +82,7 @@ static int global_shutdown;
static char *pidfile;
static gboolean foreground;
static u_int32_t ipv4;
static u_int32_t adv_ipv4;
static struct in6_addr ipv6;
static struct in6_addr adv_ipv6;
static GQueue interfaces = G_QUEUE_INIT;
static u_int32_t listenp;
static u_int16_t listenport;
static struct in6_addr udp_listenp;
@ -254,11 +256,51 @@ static void print_available_log_facilities () {
}
static struct interface_address *if_addr_parse(char *s) {
str name;
char *c;
struct in6_addr addr, adv;
struct interface_address *ifa;
/* name */
c = strchr(s, '/');
if (c) {
*c++ = 0;
str_init(&name, s);
s = c;
}
else
str_init(&name, "default");
/* advertised address */
c = strchr(s, '!');
if (c)
*c++ = 0;
/* address */
if (pton_46(&addr, s))
return NULL;
adv = addr;
if (c) {
if (pton_46(&adv, c))
return NULL;
}
ifa = g_slice_alloc(sizeof(*ifa));
ifa->interface_name = name;
ifa->addr = addr;
ifa->advertised = adv;
return ifa;
}
static void options(int *argc, char ***argv) {
char *ipv4s = NULL;
char *adv_ipv4s = NULL;
char *ipv6s = NULL;
char *adv_ipv6s = NULL;
char **if_a = NULL;
char **iter;
struct interface_address *ifa;
char *listenps = NULL;
char *listenudps = NULL;
char *listenngs = NULL;
@ -270,10 +312,7 @@ static void options(int *argc, char ***argv) {
{ "version", 'v', 0, G_OPTION_ARG_NONE, &version, "Print build time and exit", NULL },
{ "table", 't', 0, G_OPTION_ARG_INT, &table, "Kernel table to use", "INT" },
{ "no-fallback",'F', 0, G_OPTION_ARG_NONE, &no_fallback, "Only start when kernel module is available", NULL },
{ "ip", 'i', 0, G_OPTION_ARG_STRING, &ipv4s, "Local IPv4 address for RTP", "IP" },
{ "advertised-ip", 'a', 0, G_OPTION_ARG_STRING, &adv_ipv4s, "IPv4 address to advertise", "IP" },
{ "ip6", 'I', 0, G_OPTION_ARG_STRING, &ipv6s, "Local IPv6 address for RTP", "IP6" },
{ "advertised-ip6",'A',0,G_OPTION_ARG_STRING, &adv_ipv6s, "IPv6 address to advertise", "IP6" },
{ "interface", 'i', 0, G_OPTION_ARG_STRING_ARRAY,&if_a, "Local interface for RTP", "[NAME/]IP[!IP]"},
{ "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" },
{ "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" },
{ "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" },
@ -287,10 +326,10 @@ static void options(int *argc, char ***argv) {
{ "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "IP:PORT" },
{ "redis-db", 'R', 0, G_OPTION_ARG_INT, &redis_db, "Which Redis DB to use", "INT" },
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "log-level", 'L', 0, G_OPTION_ARG_INT, (void *)&log_level, "Mask log priorities above this level", "INT" },
{ "log-facility", 0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"},
{ "log-level", 'L', 0, G_OPTION_ARG_INT, (void *)&log_level,"Mask log priorities above this level","INT" },
{ "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"},
{ "log-stderr", 'E', 0, G_OPTION_ARG_NONE, &_log_stderr, "Log on stderr instead of syslog", NULL },
{ "xmlrpc-format", 'x', 0, G_OPTION_ARG_INT, &xmlrpc_fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" },
{ "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &xmlrpc_fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only", "INT" },
{ NULL, }
};
@ -300,50 +339,38 @@ static void options(int *argc, char ***argv) {
c = g_option_context_new(" - next-generation media proxy");
g_option_context_add_main_entries(c, e, NULL);
if (!g_option_context_parse(c, argc, argv, &er))
die("Bad command line: %s\n", er->message);
die("Bad command line: %s", er->message);
if (version)
die("%s\n", RTPENGINE_VERSION);
die("%s", RTPENGINE_VERSION);
if (!ipv4s)
die("Missing option --ip\n");
if (!if_a)
die("Missing option --interface");
if (!listenps && !listenudps && !listenngs)
die("Missing option --listen-tcp, --listen-udp or --listen-ng\n");
die("Missing option --listen-tcp, --listen-udp or --listen-ng");
ipv4 = inet_addr(ipv4s);
if (ipv4 == -1)
die("Invalid IPv4 address (--ip)\n");
if (adv_ipv4s) {
adv_ipv4 = inet_addr(adv_ipv4s);
if (adv_ipv4 == -1)
die("Invalid IPv4 address (--advertised-ip)\n");
}
if (ipv6s) {
if (smart_pton(AF_INET6, ipv6s, &ipv6) != 1)
die("Invalid IPv6 address (--ip6)\n");
}
if (adv_ipv6s) {
if (smart_pton(AF_INET6, adv_ipv6s, &adv_ipv6) != 1)
die("Invalid IPv6 address (--advertised-ip6)\n");
for (iter = if_a; *iter; iter++) {
ifa = if_addr_parse(*iter);
if (!ifa)
die("Invalid interface specification: %s", *iter);
g_queue_push_tail(&interfaces, ifa);
}
if (listenps) {
if (parse_ip_port(&listenp, &listenport, listenps))
die("Invalid IP or port (--listen-tcp)\n");
die("Invalid IP or port (--listen-tcp)");
}
if (listenudps) {
if (parse_ip6_port(&udp_listenp, &udp_listenport, listenudps))
die("Invalid IP or port (--listen-udp)\n");
die("Invalid IP or port (--listen-udp)");
}
if (listenngs) {
if (parse_ip6_port(&ng_listenp, &ng_listenport, listenngs))
die("Invalid IP or port (--listen-ng)\n");
die("Invalid IP or port (--listen-ng)");
}
if (tos < 0 || tos > 255)
die("Invalid TOS value\n");
die("Invalid TOS value");
if (timeout <= 0)
timeout = 60;
@ -352,17 +379,16 @@ static void options(int *argc, char ***argv) {
if (redisps) {
if (parse_ip_port(&redis_ip, &redis_port, redisps) || !redis_ip)
die("Invalid IP or port (--redis)\n");
die("Invalid IP or port (--redis)");
if (redis_db < 0)
die("Must specify Redis DB number (--redis-db) when using Redis\n");
die("Must specify Redis DB number (--redis-db) when using Redis");
}
if (xmlrpc_fmt < 0 || xmlrpc_fmt > 1) {
die("Invalid XMLRPC format\n");
}
if (xmlrpc_fmt > 1)
die("Invalid XMLRPC format");
if ((log_level < LOG_EMERG) || (log_level > LOG_DEBUG))
die("Invalid log level (--log_level)\n");
die("Invalid log level (--log_level)");
setlogmask(LOG_UPTO(log_level));
if (log_facility_s) {
@ -502,21 +528,18 @@ void create_everything(struct main_context *ctx) {
no_kernel:
ctx->p = poller_new();
if (!ctx->p)
die("poller creation failed\n");
die("poller creation failed");
ctx->m = callmaster_new(ctx->p);
if (!ctx->m)
die("callmaster creation failed\n");
die("callmaster creation failed");
dtls_timer(ctx->p);
ZERO(mc);
mc.kernelfd = kfd;
mc.kernelid = table;
mc.ipv4 = ipv4;
mc.adv_ipv4 = adv_ipv4;
mc.ipv6 = ipv6;
mc.adv_ipv6 = adv_ipv6;
mc.interfaces = &interfaces;
mc.port_min = port_min;
mc.port_max = port_max;
mc.timeout = timeout;
@ -529,7 +552,7 @@ no_kernel:
if (listenport) {
ct = control_tcp_new(ctx->p, listenp, listenport, ctx->m);
if (!ct)
die("Failed to open TCP control connection port\n");
die("Failed to open TCP control connection port");
}
cu = NULL;
@ -537,7 +560,7 @@ no_kernel:
callmaster_exclude_port(ctx->m, udp_listenport);
cu = control_udp_new(ctx->p, udp_listenp, udp_listenport, ctx->m);
if (!cu)
die("Failed to open UDP control connection port\n");
die("Failed to open UDP control connection port");
}
cn = NULL;
@ -545,7 +568,7 @@ no_kernel:
callmaster_exclude_port(ctx->m, ng_listenport);
cn = control_ng_new(ctx->p, ng_listenp, ng_listenport, ctx->m);
if (!cn)
die("Failed to open UDP control connection port\n");
die("Failed to open UDP control connection port");
}
if (redis_ip) {
@ -554,24 +577,25 @@ no_kernel:
&& g_file_test("../../rtpengine-redis/redis.so", G_FILE_TEST_IS_REGULAR))
dlh = dlopen("../../rtpengine-redis/redis.so", RTLD_NOW | RTLD_GLOBAL);
if (!dlh)
die("Failed to open redis plugin, aborting (%s)\n", dlerror());
die("Failed to open redis plugin, aborting (%s)", dlerror());
strp = dlsym(dlh, "__module_version");
if (!strp || !*strp || strcmp(*strp, REDIS_MODULE_VERSION))
die("Incorrect redis module version: %s\n", *strp);
die("Incorrect redis module version: %s", *strp);
redis_mod_verify(dlh);
mc.redis = redis_new_mod(redis_ip, redis_port, redis_db);
if (!mc.redis)
die("Cannot start up without Redis database\n");
die("Cannot start up without Redis database");
}
ctx->m->conf = mc;
callmaster_config_init(ctx->m);
if (!foreground)
daemonize();
wpidfile();
if (redis_restore(ctx->m, mc.redis))
die("Refusing to continue without working Redis database\n");
die("Refusing to continue without working Redis database");
}
static void timer_loop(void *d) {

Loading…
Cancel
Save