diff --git a/lib/.ycm_extra_conf.py b/lib/.ycm_extra_conf.py index d60552bbf..2336c492f 100644 --- a/lib/.ycm_extra_conf.py +++ b/lib/.ycm_extra_conf.py @@ -22,6 +22,7 @@ flags = [ '-D_GNU_SOURCE', '-D__DEBUG=1', '-D__YCM=1', + '-I../daemon', '-DRTPENGINE_VERSION="dummy"', '-DRE_PLUGIN_DIR="/usr/lib/rtpengine"', '-DWITH_IPTABLES_OPTION', diff --git a/lib/codeclib.c b/lib/codeclib.c index a812b7d5b..0d390d133 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -5,6 +5,7 @@ #include #include "str.h" #include "log.h" +#include "loglib.h" #include "resample.h" @@ -266,3 +267,122 @@ void codeclib_init() { avfilter_register_all(); avformat_network_init(); } + + + + + + +static int ptr_cmp(const void *a, const void *b, void *dummy) { + if (a < b) + return -1; + if (a > b) + return 1; + return 0; +} + +void packet_sequencer_init(packet_sequencer_t *ps, GDestroyNotify ffunc) { + ps->packets = g_tree_new_full(ptr_cmp, NULL, NULL, ffunc); + ps->seq = -1; +} +void packet_sequencer_destroy(packet_sequencer_t *ps) { + g_tree_destroy(ps->packets); +} +struct tree_searcher { + int find_seq, + found_seq; +}; +static int packet_tree_search(const void *testseq_p, const void *ts_p) { + struct tree_searcher *ts = (void *) ts_p; + int testseq = GPOINTER_TO_INT(testseq_p); + // called as a binary search test function. we're looking for the lowest + // seq number that is higher than find_seq. if our test number is too low, + // we proceed with higher numbers. if it's too high, we proceed to the lower + // numbers, but remember the lowest we've seen along that path. + if (G_UNLIKELY(testseq == ts->find_seq)) { + // we've struck gold + ts->found_seq = testseq; + return 0; + } + if (testseq < ts->find_seq) + return 1; + // testseq > ts->find_seq + if (ts->found_seq == -1 || testseq < ts->found_seq) + ts->found_seq = testseq; + return -1; +} +// caller must take care of locking +void *packet_sequencer_next_packet(packet_sequencer_t *ps) { + // see if we have a packet with the correct seq nr in the queue + seq_packet_t *packet = g_tree_lookup(ps->packets, GINT_TO_POINTER(ps->seq)); + if (G_LIKELY(packet != NULL)) { + dbg("returning in-sequence packet (seq %i)", ps->seq); + goto out; + } + + // why not? do we have anything? (we should) + int nnodes = g_tree_nnodes(ps->packets); + if (G_UNLIKELY(nnodes == 0)) { + dbg("packet queue empty"); + return NULL; + } + if (G_LIKELY(nnodes < 10)) { // XXX arbitrary value + dbg("only %i packets in queue - waiting for more", nnodes); + return NULL; // need to wait for more + } + + // packet was probably lost. search for the next highest seq + struct tree_searcher ts = { .find_seq = ps->seq + 1, .found_seq = -1 }; + packet = g_tree_search(ps->packets, packet_tree_search, &ts); + if (packet) { + // bullseye + dbg("lost packet - returning packet with next seq %i", packet->seq); + goto out; + } + if (G_UNLIKELY(ts.found_seq == -1)) { + // didn't find anything. seq must have wrapped around. retry + // starting from zero + ts.find_seq = 0; + packet = g_tree_search(ps->packets, packet_tree_search, &ts); + if (packet) { + dbg("lost packet - returning packet with next seq %i (after wrap)", packet->seq); + goto out; + } + if (G_UNLIKELY(ts.found_seq == -1)) + abort(); + } + + // pull out the packet we found + packet = g_tree_lookup(ps->packets, GINT_TO_POINTER(ts.found_seq)); + if (G_UNLIKELY(packet == NULL)) + abort(); + + dbg("lost multiple packets - returning packet with next highest seq %i", packet->seq); + +out: + g_tree_steal(ps->packets, GINT_TO_POINTER(packet->seq)); + ps->seq = (packet->seq + 1) & 0xffff; + return packet; +} +int packet_sequencer_insert(packet_sequencer_t *ps, seq_packet_t *p) { + // check seq for dupes + if (G_UNLIKELY(ps->seq == -1)) { + // first packet we see + ps->seq = p->seq; + goto seq_ok; + } + + int diff = p->seq - ps->seq; + if (diff >= 0x8000) + return -1; + if (diff < 0 && diff > -0x8000) + return -1; + + // seq ok - fall thru +seq_ok: + if (g_tree_lookup(ps->packets, GINT_TO_POINTER(p->seq))) + return -1; + g_tree_insert(ps->packets, GINT_TO_POINTER(p->seq), p); + + return 0; +} diff --git a/lib/codeclib.h b/lib/codeclib.h index 632e9be62..472502bcb 100644 --- a/lib/codeclib.h +++ b/lib/codeclib.h @@ -12,11 +12,15 @@ struct codec_def_s; struct decoder_s; struct format_s; struct resample_s; +struct seq_packet_s; +struct packet_sequencer_s; typedef struct codec_def_s codec_def_t; typedef struct decoder_s decoder_t; typedef struct format_s format_t; typedef struct resample_s resample_t; +typedef struct seq_packet_s seq_packet_t; +typedef struct packet_sequencer_s packet_sequencer_t; @@ -52,6 +56,14 @@ struct decoder_s { unsigned int mixer_idx; }; +struct seq_packet_s { + int seq; +}; +struct packet_sequencer_s { + GTree *packets; + int seq; +}; + void codeclib_init(void); @@ -67,6 +79,13 @@ int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts, +void packet_sequencer_init(packet_sequencer_t *ps, GDestroyNotify); +void packet_sequencer_destroy(packet_sequencer_t *ps); +void *packet_sequencer_next_packet(packet_sequencer_t *ps); +int packet_sequencer_insert(packet_sequencer_t *ps, seq_packet_t *); + + + INLINE int format_eq(const format_t *a, const format_t *b) { if (G_UNLIKELY(a->clockrate != b->clockrate)) return 0; diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index 63e540423..91b1e0d12 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -15,15 +15,6 @@ #include "db.h" -static int ptr_cmp(const void *a, const void *b, void *dummy) { - if (a < b) - return -1; - if (a > b) - return 1; - return 0; -} - - static void packet_free(void *p) { packet_t *packet = p; if (!packet) @@ -35,7 +26,7 @@ static void packet_free(void *p) { void ssrc_free(void *p) { ssrc_t *s = p; - g_tree_destroy(s->packets); + packet_sequencer_destroy(&s->sequencer); output_close(s->output); for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++) decoder_close(s->decoders[i]); @@ -56,8 +47,7 @@ static ssrc_t *ssrc_get(stream_t *stream, unsigned long ssrc) { ret->metafile = mf; ret->stream = stream; ret->ssrc = ssrc; - ret->packets = g_tree_new_full(ptr_cmp, NULL, NULL, packet_free); - ret->seq = -1; + packet_sequencer_init(&ret->sequencer, packet_free); char buf[256]; snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc); @@ -75,82 +65,6 @@ out: } -struct tree_searcher { - int find_seq, - found_seq; -}; -static int ssrc_tree_search(const void *testseq_p, const void *ts_p) { - struct tree_searcher *ts = (void *) ts_p; - int testseq = GPOINTER_TO_INT(testseq_p); - // called as a binary search test function. we're looking for the lowest - // seq number that is higher than find_seq. if our test number is too low, - // we proceed with higher numbers. if it's too high, we proceed to the lower - // numbers, but remember the lowest we've seen along that path. - if (G_UNLIKELY(testseq == ts->find_seq)) { - // we've struck gold - ts->found_seq = testseq; - return 0; - } - if (testseq < ts->find_seq) - return 1; - // testseq > ts->find_seq - if (ts->found_seq == -1 || testseq < ts->found_seq) - ts->found_seq = testseq; - return -1; -} - - -// ssrc is locked -static packet_t *ssrc_next_packet(ssrc_t *ssrc) { - // see if we have a packet with the correct seq nr in the queue - packet_t *packet = g_tree_lookup(ssrc->packets, GINT_TO_POINTER(ssrc->seq)); - if (G_LIKELY(packet != NULL)) { - dbg("returning in-sequence packet (seq %i)", ssrc->seq); - return packet; - } - - // why not? do we have anything? (we should) - int nnodes = g_tree_nnodes(ssrc->packets); - if (G_UNLIKELY(nnodes == 0)) { - dbg("packet queue empty"); - return NULL; - } - if (G_LIKELY(nnodes < 10)) { // XXX arbitrary value - dbg("only %i packets in queue - waiting for more", nnodes); - return NULL; // need to wait for more - } - - // packet was probably lost. search for the next highest seq - struct tree_searcher ts = { .find_seq = ssrc->seq + 1, .found_seq = -1 }; - packet = g_tree_search(ssrc->packets, ssrc_tree_search, &ts); - if (packet) { - // bullseye - dbg("lost packet - returning packet with next seq %i", packet->seq); - return packet; - } - if (G_UNLIKELY(ts.found_seq == -1)) { - // didn't find anything. seq must have wrapped around. retry - // starting from zero - ts.find_seq = 0; - packet = g_tree_search(ssrc->packets, ssrc_tree_search, &ts); - if (packet) { - dbg("lost packet - returning packet with next seq %i (after wrap)", packet->seq); - return packet; - } - if (G_UNLIKELY(ts.found_seq == -1)) - abort(); - } - - // pull out the packet we found - packet = g_tree_lookup(ssrc->packets, GINT_TO_POINTER(ts.found_seq)); - if (G_UNLIKELY(packet == NULL)) - abort(); - - dbg("lost multiple packets - returning packet with next highest seq %i", packet->seq); - return packet; -} - - // ssrc is locked static void packet_decode(ssrc_t *ssrc, packet_t *packet) { // determine payload type and run decoder @@ -191,18 +105,16 @@ static void packet_decode(ssrc_t *ssrc, packet_t *packet) { static void ssrc_run(ssrc_t *ssrc) { while (1) { // see if we have a packet with the correct seq nr in the queue - packet_t *packet = ssrc_next_packet(ssrc); + packet_t *packet = packet_sequencer_next_packet(&ssrc->sequencer); if (G_UNLIKELY(packet == NULL)) break; - dbg("processing packet seq %i", packet->seq); - g_tree_steal(ssrc->packets, GINT_TO_POINTER(packet->seq)); + dbg("processing packet seq %i", packet->p.seq); packet_decode(ssrc, packet); - ssrc->seq = (packet->seq + 1) & 0xffff; packet_free(packet); - dbg("packets left in queue: %i", g_tree_nnodes(ssrc->packets)); + dbg("packets left in queue: %i", g_tree_nnodes(ssrc->sequencer.packets)); } pthread_mutex_unlock(&ssrc->lock); @@ -241,31 +153,15 @@ void packet_process(stream_t *stream, unsigned char *buf, unsigned len) { if (rtp_padding(packet->rtp, &packet->payload)) goto err; - packet->seq = ntohs(packet->rtp->seq_num); + packet->p.seq = ntohs(packet->rtp->seq_num); unsigned long ssrc_num = ntohl(packet->rtp->ssrc); log_info_ssrc = ssrc_num; - dbg("packet parsed successfully, seq %u", packet->seq); + dbg("packet parsed successfully, seq %u", packet->p.seq); // insert into ssrc queue ssrc_t *ssrc = ssrc_get(stream, ssrc_num); - - // check seq for dupes - if (G_UNLIKELY(ssrc->seq == -1)) { - // first packet we see - ssrc->seq = packet->seq; - goto seq_ok; - } - - int diff = packet->seq - ssrc->seq; - if (diff >= 0x8000) - goto dupe; - if (diff < 0 && diff > -0x8000) - goto dupe; - // seq ok - fall thru -seq_ok: - if (g_tree_lookup(ssrc->packets, GINT_TO_POINTER(packet->seq))) + if (packet_sequencer_insert(&ssrc->sequencer, &packet->p)) goto dupe; - g_tree_insert(ssrc->packets, GINT_TO_POINTER(packet->seq), packet); // got a new packet, run the decoder ssrc_run(ssrc); @@ -273,7 +169,7 @@ seq_ok: return; dupe: - dbg("skipping dupe packet (new seq %i prev seq %i)", packet->seq, ssrc->seq); + dbg("skipping dupe packet (new seq %i prev seq %i)", packet->p.seq, ssrc->sequencer.seq); pthread_mutex_unlock(&ssrc->lock); packet_free(packet); log_info_ssrc = 0; diff --git a/recording-daemon/types.h b/recording-daemon/types.h index bf4deb469..9ebc2c937 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -51,12 +51,12 @@ typedef struct stream_s stream_t; struct packet_s { + seq_packet_t p; // must be first void *buffer; // pointers into buffer struct iphdr *ip; struct ip6_hdr *ip6; struct udphdr *udp; - int seq; struct rtp_header *rtp; str payload; @@ -69,8 +69,7 @@ struct ssrc_s { stream_t *stream; metafile_t *metafile; unsigned long ssrc; - GTree *packets; // contains packet_t objects - int seq; // next expected seq + packet_sequencer_t sequencer; decoder_t *decoders[128]; output_t *output; };