mirror of https://github.com/sipwise/rtpengine.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
566 lines
14 KiB
566 lines
14 KiB
#include "homer.h"
|
|
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <unistd.h>
|
|
#include <errno.h>
|
|
#include <time.h>
|
|
#include <string.h>
|
|
#include <glib.h>
|
|
#include <sys/time.h>
|
|
|
|
#include "log.h"
|
|
#include "helpers.h"
|
|
#include "str.h"
|
|
#include "types.h"
|
|
|
|
#define SEND_QUEUE_LIMIT 200
|
|
|
|
TYPED_GQUEUE(gstring, GString)
|
|
|
|
struct homer_sender {
|
|
mutex_t lock;
|
|
|
|
endpoint_t endpoint;
|
|
int protocol;
|
|
int capture_id;
|
|
socket_t socket;
|
|
time_t retry;
|
|
|
|
gstring_q send_queue;
|
|
GString *partial;
|
|
|
|
int (*state)(struct homer_sender *);
|
|
};
|
|
|
|
|
|
|
|
// The single sender instance; stays NULL until homer_sender_init() succeeds.
static struct homer_sender *main_homer_sender;
|
|
|
|
|
|
|
|
|
|
// HEPv3 encoder, defined at the end of this file
static int send_hepv3(GString *s, const str *id, int capt_id,
		const endpoint_t *src, const endpoint_t *dst,
		const struct timeval *tv, int hep_capture_proto);

// state handlers
static int __established(struct homer_sender *hs);
static int __in_progress(struct homer_sender *hs);
static int __no_socket(struct homer_sender *hs);
|
|
|
|
|
|
|
|
|
|
static void __reset(struct homer_sender *hs) {
|
|
close_socket(&hs->socket);
|
|
hs->state = __no_socket;
|
|
hs->retry = time(NULL) + 30;
|
|
|
|
// discard partially written packet
|
|
if (hs->partial)
|
|
g_string_free(hs->partial, TRUE);
|
|
hs->partial = NULL;
|
|
}
|
|
|
|
// Tries to write one packet to the Homer socket.
//
// Return values:
//   0 - the whole packet was written; `gs` has been freed
//   1 - hard write error; the connection has been reset through __reset(),
//       which also discards hs->partial (so `gs` is gone if it was
//       hs->partial, and remains owned by the caller otherwise)
//   2 - the write would block or was interrupted; nothing was written
//   3 - partial write; the bytes already written were removed from the
//       front of `gs`, which remains owned by the caller
static int __attempt_send(struct homer_sender *hs, GString *gs) {
	ssize_t ret;
	int err;

	ret = write(hs->socket.fd, gs->str, gs->len);
	if (ret >= 0 && (gsize) ret == gs->len) {
		// full write
		g_string_free(gs, TRUE);
		return 0;
	}
	if (ret < 0) {
		// save errno before any other call can overwrite it
		err = errno;
		// EINTR means nothing was transferred; treat it like a blocked
		// write instead of tearing down a healthy connection
		if (err != EWOULDBLOCK && err != EAGAIN && err != EINTR) {
			ilog(LOG_ERR, "Write error to Homer at %s: %s",
					endpoint_print_buf(&hs->endpoint), strerror(err));
			__reset(hs);
			return 1;
		}
		ilog(LOG_DEBUG, "Homer write blocked");
		// XXX use poller for blocked writes?
		return 2;
	}
	// partial write
	ilog(LOG_DEBUG, "Homer write blocked (partial write)");
	g_string_erase(gs, 0, ret);
	return 3;
}
|
|
|
|
static int __established(struct homer_sender *hs) {
|
|
char buf[16];
|
|
int ret;
|
|
GString *gs;
|
|
|
|
// test connection with a dummy read
|
|
ret = read(hs->socket.fd, buf, sizeof(buf));
|
|
if (ret < 0) {
|
|
if (errno != EWOULDBLOCK && errno != EAGAIN) {
|
|
ilog(LOG_ERR, "Connection error from Homer at %s: %s",
|
|
endpoint_print_buf(&hs->endpoint), strerror(errno));
|
|
__reset(hs);
|
|
return -1;
|
|
}
|
|
}
|
|
// XXX handle return data from Homer?
|
|
|
|
if (hs->partial) {
|
|
ilog(LOG_DEBUG, "dequeue partial packet to Homer");
|
|
ret = __attempt_send(hs, hs->partial);
|
|
if (ret == 3 || ret == 2) // partial write or not sent at all
|
|
return 0;
|
|
if (ret == 1) // write error, takes care of deleting hs->partial
|
|
return -1;
|
|
// ret == 0 -> sent OK, drop through to unqueue
|
|
hs->partial = NULL;
|
|
}
|
|
|
|
// unqueue as much as we can
|
|
while ((gs = t_queue_pop_head(&hs->send_queue))) {
|
|
ilog(LOG_DEBUG, "dequeue send queue to Homer");
|
|
ret = __attempt_send(hs, gs);
|
|
if (ret == 0) // everything sent OK
|
|
continue;
|
|
if (ret == 3) { // partial write
|
|
hs->partial = gs;
|
|
return 0;
|
|
}
|
|
t_queue_push_head(&hs->send_queue, gs);
|
|
if (ret == 1) // write error
|
|
return -1;
|
|
// ret == 2 -> blocked
|
|
return 0;
|
|
}
|
|
|
|
// everything unqueued
|
|
return 0;
|
|
}
|
|
|
|
static int __check_conn(struct homer_sender *hs, int ret) {
|
|
if (ret == 0) {
|
|
ilog(LOG_INFO, "Connection to Homer at %s has been established",
|
|
endpoint_print_buf(&hs->endpoint));
|
|
hs->state = __established;
|
|
return hs->state(hs);
|
|
}
|
|
if (ret == 1) {
|
|
ilog(LOG_DEBUG, "connection to Homer is in progress");
|
|
hs->state = __in_progress;
|
|
return 0;
|
|
}
|
|
|
|
ilog(LOG_ERR, "Failed to connect to Homer at %s: %s",
|
|
endpoint_print_buf(&hs->endpoint), strerror(errno));
|
|
|
|
__reset(hs);
|
|
return -1;
|
|
}
|
|
|
|
static int __in_progress(struct homer_sender *hs) {
|
|
int ret;
|
|
|
|
ilog(LOG_DEBUG, "connection to Homer is in progress - checking");
|
|
|
|
ret = connect_socket_retry(&hs->socket);
|
|
return __check_conn(hs, ret);
|
|
}
|
|
|
|
static int __no_socket(struct homer_sender *hs) {
|
|
int ret;
|
|
|
|
if (hs->retry > time(NULL))
|
|
return 0;
|
|
|
|
ilog(LOG_INFO, "Connecting to Homer at %s", endpoint_print_buf(&hs->endpoint));
|
|
|
|
ret = connect_socket_nb(&hs->socket, hs->protocol, &hs->endpoint);
|
|
return __check_conn(hs, ret);
|
|
}
|
|
|
|
void homer_sender_init(const endpoint_t *ep, int protocol, int capture_id) {
|
|
struct homer_sender *ret;
|
|
|
|
if (is_addr_unspecified(&ep->address))
|
|
return;
|
|
if (main_homer_sender)
|
|
return;
|
|
|
|
ret = malloc(sizeof(*ret));
|
|
ZERO(*ret);
|
|
mutex_init(&ret->lock);
|
|
ret->endpoint = *ep;
|
|
ret->protocol = protocol;
|
|
ret->capture_id = capture_id;
|
|
ret->retry = time(NULL);
|
|
|
|
ret->state = __no_socket;
|
|
|
|
main_homer_sender = ret;
|
|
|
|
return;
|
|
}
|
|
|
|
// takes over the GString
|
|
int homer_send(GString *s, const str *id, const endpoint_t *src,
|
|
const endpoint_t *dst, const struct timeval *tv, int hep_capture_proto)
|
|
{
|
|
if (!main_homer_sender)
|
|
goto out;
|
|
if (!s)
|
|
goto out;
|
|
if (!s->len) // empty write, shouldn't happen
|
|
goto out;
|
|
|
|
ilog(LOG_DEBUG, "JSON to send to Homer: '"STR_FORMAT"'", G_STR_FMT(s));
|
|
|
|
if (send_hepv3(s, id, main_homer_sender->capture_id, src, dst, tv, hep_capture_proto))
|
|
goto out;
|
|
|
|
mutex_lock(&main_homer_sender->lock);
|
|
if (main_homer_sender->send_queue.length < SEND_QUEUE_LIMIT) {
|
|
t_queue_push_tail(&main_homer_sender->send_queue, s);
|
|
s = NULL;
|
|
}
|
|
else
|
|
ilog(LOG_ERR, "Send queue length limit (%i) reached, dropping Homer message", SEND_QUEUE_LIMIT);
|
|
main_homer_sender->state(main_homer_sender);
|
|
mutex_unlock(&main_homer_sender->lock);
|
|
|
|
out:
|
|
if (s)
|
|
g_string_free(s, TRUE);
|
|
return 0;
|
|
}
|
|
|
|
|
|
|
|
|
|
// from captagent transport_hep.[ch]

// All structures below are packed so that they can be copied byte for byte
// into an outgoing HEP packet. The encoder in this file stores the 16- and
// 32-bit fields in network byte order.

// Header that starts every chunk; `length` is the size of the whole chunk,
// i.e. this header plus the data that follows it.
typedef struct hep_chunk {
	uint16_t	vendor_id;
	uint16_t	type_id;
	uint16_t	length;
} __attribute__((packed)) hep_chunk_t;

// Chunk carrying a single byte
typedef struct hep_chunk_uint8 {
	hep_chunk_t	chunk;
	uint8_t		data;
} __attribute__((packed)) hep_chunk_uint8_t;

// Chunk carrying a 16-bit value
typedef struct hep_chunk_uint16 {
	hep_chunk_t	chunk;
	uint16_t	data;
} __attribute__((packed)) hep_chunk_uint16_t;

// Chunk carrying a 32-bit value
typedef struct hep_chunk_uint32 {
	hep_chunk_t	chunk;
	uint32_t	data;
} __attribute__((packed)) hep_chunk_uint32_t;

// Chunk header paired with a pointer to string data
typedef struct hep_chunk_str {
	hep_chunk_t	chunk;
	char		*data;
} __attribute__((packed)) hep_chunk_str_t;

// Chunk carrying an IPv4 address
typedef struct hep_chunk_ip4 {
	hep_chunk_t	chunk;
	struct in_addr	data;
} __attribute__((packed)) hep_chunk_ip4_t;

// Chunk carrying an IPv6 address
typedef struct hep_chunk_ip6 {
	hep_chunk_t	chunk;
	struct in6_addr	data;
} __attribute__((packed)) hep_chunk_ip6_t;

// Packet header: four-byte protocol ID followed by the total packet length
typedef struct hep_ctrl {
	char		id[4];
	uint16_t	length;
} __attribute__((packed)) hep_ctrl_t;

// Chunk header paired with a pointer to payload data
typedef struct hep_chunk_payload {
	hep_chunk_t	chunk;
	char		*data;
} __attribute__((packed)) hep_chunk_payload_t;

/* Structure of HEP */

// Fixed leading part of every packet: the packet header followed by the
// chunks that are always present, in the order they are sent.
typedef struct hep_generic {
	hep_ctrl_t		header;
	hep_chunk_uint8_t	ip_family;
	hep_chunk_uint8_t	ip_proto;
	hep_chunk_uint16_t	src_port;
	hep_chunk_uint16_t	dst_port;
	hep_chunk_uint32_t	time_sec;
	hep_chunk_uint32_t	time_usec;
	hep_chunk_uint8_t	proto_t;
	hep_chunk_uint32_t	capt_id;
} __attribute__((packed)) hep_generic_t;
|
|
|
|
// modifies the GString in place
|
|
static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst,
|
|
const struct timeval *tv, int hep_capture_proto)
|
|
{
|
|
|
|
struct hep_generic *hg=NULL;
|
|
void* buffer;
|
|
unsigned int buflen=0, iplen=0,tlen=0;
|
|
hep_chunk_ip4_t src_ip4, dst_ip4;
|
|
hep_chunk_ip6_t src_ip6, dst_ip6;
|
|
hep_chunk_t payload_chunk;
|
|
//hep_chunk_t authkey_chunk;
|
|
hep_chunk_t correlation_chunk;
|
|
//static int errors = 0;
|
|
|
|
hg = malloc(sizeof(struct hep_generic));
|
|
memset(hg, 0, sizeof(struct hep_generic));
|
|
|
|
|
|
/* header set */
|
|
memcpy(hg->header.id, "\x48\x45\x50\x33", 4);
|
|
|
|
/* IP proto */
|
|
hg->ip_family.chunk.vendor_id = htons(0x0000);
|
|
hg->ip_family.chunk.type_id = htons(0x0001);
|
|
hg->ip_family.data = src->address.family->af;
|
|
hg->ip_family.chunk.length = htons(sizeof(hg->ip_family));
|
|
|
|
/* Proto ID */
|
|
hg->ip_proto.chunk.vendor_id = htons(0x0000);
|
|
hg->ip_proto.chunk.type_id = htons(0x0002);
|
|
hg->ip_proto.data = IPPROTO_UDP;
|
|
hg->ip_proto.chunk.length = htons(sizeof(hg->ip_proto));
|
|
|
|
|
|
/* IPv4 */
|
|
if(hg->ip_family.data == AF_INET) {
|
|
/* SRC IP */
|
|
src_ip4.chunk.vendor_id = htons(0x0000);
|
|
src_ip4.chunk.type_id = htons(0x0003);
|
|
src_ip4.data = src->address.ipv4;
|
|
src_ip4.chunk.length = htons(sizeof(src_ip4));
|
|
|
|
/* DST IP */
|
|
dst_ip4.chunk.vendor_id = htons(0x0000);
|
|
dst_ip4.chunk.type_id = htons(0x0004);
|
|
dst_ip4.data = dst->address.ipv4;
|
|
dst_ip4.chunk.length = htons(sizeof(dst_ip4));
|
|
|
|
iplen = sizeof(dst_ip4) + sizeof(src_ip4);
|
|
}
|
|
/* IPv6 */
|
|
else if(hg->ip_family.data == AF_INET6) {
|
|
/* SRC IPv6 */
|
|
src_ip6.chunk.vendor_id = htons(0x0000);
|
|
src_ip6.chunk.type_id = htons(0x0005);
|
|
src_ip6.data = src->address.ipv6;
|
|
src_ip6.chunk.length = htons(sizeof(src_ip6));
|
|
|
|
/* DST IPv6 */
|
|
dst_ip6.chunk.vendor_id = htons(0x0000);
|
|
dst_ip6.chunk.type_id = htons(0x0006);
|
|
dst_ip6.data = dst->address.ipv6;
|
|
dst_ip6.chunk.length = htons(sizeof(dst_ip6));
|
|
|
|
iplen = sizeof(dst_ip6) + sizeof(src_ip6);
|
|
}
|
|
|
|
/* SRC PORT */
|
|
hg->src_port.chunk.vendor_id = htons(0x0000);
|
|
hg->src_port.chunk.type_id = htons(0x0007);
|
|
hg->src_port.data = htons(src->port);
|
|
hg->src_port.chunk.length = htons(sizeof(hg->src_port));
|
|
|
|
/* DST PORT */
|
|
hg->dst_port.chunk.vendor_id = htons(0x0000);
|
|
hg->dst_port.chunk.type_id = htons(0x0008);
|
|
hg->dst_port.data = htons(dst->port);
|
|
hg->dst_port.chunk.length = htons(sizeof(hg->dst_port));
|
|
|
|
|
|
/* TIMESTAMP SEC */
|
|
hg->time_sec.chunk.vendor_id = htons(0x0000);
|
|
hg->time_sec.chunk.type_id = htons(0x0009);
|
|
hg->time_sec.data = htonl(tv->tv_sec);
|
|
hg->time_sec.chunk.length = htons(sizeof(hg->time_sec));
|
|
|
|
|
|
/* TIMESTAMP USEC */
|
|
hg->time_usec.chunk.vendor_id = htons(0x0000);
|
|
hg->time_usec.chunk.type_id = htons(0x000a);
|
|
hg->time_usec.data = htonl(tv->tv_usec);
|
|
hg->time_usec.chunk.length = htons(sizeof(hg->time_usec));
|
|
|
|
/* Protocol TYPE */
|
|
hg->proto_t.chunk.vendor_id = htons(0x0000);
|
|
hg->proto_t.chunk.type_id = htons(0x000b);
|
|
hg->proto_t.data = hep_capture_proto;
|
|
hg->proto_t.chunk.length = htons(sizeof(hg->proto_t));
|
|
|
|
/* Capture ID */
|
|
hg->capt_id.chunk.vendor_id = htons(0x0000);
|
|
hg->capt_id.chunk.type_id = htons(0x000c);
|
|
hg->capt_id.data = htonl(capt_id);
|
|
hg->capt_id.chunk.length = htons(sizeof(hg->capt_id));
|
|
|
|
/* Payload */
|
|
payload_chunk.vendor_id = htons(0x0000);
|
|
payload_chunk.type_id = 0 ? htons(0x0010) : htons(0x000f);
|
|
payload_chunk.length = htons(sizeof(payload_chunk) + s->len);
|
|
|
|
tlen = sizeof(struct hep_generic) + s->len + iplen + sizeof(hep_chunk_t);
|
|
|
|
#if 0
|
|
/* auth key */
|
|
if(profile_transport[idx].capt_password != NULL) {
|
|
|
|
tlen += sizeof(hep_chunk_t);
|
|
/* Auth key */
|
|
authkey_chunk.vendor_id = htons(0x0000);
|
|
authkey_chunk.type_id = htons(0x000e);
|
|
authkey_chunk.length = htons(sizeof(authkey_chunk) + strlen(profile_transport[idx].capt_password));
|
|
tlen += strlen(profile_transport[idx].capt_password);
|
|
}
|
|
#endif
|
|
|
|
/* correlation key */
|
|
//if(rcinfo->correlation_id.s && rcinfo->correlation_id.len > 0) {
|
|
|
|
tlen += sizeof(hep_chunk_t);
|
|
/* Correlation key */
|
|
correlation_chunk.vendor_id = htons(0x0000);
|
|
correlation_chunk.type_id = htons(0x0011);
|
|
correlation_chunk.length = htons(sizeof(correlation_chunk) + id->len);
|
|
tlen += id->len;
|
|
//}
|
|
|
|
/* total */
|
|
hg->header.length = htons(tlen);
|
|
|
|
buffer = (void*)malloc(tlen);
|
|
if (buffer==0){
|
|
ilog(LOG_ERR, "ERROR: out of memory");
|
|
free(hg);
|
|
return -1;
|
|
}
|
|
|
|
memcpy((void*) buffer, hg, sizeof(struct hep_generic));
|
|
buflen = sizeof(struct hep_generic);
|
|
|
|
/* IPv4 */
|
|
if(hg->ip_family.data == AF_INET) {
|
|
/* SRC IP */
|
|
memcpy((void*) buffer+buflen, &src_ip4, sizeof(struct hep_chunk_ip4));
|
|
buflen += sizeof(struct hep_chunk_ip4);
|
|
|
|
memcpy((void*) buffer+buflen, &dst_ip4, sizeof(struct hep_chunk_ip4));
|
|
buflen += sizeof(struct hep_chunk_ip4);
|
|
}
|
|
/* IPv6 */
|
|
else if(hg->ip_family.data == AF_INET6) {
|
|
/* SRC IPv6 */
|
|
memcpy((void*) buffer+buflen, &src_ip6, sizeof(struct hep_chunk_ip6));
|
|
buflen += sizeof(struct hep_chunk_ip6);
|
|
|
|
memcpy((void*) buffer+buflen, &dst_ip6, sizeof(struct hep_chunk_ip6));
|
|
buflen += sizeof(struct hep_chunk_ip6);
|
|
}
|
|
|
|
#if 0
|
|
/* AUTH KEY CHUNK */
|
|
if(profile_transport[idx].capt_password != NULL) {
|
|
|
|
memcpy((void*) buffer+buflen, &authkey_chunk, sizeof(struct hep_chunk));
|
|
buflen += sizeof(struct hep_chunk);
|
|
|
|
/* Now copying payload self */
|
|
memcpy((void*) buffer+buflen, profile_transport[idx].capt_password, strlen(profile_transport[idx].capt_password));
|
|
buflen+=strlen(profile_transport[idx].capt_password);
|
|
}
|
|
#endif
|
|
|
|
/* Correlation KEY CHUNK */
|
|
//if(rcinfo->correlation_id.s && rcinfo->correlation_id.len > 0) {
|
|
|
|
memcpy((void*) buffer+buflen, &correlation_chunk, sizeof(struct hep_chunk));
|
|
buflen += sizeof(struct hep_chunk);
|
|
|
|
/* Now copying payload self */
|
|
memcpy((void*) buffer+buflen, id->s, id->len);
|
|
buflen+= id->len;
|
|
//}
|
|
|
|
/* PAYLOAD CHUNK */
|
|
memcpy((void*) buffer+buflen, &payload_chunk, sizeof(struct hep_chunk));
|
|
buflen += sizeof(struct hep_chunk);
|
|
|
|
/* Now copying payload self */
|
|
memcpy((void*) buffer+buflen, s->str, s->len);
|
|
buflen+=s->len;
|
|
|
|
#if 0
|
|
/* make sleep after 100 errors */
|
|
if(errors > 50) {
|
|
LERR( "HEP server is down... retrying after sleep...");
|
|
if(!profile_transport[idx].usessl) {
|
|
sleep(2);
|
|
if(init_hepsocket_blocking(idx)) {
|
|
profile_transport[idx].initfails++;
|
|
}
|
|
|
|
errors=0;
|
|
}
|
|
#ifdef USE_SSL
|
|
else {
|
|
sleep(2);
|
|
|
|
if(initSSL(idx)) profile_transport[idx].initfails++;
|
|
|
|
errors=0;
|
|
}
|
|
#endif /* USE SSL */
|
|
|
|
}
|
|
|
|
/* send this packet out of our socket */
|
|
if(send_data(buffer, buflen, idx)) {
|
|
errors++;
|
|
stats.errors_total++;
|
|
}
|
|
#endif
|
|
|
|
g_string_truncate(s, 0);
|
|
g_string_append_len(s, buffer, buflen);
|
|
|
|
/* FREE */
|
|
free(buffer);
|
|
free(hg);
|
|
|
|
return 0;
|
|
}
|
|
|
|
int has_homer(void) {
|
|
return main_homer_sender ? 1 : 0;
|
|
}
|