diff --git a/recording-daemon/decoder.c b/recording-daemon/decoder.c index 1f0aea0a0..c60db29a7 100644 --- a/recording-daemon/decoder.c +++ b/recording-daemon/decoder.c @@ -93,6 +93,7 @@ int decoder_input(decoder_t *dec, const str *data, unsigned long ts, output_t *o else { // shift pts according to rtp ts shift dec->pts += (ts - dec->rtp_ts) * output->avst->time_base.num * 8000 / output->avst->time_base.den; + // XXX handle lost packets here if timestamps don't line up? } dec->rtp_ts = ts; diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index 90d8c7eb4..655031101 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -11,12 +11,10 @@ #include "decoder.h" -static int packet_cmp(const void *A, const void *B, void *dummy) { - const packet_t *a = A, *b = B; - - if (a->seq < b->seq) +static int ptr_cmp(const void *a, const void *b, void *dummy) { + if (a < b) return -1; - if (a->seq > b->seq) + if (a > b) return 1; return 0; } @@ -52,7 +50,7 @@ static ssrc_t *ssrc_get(metafile_t *mf, unsigned long ssrc) { pthread_mutex_init(&ret->lock, NULL); ret->metafile = mf; ret->ssrc = ssrc; - ret->packets = g_tree_new_full(packet_cmp, NULL, NULL, packet_free); + ret->packets = g_tree_new_full(ptr_cmp, NULL, NULL, packet_free); ret->seq = -1; char buf[256]; @@ -68,45 +66,88 @@ out: } -static gboolean ssrc_tree_get_first(void *key, void *val, void *data) { - packet_t **out = data; - *out = val; - return TRUE; +struct tree_searcher { + int find_seq, + found_seq; +}; +static int ssrc_tree_search(const void *testseq_p, const void *ts_p) { + struct tree_searcher *ts = (void *) ts_p; + int testseq = GPOINTER_TO_INT(testseq_p); + // called as a binary search test function. we're looking for the lowest + // seq number that is higher than find_seq. if our test number is too low, + // we proceed with higher numbers. if it's too high, we proceed to the lower + // numbers, but remember the lowest we've seen along that path. + if (G_UNLIKELY(testseq == ts->find_seq)) { + // we've struck gold + ts->found_seq = testseq; + return 0; + } + if (testseq < ts->find_seq) + return 1; + // testseq > ts->find_seq + if (ts->found_seq == -1 || testseq < ts->found_seq) + ts->found_seq = testseq; + return -1; } // ssrc is locked and must be unlocked when returning static void ssrc_run(ssrc_t *ssrc) { while (1) { - // inspect first packet to see if seq is correct - packet_t *first = NULL; - g_tree_foreach(ssrc->packets, ssrc_tree_get_first, &first); - if (!first) - break; - if (first->seq != ssrc->seq) - break; // need to wait for more - + // see if we have a packet with the correct seq nr in the queue + packet_t *packet = g_tree_lookup(ssrc->packets, GINT_TO_POINTER(ssrc->seq)); + if (G_UNLIKELY(!packet)) { + // why not? do we have anything? (we should) + int nnodes = g_tree_nnodes(ssrc->packets); + if (G_UNLIKELY(nnodes == 0)) + break; + if (G_LIKELY(nnodes < 10)) // XXX arbitrary value + break; // need to wait for more + + // packet was probably lost. search for the next highest seq + struct tree_searcher ts = { .find_seq = ssrc->seq + 1, .found_seq = -1 }; + packet = g_tree_search(ssrc->packets, ssrc_tree_search, &ts); + if (packet) // bullseye + goto have_packet; + if (G_UNLIKELY(ts.found_seq == -1)) { + // didn't find anything. seq must have wrapped around. retry + // starting from zero + ts.find_seq = 0; + packet = g_tree_search(ssrc->packets, ssrc_tree_search, &ts); + if (packet) + goto have_packet; + if (G_UNLIKELY(ts.found_seq == -1)) + abort(); + } + + // pull out the packet we found + packet = g_tree_lookup(ssrc->packets, GINT_TO_POINTER(ts.found_seq)); + if (G_UNLIKELY(packet == NULL)) + abort(); + } + +have_packet:; // determine payload type and run decoder - unsigned int payload_type = first->rtp->m_pt & 0x7f; + unsigned int payload_type = packet->rtp->m_pt & 0x7f; metafile_t *mf = ssrc->metafile; pthread_mutex_lock(&mf->payloads_lock); char *payload_str = mf->payload_types[payload_type]; pthread_mutex_unlock(&mf->payloads_lock); - dbg("processing packet seq %i, payload type is %s", first->seq, payload_str); - g_tree_steal(ssrc->packets, first); + dbg("processing packet seq %i, payload type is %s", packet->seq, payload_str); + g_tree_steal(ssrc->packets, GINT_TO_POINTER(packet->seq)); // check if we have a decoder for this payload type yet - if (!ssrc->decoders[payload_type]) + if (G_UNLIKELY(!ssrc->decoders[payload_type])) ssrc->decoders[payload_type] = decoder_new(payload_type, payload_str); // XXX error handling - decoder_input(ssrc->decoders[payload_type], &first->payload, ntohl(first->rtp->timestamp), + decoder_input(ssrc->decoders[payload_type], &packet->payload, ntohl(packet->rtp->timestamp), ssrc->output); - packet_free(first); + ssrc->seq = (packet->seq + 1) & 0xffff; + packet_free(packet); dbg("packets left in queue: %i", g_tree_nnodes(ssrc->packets)); - ssrc->seq = (ssrc->seq + 1) & 0xffff; } pthread_mutex_unlock(&ssrc->lock); @@ -162,9 +203,9 @@ void packet_process(stream_t *stream, unsigned char *buf, unsigned len) { goto dupe; // seq ok - fall thru seq_ok: - if (g_tree_lookup(ssrc->packets, packet)) + if (g_tree_lookup(ssrc->packets, GINT_TO_POINTER(packet->seq))) goto dupe; - g_tree_insert(ssrc->packets, packet, packet); + g_tree_insert(ssrc->packets, GINT_TO_POINTER(packet->seq), packet); // got a new packet, run the decoder ssrc_run(ssrc);