diff --git a/daemon/Makefile b/daemon/Makefile index 9843099a7..d76e8b132 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -38,7 +38,7 @@ include ../lib/lib.Makefile SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \ - media_socket.c rtcp_xr.c homer.c recording.c + media_socket.c rtcp_xr.c homer.c recording.c statistics.c cdr.c LIBSRCS= loglib.c auxlib.c rtplib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) diff --git a/daemon/aux.h b/daemon/aux.h index ef9b142ef..842243a6d 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -627,5 +627,15 @@ INLINE void *__uid_slice_alloc0(unsigned int size, GQueue *q, unsigned int offse return ret; } +#define TRUNCATED " ... Output truncated. Increase Output Buffer ... \n" + +#define truncate_output(x) strcpy(x - strlen(TRUNCATED) - 1, TRUNCATED) + +#define ADJUSTLEN(printlen,outbufend,replybuffer) do { \ + replybuffer += (printlen>=outbufend-replybuffer)?outbufend-replybuffer:printlen; \ + if (replybuffer == outbufend) \ + truncate_output(replybuffer); \ + } while (0); + #endif diff --git a/daemon/call.c b/daemon/call.c index ffa91bc8b..2328039cc 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -38,7 +38,8 @@ #include "log_funcs.h" #include "recording.h" #include "rtplib.h" - +#include "cdr.h" +#include "statistics.h" /* also serves as array index for callstream->peers[] */ @@ -113,37 +114,8 @@ const struct transport_protocol transport_protocols[] = { }; const int num_transport_protocols = G_N_ELEMENTS(transport_protocols); -static const char * const __term_reason_texts[] = { - [TIMEOUT] = "TIMEOUT", - [REGULAR] = "REGULAR", - [FORCED] = "FORCED", - [SILENT_TIMEOUT] = "SILENT_TIMEOUT", - [FINAL_TIMEOUT] = "FINAL_TIMEOUT", -}; -static const char * const __tag_type_texts[] = { - [FROM_TAG] = "FROM_TAG", - [TO_TAG] = "TO_TAG", -}; -static const char *const __opmode_texts[] = { - [OP_OFFER] = "offer", - [OP_ANSWER] = "answer", -}; - -static const char * get_term_reason_text(enum termination_reason t) { - return get_enum_array_text(__term_reason_texts, t, "UNKNOWN"); -} -const char * get_tag_type_text(enum tag_type t) { - return get_enum_array_text(__tag_type_texts, t, "UNKNOWN"); -} -const char *get_opmode_text(enum call_opmode m) { - return get_enum_array_text(__opmode_texts, m, "other"); -} - - /* ********** */ - - static void __monologue_destroy(struct call_monologue *monologue); static int monologue_destroy(struct call_monologue *ml); @@ -1744,62 +1716,6 @@ error_intf: return ERROR_NO_FREE_LOGS; } -static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) { - struct timeval dp, oa; - - mutex_lock(&s->total_average_lock); - - // new average = ((old average * old num sessions) + datapoint) / new num sessions - // ... but this will overflow when num sessions becomes very large - - // timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess); - // timeval_add(&t, &t, add); - // s->total_managed_sess++; - // timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess); - - // alternative: - // new average = old average + (datapoint / new num sessions) - (old average / new num sessions) - - s->total_managed_sess++; - timeval_divide(&dp, add, s->total_managed_sess); - timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess); - timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp); - timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa); - - mutex_unlock(&s->total_average_lock); -} - -static void timeval_totalstats_interval_call_duration_add(struct totalstats *s, - struct timeval *call_start, struct timeval *call_stop, - struct timeval *interval_start, int interval_dur_s) { - - /* work with graphite interval start val which might be changed elsewhere in the code*/ - struct timeval real_iv_start = *interval_start; - struct timeval call_duration; - struct timeval *call_start_in_iv = call_start; - - /* in case graphite interval needs to be the previous one */ - if (timercmp(&real_iv_start, call_stop, >)) { - struct timeval graph_dur = { .tv_sec = interval_dur_s, .tv_usec = 0LL }; - timeval_subtract(&real_iv_start, interval_start, &graph_dur); - } - - if (timercmp(&real_iv_start, call_start, >)) - call_start_in_iv = &real_iv_start; - - /* this should never happen and is here for sanitization of output */ - if (timercmp(call_start_in_iv, call_stop, >)) { - ilog(LOG_ERR, "Call start seems to exceed call stop"); - return; - } - - timeval_subtract(&call_duration, call_stop, call_start_in_iv); - - mutex_lock(&s->total_calls_duration_lock); - timeval_add(&s->total_calls_duration_interval, - &s->total_calls_duration_interval, &call_duration); - mutex_unlock(&s->total_calls_duration_lock); -} static int __rtp_stats_sort(const void *ap, const void *bp) { const struct rtp_stats *a = ap, *b = bp; @@ -1812,7 +1728,7 @@ static int __rtp_stats_sort(const void *ap, const void *bp) { return 0; } -static const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) { +const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) { struct packet_stream *ps; GList *values; struct rtp_stats *rtp_s; @@ -1882,12 +1798,10 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, return res; } -#define CDRBUFREMAINDER cdrbufend-cdrbufcur - /* called lock-free, but must hold a reference to the call */ void call_destroy(struct call *c) { struct callmaster *m; - struct packet_stream *ps=0, *ps2=0; + struct packet_stream *ps=0; struct stream_fd *sfd; struct poller *p; GList *l; @@ -1895,14 +1809,6 @@ void call_destroy(struct call *c) { struct call_monologue *ml; struct call_media *md; GList *k, *o; - struct timeval tim_result_duration; - static const int CDRBUFLENGTH = 4096*2; - char cdrbuffer[CDRBUFLENGTH]; - char* cdrbufcur = cdrbuffer; - char* cdrbufend = cdrbuffer+CDRBUFLENGTH-1; - int cdrlinecnt = 0; - int printlen=0; - int found = 0; const struct rtp_payload_type *rtp_pt; if (!c) { @@ -1924,68 +1830,22 @@ void call_destroy(struct call *c) { obj_put(c); - if (IS_FOREIGN_CALL(c)) { - atomic64_dec(&m->stats.foreign_sessions); - } - if(!IS_FOREIGN_CALL(c)) { - mutex_lock(&m->totalstats_interval.managed_sess_lock); - m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, - g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); - mutex_unlock(&m->totalstats_interval.managed_sess_lock); - } + statistics_update_foreignown_dec(c); - if (!IS_FOREIGN_CALL(c)) { + if (IS_OWN_CALL(c)) { redis_delete(c, m->conf.redis_write); } rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ - if (!IS_FOREIGN_CALL(c)) { + if (IS_OWN_CALL(c)) { ilog(LOG_INFO, "Final packet stats:"); - - /* CDRs and statistics */ - if (_log_facility_cdr) { - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"ci=%s, ",c->callid.s); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } for (l = c->monologues.head; l; l = l->next) { ml = l->data; - - if (!ml->terminated.tv_sec) { - gettimeofday(&ml->terminated, NULL); - ml->term_reason = UNKNOWN; - } - - timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started); - - if (_log_facility_cdr) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_start_time=%ld.%06lu, " - "ml%i_end_time=%ld.%06ld, " - "ml%i_duration=%ld.%06ld, " - "ml%i_termination=%s, " - "ml%i_local_tag=%s, " - "ml%i_local_tag_type=%s, " - "ml%i_remote_tag=%s, ", - cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec, - cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec, - cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec, - cdrlinecnt, get_term_reason_text(ml->term_reason), - cdrlinecnt, ml->tag.s, - cdrlinecnt, get_tag_type_text(ml->tagtype), - cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } - + ilog(LOG_INFO, "--- Tag '"STR_FORMAT"', created " "%u:%02u ago for branch '"STR_FORMAT"', in dialogue with '"STR_FORMAT"'", STR_FMT(&ml->tag), @@ -1994,10 +1854,10 @@ void call_destroy(struct call *c) { STR_FMT(&ml->viabranch), ml->active_dialogue ? ml->active_dialogue->tag.len : 6, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); - + for (k = ml->medias.head; k; k = k->next) { md = k->data; - + rtp_pt = __rtp_stats_codec(md); #define MLL_PREFIX "------ Media #%u ("STR_FORMAT" over %s) using " /* media log line prefix */ #define MLL_COMMON /* common args */ \ @@ -2009,119 +1869,16 @@ void call_destroy(struct call *c) { else ilog(LOG_INFO, MLL_PREFIX STR_FORMAT, MLL_COMMON, STR_FMT(&rtp_pt->encoding_with_params)); - - /* add PayloadType(codec) info in CDR logging */ - if (_log_facility_cdr && rtp_pt) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=%u, ", rtp_pt->payload_type); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } else if (_log_facility_cdr && !rtp_pt) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=unknown, "); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } - + for (o = md->streams.head; o; o = o->next) { ps = o->data; - + if (PS_ISSET(ps, FALLBACK_RTCP)) continue; - + char *addr = sockaddr_print_buf(&ps->endpoint.address); char *local_addr = ps->selected_sfd ? sockaddr_print_buf(&ps->selected_sfd->socket.local.address) : "0.0.0.0"; - - if (_log_facility_cdr) { - const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp"; - - if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) { - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_midx%u_%s_endpoint_ip=%s, " - "ml%i_midx%u_%s_endpoint_port=%u, " - "ml%i_midx%u_%s_local_relay_ip=%s, " - "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets="UINT64F", " - "ml%i_midx%u_%s_relayed_bytes="UINT64F", " - "ml%i_midx%u_%s_relayed_errors="UINT64F", " - "ml%i_midx%u_%s_last_packet="UINT64F", " - "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", - cdrlinecnt, md->index, protocol, addr, - cdrlinecnt, md->index, protocol, ps->endpoint.port, - cdrlinecnt, md->index, protocol, local_addr, - cdrlinecnt, md->index, protocol, - (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.packets), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.bytes), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.errors), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->last_packet), - cdrlinecnt, md->index, protocol, - ps->stats.in_tos_tclass); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); - } else { -#if (RE_HAS_MEASUREDELAY) - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_midx%u_%s_endpoint_ip=%s, " - "ml%i_midx%u_%s_endpoint_port=%u, " - "ml%i_midx%u_%s_local_relay_ip=%s, " - "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets="UINT64F", " - "ml%i_midx%u_%s_relayed_bytes="UINT64F", " - "ml%i_midx%u_%s_relayed_errors="UINT64F", " - "ml%i_midx%u_%s_last_packet="UINT64F", " - "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", " - "ml%i_midx%u_%s_delay_min=%.9f, " - "ml%i_midx%u_%s_delay_avg=%.9f, " - "ml%i_midx%u_%s_delay_max=%.9f, ", - cdrlinecnt, md->index, protocol, addr, - cdrlinecnt, md->index, protocol, ps->endpoint.port, - cdrlinecnt, md->index, protocol, local_addr, - cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.packets), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.bytes), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.errors), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->last_packet), - cdrlinecnt, md->index, protocol, - ps->stats.in_tos_tclass, - cdrlinecnt, md->index, protocol, (double) ps->stats.delay_min / 1000000, - cdrlinecnt, md->index, protocol, (double) ps->stats.delay_avg / 1000000, - cdrlinecnt, md->index, protocol, (double) ps->stats.delay_max / 1000000); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); -#else - printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, - "ml%i_midx%u_%s_endpoint_ip=%s, " - "ml%i_midx%u_%s_endpoint_port=%u, " - "ml%i_midx%u_%s_local_relay_ip=%s, " - "ml%i_midx%u_%s_local_relay_port=%u, " - "ml%i_midx%u_%s_relayed_packets="UINT64F", " - "ml%i_midx%u_%s_relayed_bytes="UINT64F", " - "ml%i_midx%u_%s_relayed_errors="UINT64F", " - "ml%i_midx%u_%s_last_packet="UINT64F", " - "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", - cdrlinecnt, md->index, protocol, addr, - cdrlinecnt, md->index, protocol, ps->endpoint.port, - cdrlinecnt, md->index, protocol, local_addr, - cdrlinecnt, md->index, protocol, - (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.packets), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.bytes), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->stats.errors), - cdrlinecnt, md->index, protocol, - atomic64_get(&ps->last_packet), - cdrlinecnt, md->index, protocol, - ps->stats.in_tos_tclass); - ADJUSTLEN(printlen,cdrbufend,cdrbufcur); -#endif - } - } - + ilog(LOG_INFO, "--------- Port %15s:%-5u <> %15s:%-5u%s, " ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet", local_addr, (unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), @@ -2132,117 +1889,18 @@ void call_destroy(struct call *c) { atomic64_get(&ps->stats.errors), atomic64_get(&ps->last_packet)); - - - atomic64_add(&m->totalstats.total_relayed_packets, - atomic64_get(&ps->stats.packets)); - atomic64_add(&m->totalstats_interval.total_relayed_packets, - atomic64_get(&ps->stats.packets)); - atomic64_add(&m->totalstats.total_relayed_errors, - atomic64_get(&ps->stats.errors)); - atomic64_add(&m->totalstats_interval.total_relayed_errors, - atomic64_get(&ps->stats.errors)); - } - - ice_shutdown(&md->ice_agent); - } - if (_log_facility_cdr) - ++cdrlinecnt; - } - } - // --- for statistics getting one way stream or no relay at all - int total_nopacket_relayed_sess = 0; - - for (l = c->monologues.head; l; l = l->next) { - ml = l->data; - - // --- go through partner ml and search the RTP - for (k = ml->medias.head; k; k = k->next) { - md = k->data; - - for (o = md->streams.head; o; o = o->next) { - ps = o->data; - if ((PS_ISSET(ps, RTP) && !PS_ISSET(ps, RTCP))) { - // --- only RTP is interesting - found = 1; - break; - } - } - if (found) { break; } - } - found = 0; - - if (ml->active_dialogue) { - // --- go through partner ml and search the RTP - for (k = ml->active_dialogue->medias.head; k; k = k->next) { - md = k->data; - - for (o = md->streams.head; o; o = o->next) { - ps2 = o->data; - if ((PS_ISSET(ps2, RTP) && !PS_ISSET(ps2, RTCP))) { - // --- only RTP is interesting - found = 1; - break; - } - } - if (found) { break; } - } - } + statistics_update_totals(m,ps); - if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { - if (atomic64_get(&ps->stats.packets)!=0 && !IS_FOREIGN_CALL(c)){ - if (atomic64_get(&ps->stats.packets)!=0) { - atomic64_inc(&m->totalstats.total_oneway_stream_sess); - atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); } - } - else { - total_nopacket_relayed_sess++; - } - } - } - if (!IS_FOREIGN_CALL(c)) { - atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); - atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); - } - - if (c->monologues.head) { - ml = c->monologues.head->data; - - if (!IS_FOREIGN_CALL(c)) { - if (ml->term_reason==TIMEOUT) { - atomic64_inc(&m->totalstats.total_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_timeout_sess); - } else if (ml->term_reason==SILENT_TIMEOUT) { - atomic64_inc(&m->totalstats.total_silent_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess); - } else if (ml->term_reason==REGULAR) { - atomic64_inc(&m->totalstats.total_regular_term_sess); - atomic64_inc(&m->totalstats_interval.total_regular_term_sess); - } else if (ml->term_reason==FORCED) { - atomic64_inc(&m->totalstats.total_forced_term_sess); - atomic64_inc(&m->totalstats_interval.total_forced_term_sess); + ice_shutdown(&md->ice_agent); } - - timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); - timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); - timeval_totalstats_interval_call_duration_add( - &m->totalstats_interval, &ml->started, &ml->terminated, - &m->latest_graphite_interval_start, - m->conf.graphite_interval); - } - - if (ml->term_reason==FINAL_TIMEOUT) { - atomic64_inc(&m->totalstats.total_final_timeout_sess); - atomic64_inc(&m->totalstats_interval.total_final_timeout_sess); } } + statistics_update_oneway(c); - if (_log_facility_cdr) - /* log it */ - cdrlog(cdrbuffer); + cdr_update_entry(c); for (l = c->streams.head; l; l = l->next) { ps = l->data; @@ -2394,19 +2052,11 @@ restart: } g_hash_table_insert(m->callhash, &c->callid, obj_get(c)); - if (type == CT_OWN_CALL) { - mutex_lock(&m->totalstats_interval.managed_sess_lock); - m->totalstats_interval.managed_sess_max = MAX( - m->totalstats_interval.managed_sess_max, - g_hash_table_size(m->callhash) - - atomic64_get(&m->stats.foreign_sessions)); - mutex_unlock(&m->totalstats_interval.managed_sess_lock); - } - else if (type == CT_FOREIGN_CALL) { /* foreign call*/ - c->foreign_call = 1; - atomic64_inc(&m->stats.foreign_sessions); - atomic64_inc(&m->totalstats.total_foreign_sessions); - } + if (type == CT_FOREIGN_CALL) /* foreign call*/ + c->foreign_call = 1; + + statistics_update_foreignown_inc(m,c); + rwlock_lock_w(&c->master_lock); rwlock_unlock_w(&m->hashlock); } diff --git a/daemon/call.h b/daemon/call.h index a5777cee9..c786bb074 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -6,7 +6,6 @@ /* XXX split everything into call_signalling.[ch] and call_packets.[ch] or w/e */ #include -#include #include #include @@ -19,17 +18,9 @@ #include "socket.h" #include "media_socket.h" #include "recording.h" +#include "statistics.h" #define UNDEFINED ((unsigned int) -1) -#define TRUNCATED " ... Output truncated. Increase Output Buffer ... \n" - -#define truncate_output(x) strcpy(x - strlen(TRUNCATED) - 1, TRUNCATED) - -#define ADJUSTLEN(printlen,outbufend,replybuffer) do { \ - replybuffer += (printlen>=outbufend-replybuffer)?outbufend-replybuffer:printlen; \ - if (replybuffer == outbufend) \ - truncate_output(replybuffer); \ - } while (0); enum termination_reason { UNKNOWN=0, @@ -109,6 +100,7 @@ enum call_type { #endif #define IS_FOREIGN_CALL(c) (c->foreign_call) +#define IS_OWN_CALL(c) !IS_FOREIGN_CALL(c) /* flags shared by several of the structs below */ #define SHARED_FLAG_IMPLICIT_RTCP 0x00000001 @@ -232,54 +224,6 @@ struct transport_protocol { }; extern const struct transport_protocol transport_protocols[]; -struct stats { - atomic64 packets; - atomic64 bytes; - atomic64 errors; - u_int64_t delay_min; - u_int64_t delay_avg; - u_int64_t delay_max; - u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */ - atomic64 foreign_sessions; // unresponsible via redis notification -}; - -struct request_time { - mutex_t lock; - u_int64_t count; - struct timeval time_min, time_max, time_avg; -}; - -struct totalstats { - time_t started; - atomic64 total_timeout_sess; - atomic64 total_foreign_sessions; - atomic64 total_rejected_sess; - atomic64 total_silent_timeout_sess; - atomic64 total_final_timeout_sess; - atomic64 total_regular_term_sess; - atomic64 total_forced_term_sess; - atomic64 total_relayed_packets; - atomic64 total_relayed_errors; - atomic64 total_nopacket_relayed_sess; - atomic64 total_oneway_stream_sess; - - u_int64_t foreign_sessions; - u_int64_t own_sessions; - u_int64_t total_sessions; - - mutex_t total_average_lock; /* for these two below */ - u_int64_t total_managed_sess; - struct timeval total_average_call_dur; - - mutex_t managed_sess_lock; /* for these below */ - u_int64_t managed_sess_max; /* per graphite interval statistic */ - u_int64_t managed_sess_min; /* per graphite interval statistic */ - - mutex_t total_calls_duration_lock; /* for these two below */ - struct timeval total_calls_duration_interval; - - struct request_time offer, answer, delete; -}; struct stream_params { unsigned int index; /* starting with 1 */ @@ -314,14 +258,7 @@ struct loop_protector { unsigned char buf[RTP_LOOP_PROTECT]; }; -struct rtp_stats { - unsigned int payload_type; - atomic64 packets; - atomic64 bytes; - atomic64 kernel_packets; - atomic64 kernel_bytes; - atomic64 in_tos_tclass; -}; + struct packet_stream { mutex_t in_lock, @@ -444,7 +381,6 @@ struct call { unsigned int foreign_call; // created_via_redis_notify call struct recording *recording; - JsonReader *root_reader; }; struct callmaster_config { @@ -502,13 +438,6 @@ struct callmaster { struct homer_sender *homer; }; -struct call_stats { - time_t last_packet; - struct stats totals[4]; /* rtp in, rtcp in, rtp out, rtcp out */ -}; - - - struct callmaster *callmaster_new(struct poller *); void callmaster_get_all_calls(struct callmaster *m, GQueue *q); struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, @@ -545,16 +474,11 @@ void add_total_calls_duration_in_interval(struct callmaster *cm, struct timeval void __payload_type_free(void *p); void __rtp_stats_update(GHashTable *dst, GHashTable *src); -const char *get_tag_type_text(enum tag_type t); -const char *get_opmode_text(enum call_opmode); - - +const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m); #include "str.h" #include "rtp.h" - - INLINE void *call_malloc(struct call *c, size_t l) { void *ret; mutex_lock(&c->buffer_lock); diff --git a/daemon/cdr.c b/daemon/cdr.c new file mode 100644 index 000000000..0456a2a3c --- /dev/null +++ b/daemon/cdr.c @@ -0,0 +1,224 @@ +#include +#include "rtplib.h" +#include "cdr.h" +#include "call.h" +#include "poller.h" +#include "str.h" + +#define CDRBUFREMAINDER cdrbufend-cdrbufcur + +static const char * const __term_reason_texts[] = { + [TIMEOUT] = "TIMEOUT", + [REGULAR] = "REGULAR", + [FORCED] = "FORCED", + [SILENT_TIMEOUT] = "SILENT_TIMEOUT", + [FINAL_TIMEOUT] = "FINAL_TIMEOUT", +}; +static const char * const __tag_type_texts[] = { + [FROM_TAG] = "FROM_TAG", + [TO_TAG] = "TO_TAG", +}; +static const char *const __opmode_texts[] = { + [OP_OFFER] = "offer", + [OP_ANSWER] = "answer", +}; + +const char * get_tag_type_text(enum tag_type t) { + return get_enum_array_text(__tag_type_texts, t, "UNKNOWN"); +} +const char *get_opmode_text(enum call_opmode m) { + return get_enum_array_text(__opmode_texts, m, "other"); +} + +static const char * get_term_reason_text(enum termination_reason t) { + return get_enum_array_text(__term_reason_texts, t, "UNKNOWN"); +} + +void cdr_update_entry(struct call* c) { + GList *l; + struct call_monologue *ml; + struct timeval tim_result_duration; + int printlen=0; + int cdrlinecnt = 0; + static const int CDRBUFLENGTH = 4096*2; + char cdrbuffer[CDRBUFLENGTH]; + char* cdrbufcur = cdrbuffer; + char* cdrbufend = cdrbuffer+CDRBUFLENGTH-1; + struct call_media *md; + GList *k, *o; + const struct rtp_payload_type *rtp_pt; + struct packet_stream *ps=0; + + if (IS_OWN_CALL(c)) { + + /* CDRs and statistics */ + if (_log_facility_cdr) { + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"ci=%s, ",c->callid.s); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + + if (!ml->terminated.tv_sec) { + gettimeofday(&ml->terminated, NULL); + ml->term_reason = UNKNOWN; + } + + timeval_subtract(&tim_result_duration,&ml->terminated,&ml->started); + + if (_log_facility_cdr) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_start_time=%ld.%06lu, " + "ml%i_end_time=%ld.%06ld, " + "ml%i_duration=%ld.%06ld, " + "ml%i_termination=%s, " + "ml%i_local_tag=%s, " + "ml%i_local_tag_type=%s, " + "ml%i_remote_tag=%s, ", + cdrlinecnt, ml->started.tv_sec, ml->started.tv_usec, + cdrlinecnt, ml->terminated.tv_sec, ml->terminated.tv_usec, + cdrlinecnt, tim_result_duration.tv_sec, tim_result_duration.tv_usec, + cdrlinecnt, get_term_reason_text(ml->term_reason), + cdrlinecnt, ml->tag.s, + cdrlinecnt, get_tag_type_text(ml->tagtype), + cdrlinecnt, ml->active_dialogue ? ml->active_dialogue->tag.s : "(none)"); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } + + for (k = ml->medias.head; k; k = k->next) { + md = k->data; + + rtp_pt = __rtp_stats_codec(md); + + /* add PayloadType(codec) info in CDR logging */ + if (_log_facility_cdr && rtp_pt) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=%u, ", rtp_pt->payload_type); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } else if (_log_facility_cdr && !rtp_pt) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, "payload_type=unknown, "); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } + + for (o = md->streams.head; o; o = o->next) { + ps = o->data; + + if (PS_ISSET(ps, FALLBACK_RTCP)) + continue; + + char *addr = sockaddr_print_buf(&ps->endpoint.address); + char *local_addr = ps->selected_sfd ? sockaddr_print_buf(&ps->selected_sfd->socket.local.address) : "0.0.0.0"; + + if (_log_facility_cdr) { + const char* protocol = (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? "rtcp" : "rtp"; + + if(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) { + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_ip=%s, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", " + "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, local_addr, + cdrlinecnt, md->index, protocol, + (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet), + cdrlinecnt, md->index, protocol, + ps->stats.in_tos_tclass); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); + } else { +#if (RE_HAS_MEASUREDELAY) + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_ip=%s, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", " + "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", " + "ml%i_midx%u_%s_delay_min=%.9f, " + "ml%i_midx%u_%s_delay_avg=%.9f, " + "ml%i_midx%u_%s_delay_max=%.9f, ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, local_addr, + cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet), + cdrlinecnt, md->index, protocol, + ps->stats.in_tos_tclass, + cdrlinecnt, md->index, protocol, (double) ps->stats.delay_min / 1000000, + cdrlinecnt, md->index, protocol, (double) ps->stats.delay_avg / 1000000, + cdrlinecnt, md->index, protocol, (double) ps->stats.delay_max / 1000000); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); +#else + printlen = snprintf(cdrbufcur, CDRBUFREMAINDER, + "ml%i_midx%u_%s_endpoint_ip=%s, " + "ml%i_midx%u_%s_endpoint_port=%u, " + "ml%i_midx%u_%s_local_relay_ip=%s, " + "ml%i_midx%u_%s_local_relay_port=%u, " + "ml%i_midx%u_%s_relayed_packets="UINT64F", " + "ml%i_midx%u_%s_relayed_bytes="UINT64F", " + "ml%i_midx%u_%s_relayed_errors="UINT64F", " + "ml%i_midx%u_%s_last_packet="UINT64F", " + "ml%i_midx%u_%s_in_tos_tclass=%" PRIu8 ", ", + cdrlinecnt, md->index, protocol, addr, + cdrlinecnt, md->index, protocol, ps->endpoint.port, + cdrlinecnt, md->index, protocol, local_addr, + cdrlinecnt, md->index, protocol, + (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.packets), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.bytes), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->stats.errors), + cdrlinecnt, md->index, protocol, + atomic64_get(&ps->last_packet), + cdrlinecnt, md->index, protocol, + ps->stats.in_tos_tclass); + ADJUSTLEN(printlen,cdrbufend,cdrbufcur); +#endif + } + } + + } + } + if (_log_facility_cdr) + ++cdrlinecnt; + } + } + + /* log it */ + cdrlog(cdrbuffer); + + +} + diff --git a/daemon/cdr.h b/daemon/cdr.h new file mode 100644 index 000000000..2ea19b54f --- /dev/null +++ b/daemon/cdr.h @@ -0,0 +1,21 @@ +/* + * cdr.h + * + * Created on: Mar 14, 2017 + * Author: fmetz + */ + +#ifndef CDR_H_ +#define CDR_H_ + +#include "aux.h" + +struct call; +enum tag_type; +enum call_opmode; + +const char *get_tag_type_text(enum tag_type t); +const char *get_opmode_text(enum call_opmode); +void cdr_update_entry(struct call* c); + +#endif /* CDR_H_ */ diff --git a/daemon/cli.c b/daemon/cli.c index b95fbb2de..f4d88999a 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -18,6 +18,7 @@ #include "redis.h" #include "control_ng.h" #include "media_socket.h" +#include "cdr.h" #include "rtpengine_config.h" diff --git a/daemon/crypto.c b/daemon/crypto.c index f3922178a..75f4651e6 100644 --- a/daemon/crypto.c +++ b/daemon/crypto.c @@ -28,16 +28,14 @@ static int hmac_sha1_rtp(struct crypto_context *, char *out, str *in, u_int64_t) static int hmac_sha1_rtcp(struct crypto_context *, char *out, str *in); static int aes_f8_encrypt_rtp(struct crypto_context *c, struct rtp_header *r, str *s, u_int64_t idx); static int aes_f8_encrypt_rtcp(struct crypto_context *c, struct rtcp_packet *r, str *s, u_int64_t idx); -static int aes_cm_session_key_init_128(struct crypto_context *c); -static int aes_cm_session_key_init_192(struct crypto_context *c); -static int aes_cm_session_key_init_256(struct crypto_context *c); +static int aes_cm_session_key_init(struct crypto_context *c); static int aes_f8_session_key_init(struct crypto_context *c); static int evp_session_key_cleanup(struct crypto_context *c); static int null_crypt_rtp(struct crypto_context *c, struct rtp_header *r, str *s, u_int64_t idx); static int null_crypt_rtcp(struct crypto_context *c, struct rtcp_packet *r, str *s, u_int64_t idx); /* all lengths are in bytes */ -const struct crypto_suite crypto_suites[] = { +struct crypto_suite __crypto_suites[] = { { .name = "AES_CM_128_HMAC_SHA1_80", .dtls_name = "SRTP_AES128_CM_SHA1_80", @@ -59,7 +57,7 @@ const struct crypto_suite crypto_suites[] = { .decrypt_rtcp = aes_cm_encrypt_rtcp, .hash_rtp = hmac_sha1_rtp, .hash_rtcp = hmac_sha1_rtcp, - .session_key_init = aes_cm_session_key_init_128, + .session_key_init = aes_cm_session_key_init, .session_key_cleanup = evp_session_key_cleanup, }, { @@ -83,7 +81,7 @@ const struct crypto_suite crypto_suites[] = { .decrypt_rtcp = aes_cm_encrypt_rtcp, .hash_rtp = hmac_sha1_rtp, .hash_rtcp = hmac_sha1_rtcp, - .session_key_init = aes_cm_session_key_init_128, + .session_key_init = aes_cm_session_key_init, .session_key_cleanup = evp_session_key_cleanup, }, { @@ -107,7 +105,7 @@ const struct crypto_suite crypto_suites[] = { .decrypt_rtcp = aes_cm_encrypt_rtcp, .hash_rtp = hmac_sha1_rtp, .hash_rtcp = hmac_sha1_rtcp, - .session_key_init = aes_cm_session_key_init_192, + .session_key_init = aes_cm_session_key_init, .session_key_cleanup = evp_session_key_cleanup, }, { @@ -131,7 +129,7 @@ const struct crypto_suite crypto_suites[] = { .decrypt_rtcp = aes_cm_encrypt_rtcp, .hash_rtp = hmac_sha1_rtp, .hash_rtcp = hmac_sha1_rtcp, - .session_key_init = aes_cm_session_key_init_192, + .session_key_init = aes_cm_session_key_init, .session_key_cleanup = evp_session_key_cleanup, }, { @@ -155,7 +153,7 @@ const struct crypto_suite crypto_suites[] = { .decrypt_rtcp = aes_cm_encrypt_rtcp, .hash_rtp = hmac_sha1_rtp, .hash_rtcp = hmac_sha1_rtcp, - .session_key_init = aes_cm_session_key_init_256, + .session_key_init = aes_cm_session_key_init, .session_key_cleanup = evp_session_key_cleanup, }, { @@ -179,7 +177,7 @@ const struct crypto_suite crypto_suites[] = { .decrypt_rtcp = aes_cm_encrypt_rtcp, .hash_rtp = hmac_sha1_rtp, .hash_rtcp = hmac_sha1_rtcp, - .session_key_init = aes_cm_session_key_init_256, + .session_key_init = aes_cm_session_key_init, .session_key_cleanup = evp_session_key_cleanup, }, { @@ -278,7 +276,8 @@ const struct crypto_suite crypto_suites[] = { }, }; -const int num_crypto_suites = G_N_ELEMENTS(crypto_suites); +const struct crypto_suite *crypto_suites = __crypto_suites; +const int num_crypto_suites = G_N_ELEMENTS(__crypto_suites); @@ -357,11 +356,12 @@ done: ; } -static void aes_ctr_no_ctx(unsigned char *out, str *in, const unsigned char *key, int keylen, const unsigned char *iv) { +static void aes_ctr_no_ctx(unsigned char *out, str *in, const unsigned char *key, const EVP_CIPHER *ciph, + const unsigned char *iv) +{ EVP_CIPHER_CTX *ctx; unsigned char block[16]; int len; - const EVP_CIPHER *ecb_cipher; #if OPENSSL_VERSION_NUMBER >= 0x10100000L ctx = EVP_CIPHER_CTX_new(); @@ -370,22 +370,7 @@ static void aes_ctr_no_ctx(unsigned char *out, str *in, const unsigned char *key ctx = &ctx_s; EVP_CIPHER_CTX_init(ctx); #endif - switch(keylen) { - case 16: - ecb_cipher = EVP_aes_128_ecb(); - break; - case 24: - ecb_cipher = EVP_aes_192_ecb(); - break; - case 32: - ecb_cipher = EVP_aes_256_ecb(); - break; - default: - // silence -Wmaybe-unintialized; must not end up here - assert(FALSE); - break; - } - EVP_EncryptInit_ex(ctx, ecb_cipher, NULL, key, NULL); + EVP_EncryptInit_ex(ctx, ciph, NULL, key, NULL); aes_ctr(out, in, ctx, iv); EVP_EncryptFinal_ex(ctx, block, &len); @@ -401,7 +386,7 @@ static void aes_ctr_no_ctx(unsigned char *out, str *in, const unsigned char *key * x: 112 bits * n <= 256 * out->len := n / 8 */ -static void prf_n(str *out, const unsigned char *key, int keylen, const unsigned char *x) { +static void prf_n(str *out, const unsigned char *key, const EVP_CIPHER *ciph, const unsigned char *x) { unsigned char iv[16]; unsigned char o[32]; unsigned char in[32]; @@ -414,7 +399,7 @@ static void prf_n(str *out, const unsigned char *key, int keylen, const unsigned /* iv[14] = iv[15] = 0; := x << 16 */ ZERO(in); /* outputs the key stream */ str_init_len(&in_s, (void *) in, out->len > 16 ? 32 : 16); - aes_ctr_no_ctx(o, &in_s, key, keylen, iv); + aes_ctr_no_ctx(o, &in_s, key, ciph, iv); memcpy(out->s, o, out->len); } @@ -436,7 +421,7 @@ int crypto_gen_session_key(struct crypto_context *c, str *out, unsigned char lab for (i = 13 - index_len; i < 14; i++) x[i] = key_id[i - (13 - index_len)] ^ x[i]; - prf_n(out, c->params.master_key, c->params.crypto_suite->master_key_len, x); + prf_n(out, c->params.master_key, c->params.crypto_suite->lib_cipher_ptr, x); #if CRYPTO_DEBUG ilog(LOG_DEBUG, "Generated session key: master key " @@ -629,35 +614,7 @@ static int hmac_sha1_rtcp(struct crypto_context *c, char *out, str *in) { return 0; } -static int aes_cm_session_key_init_128(struct crypto_context *c) { - evp_session_key_cleanup(c); - -#if OPENSSL_VERSION_NUMBER >= 0x10100000L - c->session_key_ctx[0] = EVP_CIPHER_CTX_new(); -#else - c->session_key_ctx[0] = g_slice_alloc(sizeof(EVP_CIPHER_CTX)); - EVP_CIPHER_CTX_init(c->session_key_ctx[0]); -#endif - EVP_EncryptInit_ex(c->session_key_ctx[0], EVP_aes_128_ecb(), NULL, - (unsigned char *) c->session_key, NULL); - return 0; -} - -static int aes_cm_session_key_init_192(struct crypto_context *c) { - evp_session_key_cleanup(c); - -#if OPENSSL_VERSION_NUMBER >= 0x10100000L - c->session_key_ctx[0] = EVP_CIPHER_CTX_new(); -#else - c->session_key_ctx[0] = g_slice_alloc(sizeof(EVP_CIPHER_CTX)); - EVP_CIPHER_CTX_init(c->session_key_ctx[0]); -#endif - EVP_EncryptInit_ex(c->session_key_ctx[0], EVP_aes_192_ecb(), NULL, - (unsigned char *) c->session_key, NULL); - return 0; -} - -static int aes_cm_session_key_init_256(struct crypto_context *c) { +static int aes_cm_session_key_init(struct crypto_context *c) { evp_session_key_cleanup(c); #if OPENSSL_VERSION_NUMBER >= 0x10100000L @@ -666,7 +623,7 @@ static int aes_cm_session_key_init_256(struct crypto_context *c) { c->session_key_ctx[0] = g_slice_alloc(sizeof(EVP_CIPHER_CTX)); EVP_CIPHER_CTX_init(c->session_key_ctx[0]); #endif - EVP_EncryptInit_ex(c->session_key_ctx[0], EVP_aes_256_ecb(), NULL, + EVP_EncryptInit_ex(c->session_key_ctx[0], c->params.crypto_suite->lib_cipher_ptr, NULL, (unsigned char *) c->session_key, NULL); return 0; } @@ -677,7 +634,7 @@ static int aes_f8_session_key_init(struct crypto_context *c) { int k_e_len, k_s_len; /* n_e, n_s */ unsigned char *key; - aes_cm_session_key_init_128(c); + aes_cm_session_key_init(c); k_e_len = c->params.crypto_suite->session_key_len; k_s_len = c->params.crypto_suite->session_salt_len; @@ -751,3 +708,21 @@ void crypto_dump_keys(struct crypto_context *in, struct crypto_context *out) { ilog(LOG_DEBUG, "SRTP keys, outgoing:"); dump_key(out); } + +void crypto_init_main() { + struct crypto_suite *cs; + for (int i = 0; i < num_crypto_suites; i++) { + cs = &__crypto_suites[i]; + switch(cs->master_key_len) { + case 16: + cs->lib_cipher_ptr = EVP_aes_128_ecb(); + break; + case 24: + cs->lib_cipher_ptr = EVP_aes_192_ecb(); + break; + case 32: + cs->lib_cipher_ptr = EVP_aes_256_ecb(); + break; + } + } +} diff --git a/daemon/crypto.h b/daemon/crypto.h index 0095697f9..de97ff436 100644 --- a/daemon/crypto.h +++ b/daemon/crypto.h @@ -56,6 +56,7 @@ struct crypto_suite { session_key_init_func session_key_init; session_key_cleanup_func session_key_cleanup; const char *dtls_profile_code; + const void *lib_cipher_ptr; }; struct crypto_session_params { @@ -97,11 +98,13 @@ struct rtp_ssrc_entry { u_int64_t index; }; -extern const struct crypto_suite crypto_suites[]; +extern const struct crypto_suite *crypto_suites; extern const int num_crypto_suites; +void crypto_init_main(); + const struct crypto_suite *crypto_find_suite(const str *); int crypto_gen_session_key(struct crypto_context *, str *, unsigned char, int); void crypto_dump_keys(struct crypto_context *in, struct crypto_context *out); diff --git a/daemon/log.c b/daemon/log.c index 450da4d55..e096284ba 100644 --- a/daemon/log.c +++ b/daemon/log.c @@ -58,7 +58,9 @@ void __ilog(int prio, const char *fmt, ...) { } void cdrlog(const char* cdrbuffer) { - syslog(LOG_INFO | _log_facility_cdr, "%s", cdrbuffer); + if (_log_facility_cdr) { + syslog(LOG_INFO | _log_facility_cdr, "%s", cdrbuffer); + } } diff --git a/daemon/main.c b/daemon/main.c index 310263304..05322dd45 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -514,6 +514,7 @@ static void init_everything() { sdp_init(); dtls_init(); ice_init(); + crypto_init_main(); interfaces_init(&interfaces); } diff --git a/daemon/recording.c b/daemon/recording.c index 0a6958dbd..4bd854bb9 100644 --- a/daemon/recording.c +++ b/daemon/recording.c @@ -19,6 +19,7 @@ #include "kernel.h" #include "bencode.h" #include "rtplib.h" +#include "cdr.h" diff --git a/daemon/redis.c b/daemon/redis.c index f6f431384..fde2ff9e3 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -13,6 +13,9 @@ #include #include #include +#include +#include +#include #include "compat.h" #include "aux.h" @@ -26,9 +29,6 @@ #include "rtplib.h" #include "str.h" -#include -#include -#include INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -699,16 +699,13 @@ INLINE str *json_reader_get_string_value_uri_enc(JsonReader *root_reader) { return out; // must be free'd } -static int json_get_hash(struct redis_hash *out, struct call* c, - const char *key, unsigned int id) +static int json_get_hash(struct redis_hash *out, + const char *key, unsigned int id, JsonReader *root_reader) { static unsigned int MAXKEYLENGTH = 512; char key_concatted[MAXKEYLENGTH]; int rc=0; - if (!c) - goto err; - if (id == -1) { rc = snprintf(key_concatted, MAXKEYLENGTH, "%s",key); } else { @@ -717,7 +714,7 @@ static int json_get_hash(struct redis_hash *out, struct call* c, if (rc>=MAXKEYLENGTH) rlog(LOG_ERROR,"Json key too long."); - if (!json_reader_read_member(c->root_reader, key_concatted)) { + if (!json_reader_read_member(root_reader, key_concatted)) { rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); goto err; } @@ -726,16 +723,16 @@ static int json_get_hash(struct redis_hash *out, struct call* c, if (!out->ht) goto err; - gchar **members = json_reader_list_members(c->root_reader); + gchar **members = json_reader_list_members(root_reader); gchar **orig_members = members; - for (int i=0; i < json_reader_count_members (c->root_reader); ++i) { + for (int i=0; i < json_reader_count_members (root_reader); ++i) { - if (!json_reader_read_member(c->root_reader, *members)) { + if (!json_reader_read_member(root_reader, *members)) { rlog(LOG_ERROR, "Could not read json member: %s",*members); goto err3; } - str *val = json_reader_get_string_value_uri_enc(c->root_reader); + str *val = json_reader_get_string_value_uri_enc(root_reader); char* tmp = strdup(*members); if (g_hash_table_insert_check(out->ht, tmp, val) != TRUE) { @@ -743,12 +740,12 @@ static int json_get_hash(struct redis_hash *out, struct call* c, goto err3; } - json_reader_end_member(c->root_reader); + json_reader_end_member(root_reader); ++members; } // for g_strfreev(orig_members); - json_reader_end_member (c->root_reader); + json_reader_end_member (root_reader); return 0; @@ -876,18 +873,18 @@ static void *redis_list_get_ptr(struct redis_list *list, struct redis_hash *rh, static int json_build_list_cb(GQueue *q, struct call *c, const char *key, unsigned int idx, struct redis_list *list, - int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) + int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr, JsonReader *root_reader) { char key_concatted[256]; snprintf(key_concatted, 256, "%s-%u", key, idx); - if (!json_reader_read_member(c->root_reader, key_concatted)) + if (!json_reader_read_member(root_reader, key_concatted)) rlog(LOG_ERROR,"Key in json not found:%s",key_concatted); - for (int jidx=0; jidx < json_reader_count_elements(c->root_reader); ++jidx) { - if (!json_reader_read_element(c->root_reader,jidx)) + for (int jidx=0; jidx < json_reader_count_elements(root_reader); ++jidx) { + if (!json_reader_read_element(root_reader,jidx)) rlog(LOG_ERROR,"Element in array not found."); - str *s = json_reader_get_string_value_uri_enc(c->root_reader); + str *s = json_reader_get_string_value_uri_enc(root_reader); if (!s) rlog(LOG_ERROR,"String in json not found."); if (cb(s, q, list, ptr)) { @@ -895,9 +892,9 @@ static int json_build_list_cb(GQueue *q, struct call *c, const char *key, return -1; } free(s); - json_reader_end_element(c->root_reader); + json_reader_end_element(root_reader); } - json_reader_end_member (c->root_reader); + json_reader_end_member (root_reader); return 0; } @@ -910,14 +907,14 @@ static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) } static int json_build_list(GQueue *q, struct call *c, const char *key, const str *callid, - unsigned int idx, struct redis_list *list) + unsigned int idx, struct redis_list *list, JsonReader *root_reader) { - return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL); + return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL, root_reader); } -static int json_get_list_hash(struct redis_list *out, struct call* c, +static int json_get_list_hash(struct redis_list *out, const char *key, - const struct redis_hash *rh, const char *rh_num_key) + const struct redis_hash *rh, const char *rh_num_key, JsonReader *root_reader) { unsigned int i; @@ -931,7 +928,7 @@ static int json_get_list_hash(struct redis_list *out, struct call* c, goto err1; for (i = 0; i < out->len; i++) { - if (json_get_hash(&out->rh[i], c, key, i)) + if (json_get_hash(&out->rh[i], key, i, root_reader)) goto err2; } @@ -1155,7 +1152,7 @@ static int rbl_cb_plts(str *s, GQueue *q, struct redis_list *list, void *ptr) { g_hash_table_replace(med->rtp_payload_types, &pt->payload_type, pt); return 0; } -static int json_medias(struct redis *r, struct call *c, struct redis_list *medias) { +static int json_medias(struct redis *r, struct call *c, struct redis_list *medias, JsonReader *root_reader) { unsigned int i; struct redis_hash *rh; struct call_media *med; @@ -1203,7 +1200,7 @@ static int json_medias(struct redis *r, struct call *c, struct redis_list *media if (redis_hash_get_crypto_params(&med->sdes_out.params, rh, "sdes_out") < 0) return -1; - json_build_list_cb(NULL, c, "payload_types", i, NULL, rbl_cb_plts, med); + json_build_list_cb(NULL, c, "payload_types", i, NULL, rbl_cb_plts, med, root_reader); /* XXX dtls */ medias->ptrs[i] = med; @@ -1264,7 +1261,7 @@ static int redis_link_sfds(struct redis_list *sfds, struct redis_list *streams) return 0; } -static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_list *medias) +static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_list *medias, JsonReader *root_reader) { unsigned int i; struct call_monologue *ml, *other_ml; @@ -1276,7 +1273,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); - if (json_build_list(&q, c, "other_tags", &c->callid, i, tags)) + if (json_build_list(&q, c, "other_tags", &c->callid, i, tags, root_reader)) return -1; for (l = q.head; l; l = l->next) { other_ml = l->data; @@ -1286,7 +1283,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ } g_queue_clear(&q); - if (json_build_list(&ml->medias, c, "medias", &c->callid, i, medias)) + if (json_build_list(&ml->medias, c, "medias", &c->callid, i, medias, root_reader)) return -1; } @@ -1294,7 +1291,7 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ } static int json_link_streams(struct call *c, struct redis_list *streams, - struct redis_list *sfds, struct redis_list *medias) + struct redis_list *sfds, struct redis_list *medias, JsonReader *root_reader) { unsigned int i; struct packet_stream *ps; @@ -1308,7 +1305,7 @@ static int json_link_streams(struct call *c, struct redis_list *streams, ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); - if (json_build_list(&ps->sfds, c, "stream_sfds", &c->callid, i, sfds)) + if (json_build_list(&ps->sfds, c, "stream_sfds", &c->callid, i, sfds, root_reader)) return -1; if (ps->media) @@ -1319,7 +1316,7 @@ static int json_link_streams(struct call *c, struct redis_list *streams, } static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias, - struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) + struct redis_list *streams, struct redis_list *maps, struct redis_list *tags, JsonReader *root_reader) { unsigned int i; struct call_media *med; @@ -1330,9 +1327,9 @@ static int json_link_medias(struct redis *r, struct call *c, struct redis_list * med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); if (!med->monologue) return -1; - if (json_build_list(&med->streams, c, "streams", &c->callid, i, streams)) + if (json_build_list(&med->streams, c, "streams", &c->callid, i, streams, root_reader)) return -1; - if (json_build_list(&med->endpoint_maps, c, "maps", &c->callid, i, maps)) + if (json_build_list(&med->endpoint_maps, c, "maps", &c->callid, i, maps, root_reader)) return -1; } return 0; @@ -1368,7 +1365,7 @@ static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *pt } static int json_link_maps(struct redis *r, struct call *c, struct redis_list *maps, - struct redis_list *sfds) + struct redis_list *sfds, JsonReader *root_reader) { unsigned int i; struct endpoint_map *em; @@ -1377,7 +1374,7 @@ static int json_link_maps(struct redis *r, struct call *c, struct redis_list *ma em = maps->ptrs[i]; if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", em->unique_id, sfds, - rbl_cb_intf_sfds, em)) + rbl_cb_intf_sfds, em, root_reader)) return -1; } return 0; @@ -1414,29 +1411,27 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (!c) goto err1; - c->root_reader = root_reader; // attach the json to the call in order to restore data from there - err = "call already exists"; if (c->last_signal) goto err2; err = "'call' data incomplete"; - if (json_get_hash(&call, c, "json", -1)) + if (json_get_hash(&call, "json", -1, root_reader)) goto err2; err = "'tags' incomplete"; - if (json_get_list_hash(&tags, c, "tag", &call, "num_tags")) + if (json_get_list_hash(&tags, "tag", &call, "num_tags", root_reader)) goto err3; err = "'sfds' incomplete"; - if (json_get_list_hash(&sfds, c, "sfd", &call, "num_sfds")) + if (json_get_list_hash(&sfds, "sfd", &call, "num_sfds", root_reader)) goto err4; err = "'streams' incomplete"; - if (json_get_list_hash(&streams, c, "stream", &call, "num_streams")) + if (json_get_list_hash(&streams, "stream", &call, "num_streams", root_reader)) goto err5; err = "'medias' incomplete"; - if (json_get_list_hash(&medias, c, "media", &call, "num_medias")) + if (json_get_list_hash(&medias, "media", &call, "num_medias", root_reader)) goto err6; err = "'maps' incomplete"; - if (json_get_list_hash(&maps, c, "map", &call, "num_maps")) + if (json_get_list_hash(&maps, "map", &call, "num_maps", root_reader)) goto err7; err = "missing 'created' timestamp"; @@ -1470,7 +1465,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (redis_tags(c, &tags)) goto err8; err = "failed to create medias"; - if (json_medias(r, c, &medias)) + if (json_medias(r, c, &medias, root_reader)) goto err8; err = "failed to create maps"; if (redis_maps(c, &maps)) @@ -1480,16 +1475,16 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (redis_link_sfds(&sfds, &streams)) goto err8; err = "failed to link streams"; - if (json_link_streams(c, &streams, &sfds, &medias)) + if (json_link_streams(c, &streams, &sfds, &medias, root_reader)) goto err8; err = "failed to link tags"; - if (json_link_tags(c, &tags, &medias)) + if (json_link_tags(c, &tags, &medias, root_reader)) goto err8; err = "failed to link medias"; - if (json_link_medias(r, c, &medias, &streams, &maps, &tags)) + if (json_link_medias(r, c, &medias, &streams, &maps, &tags, root_reader)) goto err8; err = "failed to link maps"; - if (json_link_maps(r, c, &maps, &sfds)) + if (json_link_maps(r, c, &maps, &sfds, root_reader)) goto err8; // presence of this key determines whether we were recording at all diff --git a/daemon/statistics.c b/daemon/statistics.c new file mode 100644 index 000000000..db20c3505 --- /dev/null +++ b/daemon/statistics.c @@ -0,0 +1,208 @@ +#include "call.h" +#include "statistics.h" + +static void timeval_totalstats_average_add(struct totalstats *s, const struct timeval *add) { + struct timeval dp, oa; + + mutex_lock(&s->total_average_lock); + + // new average = ((old average * old num sessions) + datapoint) / new num sessions + // ... but this will overflow when num sessions becomes very large + + // timeval_multiply(&t, &s->total_average_call_dur, s->total_managed_sess); + // timeval_add(&t, &t, add); + // s->total_managed_sess++; + // timeval_divide(&s->total_average_call_dur, &t, s->total_managed_sess); + + // alternative: + // new average = old average + (datapoint / new num sessions) - (old average / new num sessions) + + s->total_managed_sess++; + timeval_divide(&dp, add, s->total_managed_sess); + timeval_divide(&oa, &s->total_average_call_dur, s->total_managed_sess); + timeval_add(&s->total_average_call_dur, &s->total_average_call_dur, &dp); + timeval_subtract(&s->total_average_call_dur, &s->total_average_call_dur, &oa); + + mutex_unlock(&s->total_average_lock); +} + +static void timeval_totalstats_interval_call_duration_add(struct totalstats *s, + struct timeval *call_start, struct timeval *call_stop, + struct timeval *interval_start, int interval_dur_s) { + + /* work with graphite interval start val which might be changed elsewhere in the code*/ + struct timeval real_iv_start = *interval_start; + struct timeval call_duration; + struct timeval *call_start_in_iv = call_start; + + /* in case graphite interval needs to be the previous one */ + if (timercmp(&real_iv_start, call_stop, >)) { + struct timeval graph_dur = { .tv_sec = interval_dur_s, .tv_usec = 0LL }; + timeval_subtract(&real_iv_start, interval_start, &graph_dur); + } + + if (timercmp(&real_iv_start, call_start, >)) + call_start_in_iv = &real_iv_start; + + /* this should never happen and is here for sanitization of output */ + if (timercmp(call_start_in_iv, call_stop, >)) { + ilog(LOG_ERR, "Call start seems to exceed call stop"); + return; + } + + timeval_subtract(&call_duration, call_stop, call_start_in_iv); + + mutex_lock(&s->total_calls_duration_lock); + timeval_add(&s->total_calls_duration_interval, + &s->total_calls_duration_interval, &call_duration); + mutex_unlock(&s->total_calls_duration_lock); +} + + +void statistics_update_totals(struct callmaster* m, struct packet_stream *ps) { + atomic64_add(&m->totalstats.total_relayed_packets, + atomic64_get(&ps->stats.packets)); + atomic64_add(&m->totalstats_interval.total_relayed_packets, + atomic64_get(&ps->stats.packets)); + atomic64_add(&m->totalstats.total_relayed_errors, + atomic64_get(&ps->stats.errors)); + atomic64_add(&m->totalstats_interval.total_relayed_errors, + atomic64_get(&ps->stats.errors)); +} + +void statistics_update_foreignown_dec(struct call* c) { + struct callmaster *m; + + m = c->callmaster; + + if (IS_FOREIGN_CALL(c)) { + atomic64_dec(&m->stats.foreign_sessions); + } + + if(IS_OWN_CALL(c)) { + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, + g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); + mutex_unlock(&m->totalstats_interval.managed_sess_lock); + } + +} + +void statistics_update_foreignown_inc(struct callmaster *m, struct call* c) { + if (IS_OWN_CALL(c)) { + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats_interval.managed_sess_max = MAX( + m->totalstats_interval.managed_sess_max, + g_hash_table_size(m->callhash) + - atomic64_get(&m->stats.foreign_sessions)); + mutex_unlock(&m->totalstats_interval.managed_sess_lock); + } + else if (IS_FOREIGN_CALL(c)) { /* foreign call*/ + atomic64_inc(&m->stats.foreign_sessions); + atomic64_inc(&m->totalstats.total_foreign_sessions); + } + +} + +void statistics_update_oneway(struct call* c) { + struct callmaster *m; + struct packet_stream *ps=0, *ps2=0; + struct call_monologue *ml; + struct call_media *md; + GList *k, *o; + int found = 0; + GList *l; + struct timeval tim_result_duration; + + m = c->callmaster; + + // --- for statistics getting one way stream or no relay at all + int total_nopacket_relayed_sess = 0; + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + + // --- go through partner ml and search the RTP + for (k = ml->medias.head; k; k = k->next) { + md = k->data; + + for (o = md->streams.head; o; o = o->next) { + ps = o->data; + if ((PS_ISSET(ps, RTP) && !PS_ISSET(ps, RTCP))) { + // --- only RTP is interesting + found = 1; + break; + } + } + if (found) { break; } + } + found = 0; + + if (ml->active_dialogue) { + // --- go through partner ml and search the RTP + for (k = ml->active_dialogue->medias.head; k; k = k->next) { + md = k->data; + + for (o = md->streams.head; o; o = o->next) { + ps2 = o->data; + if ((PS_ISSET(ps2, RTP) && !PS_ISSET(ps2, RTCP))) { + // --- only RTP is interesting + found = 1; + break; + } + } + if (found) { break; } + } + } + + if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { + if (atomic64_get(&ps->stats.packets)!=0 && IS_OWN_CALL(c)){ + if (atomic64_get(&ps->stats.packets)!=0) { + atomic64_inc(&m->totalstats.total_oneway_stream_sess); + atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); + } + } + else { + total_nopacket_relayed_sess++; + } + } + } + + if (IS_OWN_CALL(c)) { + atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); + } + + if (c->monologues.head) { + ml = c->monologues.head->data; + + if (IS_OWN_CALL(c)) { + if (ml->term_reason==TIMEOUT) { + atomic64_inc(&m->totalstats.total_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_timeout_sess); + } else if (ml->term_reason==SILENT_TIMEOUT) { + atomic64_inc(&m->totalstats.total_silent_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_silent_timeout_sess); + } else if (ml->term_reason==REGULAR) { + atomic64_inc(&m->totalstats.total_regular_term_sess); + atomic64_inc(&m->totalstats_interval.total_regular_term_sess); + } else if (ml->term_reason==FORCED) { + atomic64_inc(&m->totalstats.total_forced_term_sess); + atomic64_inc(&m->totalstats_interval.total_forced_term_sess); + } + + timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); + timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); + timeval_totalstats_interval_call_duration_add( + &m->totalstats_interval, &ml->started, &ml->terminated, + &m->latest_graphite_interval_start, + m->conf.graphite_interval); + } + + if (ml->term_reason==FINAL_TIMEOUT) { + atomic64_inc(&m->totalstats.total_final_timeout_sess); + atomic64_inc(&m->totalstats_interval.total_final_timeout_sess); + } + } + +} diff --git a/daemon/statistics.h b/daemon/statistics.h new file mode 100644 index 000000000..333a1e7ba --- /dev/null +++ b/daemon/statistics.h @@ -0,0 +1,80 @@ +#ifndef STATISTICS_H_ +#define STATISTICS_H_ + +#include "call.h" + +struct stats { + atomic64 packets; + atomic64 bytes; + atomic64 errors; + u_int64_t delay_min; + u_int64_t delay_avg; + u_int64_t delay_max; + u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */ + atomic64 foreign_sessions; // unresponsible via redis notification +}; + + +struct request_time { + mutex_t lock; + u_int64_t count; + struct timeval time_min, time_max, time_avg; +}; + + +struct totalstats { + time_t started; + atomic64 total_timeout_sess; + atomic64 total_foreign_sessions; + atomic64 total_rejected_sess; + atomic64 total_silent_timeout_sess; + atomic64 total_final_timeout_sess; + atomic64 total_regular_term_sess; + atomic64 total_forced_term_sess; + atomic64 total_relayed_packets; + atomic64 total_relayed_errors; + atomic64 total_nopacket_relayed_sess; + atomic64 total_oneway_stream_sess; + + u_int64_t foreign_sessions; + u_int64_t own_sessions; + u_int64_t total_sessions; + + mutex_t total_average_lock; /* for these two below */ + u_int64_t total_managed_sess; + struct timeval total_average_call_dur; + + mutex_t managed_sess_lock; /* for these below */ + u_int64_t managed_sess_max; /* per graphite interval statistic */ + u_int64_t managed_sess_min; /* per graphite interval statistic */ + + mutex_t total_calls_duration_lock; /* for these two below */ + struct timeval total_calls_duration_interval; + + struct request_time offer, answer, delete; +}; + +struct rtp_stats { + unsigned int payload_type; + atomic64 packets; + atomic64 bytes; + atomic64 kernel_packets; + atomic64 kernel_bytes; + atomic64 in_tos_tclass; +}; + + +struct call_stats { + time_t last_packet; + struct stats totals[4]; /* rtp in, rtcp in, rtp out, rtcp out */ +}; + + +struct callmaster; + +void statistics_update_oneway(struct call *); +void statistics_update_foreignown_dec(struct call *); +void statistics_update_foreignown_inc(struct callmaster *m, struct call* c); +void statistics_update_totals(struct callmaster *, struct packet_stream *) ; + +#endif /* STATISTICS_H_ */ diff --git a/daemon/stun.h b/daemon/stun.h index 7cddb6e28..640d1392c 100644 --- a/daemon/stun.h +++ b/daemon/stun.h @@ -37,7 +37,7 @@ INLINE int is_stun(const str *s) { if (s->len < 20) return 0; - if ((b[0] & 0xb0) != 0x00) + if ((b[0] & 0xc0) != 0x00) return 0; if ((b[3] & 0x3) != 0x0) return 0; diff --git a/debian/changelog b/debian/changelog index 83afac89d..d70e38be1 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,52 @@ +ngcp-rtpengine (5.3.0.0+0~mr5.3.0.0) unstable; urgency=medium + + [ Frederic-Philippe Metz ] + * [749a7da] Implements redis onekey concept + * [9ebd06f] Some fixes for pull request #316 + * [6a3bcf5] Fixes STR_FMT comment my @rfuchs or pullrequest 316 + * [88b89ea] Fixes STR_FMT comment my @rfuchs or pull request #316 + * [ed760fb] Fix more issues from richards comments + * [ea9512c] Adds URI encoding for json strings in redis. + * [f3364d9] Omits redisreply in redis restore and eliminates 'multikey' feature + * [14b37eb] Removes multikey stuff + * [6985784] Fixes redis recording flag in onekey concept + * [d08dd6a] Fixes SRTP restore in onekey concept + * [279e5fa] Removes commented code + * [d904fb2] Removes 'json-' prefix from redis key (callid) + * [267b57c] Implemented comments from Richard from pull req #323 + + [ Richard Fuchs ] + * [2992031] TT#10252 add lsb-base to package depends as per lintian + * [8b18bc5] TT#10155 split recording daemon into separate debian package + * [f77726c] update redis one-key concept + * [f5cc21f] TT#10156 write recordings metadata into mysql database + * [d8cc8ca] combine two mallocs into one for redis restore + * [d513c6d] add missing lock for failed call restores + * [9a5cba4] use more precise avcodec/avformat version test macros + * [76ba587] TT#13000 add init.d setuid/setgid capability + * [db6a37a] TT#13000 handle output avio context failure + * [b4694eb] port of a3f27f8751c to kernel space + * [846886c] TT#13000 include recording filename suffix in db data + * [4566bd3] TT#13005 store recording metadata to database + + [ Michael Prokop ] + * [5784cb7] TT#10155 Use stop/start behavior for init script's restart action + + [ Victor Seva ] + * [89393e1] TT#11400 debian: fix binary only builds + * [77f9e4c] TT#12000 ngcp-rtpengine-daemon.default: add missing TABLE + + [ Anthony Alba ] + * [1fc77bc] base64: flush base64 decoding, and skip base64 padding in crypto line + * [3e2e024] Determine base64 padding from enc_salt_key_len + + [ Changli Gao ] + * [a3f27f8] STUN: The most significant 2 bits is 0xc0 in hex + + [ Sipwise Jenkins Builder ] + + -- Sipwise Jenkins Builder Wed, 22 Mar 2017 11:02:14 +0100 + ngcp-rtpengine (5.2.0.0+0~mr5.2.0.0) unstable; urgency=medium [ Pawel Kuzak ] diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index a64688f39..c8f534cb9 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -338,7 +338,7 @@ struct re_stream { int eof; }; -#define HASH_BITS 8 /* make configurable? */ +#define RE_HASH_BITS 8 /* make configurable? */ struct rtpengine_table { atomic_t refcnt; rwlock_t target_lock; @@ -358,10 +358,10 @@ struct rtpengine_table { struct list_head calls; /* protected by calls.lock */ - spinlock_t calls_hash_lock[1 << HASH_BITS]; - struct hlist_head calls_hash[1 << HASH_BITS]; - spinlock_t streams_hash_lock[1 << HASH_BITS]; - struct hlist_head streams_hash[1 << HASH_BITS]; + spinlock_t calls_hash_lock[1 << RE_HASH_BITS]; + struct hlist_head calls_hash[1 << RE_HASH_BITS]; + spinlock_t streams_hash_lock[1 << RE_HASH_BITS]; + struct hlist_head streams_hash[1 << RE_HASH_BITS]; }; struct re_cipher { @@ -1651,7 +1651,7 @@ static int validate_srtp(struct rtpengine_srtp *s) { /* XXX shared code */ -static void aes_ctr_128(unsigned char *out, const unsigned char *in, int in_len, +static void aes_ctr(unsigned char *out, const unsigned char *in, int in_len, struct crypto_cipher *tfm, const unsigned char *iv) { unsigned char ivx[16]; @@ -1770,7 +1770,7 @@ static int aes_ctr_128_no_ctx(unsigned char *out, const char *in, int in_len, return PTR_ERR(tfm); crypto_cipher_setkey(tfm, key, key_len); - aes_ctr_128(out, in, in_len, tfm, iv); + aes_ctr(out, in, in_len, tfm, iv); crypto_free_cipher(tfm); return 0; @@ -2479,7 +2479,7 @@ static int table_new_call(struct rtpengine_table *table, struct rtpengine_call_i /* check for name collisions */ call->hash_bucket = crc32_le(0x52342, info->call_id, strlen(info->call_id)); - call->hash_bucket = call->hash_bucket & ((1 << HASH_BITS) - 1); + call->hash_bucket = call->hash_bucket & ((1 << RE_HASH_BITS) - 1); spin_lock_irqsave(&table->calls_hash_lock[call->hash_bucket], flags); @@ -2655,7 +2655,7 @@ static int table_new_stream(struct rtpengine_table *table, struct rtpengine_stre /* check for name collisions */ stream->hash_bucket = crc32_le(0x52342 ^ info->call_idx, info->stream_name, strlen(info->stream_name)); - stream->hash_bucket = stream->hash_bucket & ((1 << HASH_BITS) - 1); + stream->hash_bucket = stream->hash_bucket & ((1 << RE_HASH_BITS) - 1); spin_lock_irqsave(&table->streams_hash_lock[stream->hash_bucket], flags); @@ -3563,7 +3563,7 @@ static int srtp_encrypt_aes_cm(struct re_crypto_context *c, ivi[2] ^= idxh; ivi[3] ^= idxl; - aes_ctr_128(r->payload, r->payload, r->payload_len, c->tfm[0], iv); + aes_ctr(r->payload, r->payload, r->payload_len, c->tfm[0], iv); return 0; } @@ -3742,7 +3742,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, u32 = (void *) skb->data; if (u32[1] != htonl(0x2112A442UL)) /* magic cookie */ goto not_stun; - if ((u32[0] & htonl(0xb0000003UL))) /* zero bits required by rfc */ + if ((u32[0] & htonl(0xc0000003UL))) /* zero bits required by rfc */ goto not_stun; u32 = (void *) &skb->data[datalen - 8]; if (u32[0] != htonl(0x80280004UL)) /* required fingerprint attribute */ diff --git a/recording-daemon/db.c b/recording-daemon/db.c index 2804212ba..12dd38755 100644 --- a/recording-daemon/db.c +++ b/recording-daemon/db.c @@ -13,7 +13,8 @@ static MYSQL_STMT __thread *stm_close_call, *stm_insert_stream, *stm_close_stream, - *stm_config_stream; + *stm_config_stream, + *stm_insert_metadata; static void my_stmt_close(MYSQL_STMT **st) { @@ -30,6 +31,7 @@ static void reset_conn() { my_stmt_close(&stm_insert_stream); my_stmt_close(&stm_close_stream); my_stmt_close(&stm_config_stream); + my_stmt_close(&stm_insert_metadata); mysql_close(mysql_conn); mysql_conn = NULL; } @@ -71,7 +73,7 @@ static int check_conn() { if (prep(&stm_insert_stream, "insert into recording_streams (`call`, local_filename, full_filename, " \ "file_format, " \ "output_type, " \ - "stream_id, ssrc) values (?,?,?,?,?,?,?)")) + "stream_id, ssrc) values (?,concat(?,'.',?),concat(?,'.',?),?,?,?,?)")) goto err; if (prep(&stm_close_call, "update recording_calls set end_time = now() where id = ?")) goto err; @@ -79,6 +81,9 @@ static int check_conn() { goto err; if (prep(&stm_config_stream, "update recording_streams set channels = ?, sample_rate = ? where id = ?")) goto err; + if (prep(&stm_insert_metadata, "insert into recording_metakeys (`call`, `key`, `value`) values " \ + "(?,?,?)")) + goto err; ilog(LOG_INFO, "Connection to MySQL established"); @@ -96,14 +101,20 @@ err: } -INLINE void my_str(MYSQL_BIND *b, const char *s) { +INLINE void my_str_len(MYSQL_BIND *b, const char *s, unsigned int len) { *b = (MYSQL_BIND) { .buffer_type = MYSQL_TYPE_STRING, .buffer = (void *) s, - .buffer_length = strlen(s), + .buffer_length = len, .length = &b->buffer_length, }; } +INLINE void my_str(MYSQL_BIND *b, const str *s) { + my_str_len(b, s->s, s->len); +} +INLINE void my_cstr(MYSQL_BIND *b, const char *s) { + my_str_len(b, s, strlen(s)); +} INLINE void my_ull(MYSQL_BIND *b, const unsigned long long *ull) { *b = (MYSQL_BIND) { .buffer_type = MYSQL_TYPE_LONGLONG, @@ -150,17 +161,58 @@ err: } -void db_do_call(metafile_t *mf) { - if (check_conn()) - return; +static void db_do_call_id(metafile_t *mf) { if (mf->db_id > 0) return; + if (!mf->call_id) + return; MYSQL_BIND b[1]; - my_str(&b[0], mf->call_id); + my_cstr(&b[0], mf->call_id); execute_wrap(&stm_insert_call, b, &mf->db_id); } +static void db_do_call_metadata(metafile_t *mf) { + if (!mf->metadata) + return; + if (mf->db_id <= 0) + return; + + MYSQL_BIND b[3]; + my_ull(&b[0], &mf->db_id); // stays persistent + + // XXX offload this parsing to proxy module -> bencode list/dictionary + str all_meta; + str_init(&all_meta, mf->metadata); + while (all_meta.len > 1) { + str token; + if (str_token(&token, &all_meta, '|')) { + // separator not found, use remainder as token + token = all_meta; + all_meta.len = 0; + } + str key; + if (str_token(&key, &token, ':')) { + // key:value separator not found, skip + continue; + } + + my_str(&b[1], &key); + my_str(&b[2], &token); + + execute_wrap(&stm_insert_metadata, b, NULL); + } + + mf->metadata = NULL; +} + +void db_do_call(metafile_t *mf) { + if (check_conn()) + return; + + db_do_call_id(mf); + db_do_call_metadata(mf); +} void db_do_stream(metafile_t *mf, output_t *op, const char *type, unsigned int id, unsigned long ssrc) { @@ -171,19 +223,21 @@ void db_do_stream(metafile_t *mf, output_t *op, const char *type, unsigned int i if (op->db_id > 0) return; - MYSQL_BIND b[7]; + MYSQL_BIND b[9]; my_ull(&b[0], &mf->db_id); - my_str(&b[1], op->file_name); - my_str(&b[2], op->full_filename); - my_str(&b[3], op->file_format); - my_str(&b[4], type); - b[5] = (MYSQL_BIND) { + my_cstr(&b[1], op->file_name); + my_cstr(&b[2], op->file_format); + my_cstr(&b[3], op->full_filename); + my_cstr(&b[4], op->file_format); + my_cstr(&b[5], op->file_format); + my_cstr(&b[6], type); + b[7] = (MYSQL_BIND) { .buffer_type = MYSQL_TYPE_LONG, .buffer = &id, .buffer_length = sizeof(id), .is_unsigned = 1, }; - b[6] = (MYSQL_BIND) { + b[8] = (MYSQL_BIND) { .buffer_type = MYSQL_TYPE_LONG, .buffer = &ssrc, .buffer_length = sizeof(ssrc), diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index 03e2d10ac..2d21892fa 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -94,6 +94,13 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i } +// mf is locked +static void meta_metadata(metafile_t *mf, char *content) { + mf->metadata = g_string_chunk_insert(mf->gsc, content); + db_do_call(mf); +} + + // mf is locked static void meta_section(metafile_t *mf, char *section, char *content, unsigned long len) { unsigned long lu; @@ -103,6 +110,8 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned mf->call_id = g_string_chunk_insert(mf->gsc, content); else if (!strcmp(section, "PARENT")) mf->parent = g_string_chunk_insert(mf->gsc, content); + else if (!strcmp(section, "METADATA")) + meta_metadata(mf, content); else if (sscanf_match(section, "STREAM %lu interface", &lu) == 1) meta_stream_interface(mf, lu, content); else if (sscanf_match(section, "STREAM %lu details", &lu) == 1) diff --git a/recording-daemon/types.h b/recording-daemon/types.h index c84f82ea2..e387ce42b 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -88,6 +88,7 @@ struct metafile_s { char *name; char *parent; char *call_id; + char *metadata; off_t pos; unsigned long long db_id;