TT#28163 abstractize packet sequencer and move to lib

Change-Id: I7bfa0f6e223bfe21c0e155a159fc4d125117bc3b
changes/97/18597/9
Richard Fuchs 8 years ago
parent f8aad04948
commit a967f9dcbb

@ -22,6 +22,7 @@ flags = [
'-D_GNU_SOURCE',
'-D__DEBUG=1',
'-D__YCM=1',
'-I../daemon',
'-DRTPENGINE_VERSION="dummy"',
'-DRE_PLUGIN_DIR="/usr/lib/rtpengine"',
'-DWITH_IPTABLES_OPTION',

@ -5,6 +5,7 @@
#include <glib.h>
#include "str.h"
#include "log.h"
#include "loglib.h"
#include "resample.h"
@ -266,3 +267,122 @@ void codeclib_init() {
avfilter_register_all();
avformat_network_init();
}
static int ptr_cmp(const void *a, const void *b, void *dummy) {
if (a < b)
return -1;
if (a > b)
return 1;
return 0;
}
void packet_sequencer_init(packet_sequencer_t *ps, GDestroyNotify ffunc) {
ps->packets = g_tree_new_full(ptr_cmp, NULL, NULL, ffunc);
ps->seq = -1;
}
void packet_sequencer_destroy(packet_sequencer_t *ps) {
g_tree_destroy(ps->packets);
}
struct tree_searcher {
int find_seq,
found_seq;
};
static int packet_tree_search(const void *testseq_p, const void *ts_p) {
struct tree_searcher *ts = (void *) ts_p;
int testseq = GPOINTER_TO_INT(testseq_p);
// called as a binary search test function. we're looking for the lowest
// seq number that is higher than find_seq. if our test number is too low,
// we proceed with higher numbers. if it's too high, we proceed to the lower
// numbers, but remember the lowest we've seen along that path.
if (G_UNLIKELY(testseq == ts->find_seq)) {
// we've struck gold
ts->found_seq = testseq;
return 0;
}
if (testseq < ts->find_seq)
return 1;
// testseq > ts->find_seq
if (ts->found_seq == -1 || testseq < ts->found_seq)
ts->found_seq = testseq;
return -1;
}
// caller must take care of locking
void *packet_sequencer_next_packet(packet_sequencer_t *ps) {
// see if we have a packet with the correct seq nr in the queue
seq_packet_t *packet = g_tree_lookup(ps->packets, GINT_TO_POINTER(ps->seq));
if (G_LIKELY(packet != NULL)) {
dbg("returning in-sequence packet (seq %i)", ps->seq);
goto out;
}
// why not? do we have anything? (we should)
int nnodes = g_tree_nnodes(ps->packets);
if (G_UNLIKELY(nnodes == 0)) {
dbg("packet queue empty");
return NULL;
}
if (G_LIKELY(nnodes < 10)) { // XXX arbitrary value
dbg("only %i packets in queue - waiting for more", nnodes);
return NULL; // need to wait for more
}
// packet was probably lost. search for the next highest seq
struct tree_searcher ts = { .find_seq = ps->seq + 1, .found_seq = -1 };
packet = g_tree_search(ps->packets, packet_tree_search, &ts);
if (packet) {
// bullseye
dbg("lost packet - returning packet with next seq %i", packet->seq);
goto out;
}
if (G_UNLIKELY(ts.found_seq == -1)) {
// didn't find anything. seq must have wrapped around. retry
// starting from zero
ts.find_seq = 0;
packet = g_tree_search(ps->packets, packet_tree_search, &ts);
if (packet) {
dbg("lost packet - returning packet with next seq %i (after wrap)", packet->seq);
goto out;
}
if (G_UNLIKELY(ts.found_seq == -1))
abort();
}
// pull out the packet we found
packet = g_tree_lookup(ps->packets, GINT_TO_POINTER(ts.found_seq));
if (G_UNLIKELY(packet == NULL))
abort();
dbg("lost multiple packets - returning packet with next highest seq %i", packet->seq);
out:
g_tree_steal(ps->packets, GINT_TO_POINTER(packet->seq));
ps->seq = (packet->seq + 1) & 0xffff;
return packet;
}
int packet_sequencer_insert(packet_sequencer_t *ps, seq_packet_t *p) {
// check seq for dupes
if (G_UNLIKELY(ps->seq == -1)) {
// first packet we see
ps->seq = p->seq;
goto seq_ok;
}
int diff = p->seq - ps->seq;
if (diff >= 0x8000)
return -1;
if (diff < 0 && diff > -0x8000)
return -1;
// seq ok - fall thru
seq_ok:
if (g_tree_lookup(ps->packets, GINT_TO_POINTER(p->seq)))
return -1;
g_tree_insert(ps->packets, GINT_TO_POINTER(p->seq), p);
return 0;
}

@ -12,11 +12,15 @@ struct codec_def_s;
struct decoder_s;
struct format_s;
struct resample_s;
struct seq_packet_s;
struct packet_sequencer_s;
typedef struct codec_def_s codec_def_t;
typedef struct decoder_s decoder_t;
typedef struct format_s format_t;
typedef struct resample_s resample_t;
typedef struct seq_packet_s seq_packet_t;
typedef struct packet_sequencer_s packet_sequencer_t;
@ -52,6 +56,14 @@ struct decoder_s {
unsigned int mixer_idx;
};
struct seq_packet_s {
int seq;
};
struct packet_sequencer_s {
GTree *packets;
int seq;
};
void codeclib_init(void);
@ -67,6 +79,13 @@ int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts,
void packet_sequencer_init(packet_sequencer_t *ps, GDestroyNotify);
void packet_sequencer_destroy(packet_sequencer_t *ps);
void *packet_sequencer_next_packet(packet_sequencer_t *ps);
int packet_sequencer_insert(packet_sequencer_t *ps, seq_packet_t *);
INLINE int format_eq(const format_t *a, const format_t *b) {
if (G_UNLIKELY(a->clockrate != b->clockrate))
return 0;

@ -15,15 +15,6 @@
#include "db.h"
static int ptr_cmp(const void *a, const void *b, void *dummy) {
if (a < b)
return -1;
if (a > b)
return 1;
return 0;
}
static void packet_free(void *p) {
packet_t *packet = p;
if (!packet)
@ -35,7 +26,7 @@ static void packet_free(void *p) {
void ssrc_free(void *p) {
ssrc_t *s = p;
g_tree_destroy(s->packets);
packet_sequencer_destroy(&s->sequencer);
output_close(s->output);
for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++)
decoder_close(s->decoders[i]);
@ -56,8 +47,7 @@ static ssrc_t *ssrc_get(stream_t *stream, unsigned long ssrc) {
ret->metafile = mf;
ret->stream = stream;
ret->ssrc = ssrc;
ret->packets = g_tree_new_full(ptr_cmp, NULL, NULL, packet_free);
ret->seq = -1;
packet_sequencer_init(&ret->sequencer, packet_free);
char buf[256];
snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc);
@ -75,82 +65,6 @@ out:
}
struct tree_searcher {
int find_seq,
found_seq;
};
static int ssrc_tree_search(const void *testseq_p, const void *ts_p) {
struct tree_searcher *ts = (void *) ts_p;
int testseq = GPOINTER_TO_INT(testseq_p);
// called as a binary search test function. we're looking for the lowest
// seq number that is higher than find_seq. if our test number is too low,
// we proceed with higher numbers. if it's too high, we proceed to the lower
// numbers, but remember the lowest we've seen along that path.
if (G_UNLIKELY(testseq == ts->find_seq)) {
// we've struck gold
ts->found_seq = testseq;
return 0;
}
if (testseq < ts->find_seq)
return 1;
// testseq > ts->find_seq
if (ts->found_seq == -1 || testseq < ts->found_seq)
ts->found_seq = testseq;
return -1;
}
// ssrc is locked
static packet_t *ssrc_next_packet(ssrc_t *ssrc) {
// see if we have a packet with the correct seq nr in the queue
packet_t *packet = g_tree_lookup(ssrc->packets, GINT_TO_POINTER(ssrc->seq));
if (G_LIKELY(packet != NULL)) {
dbg("returning in-sequence packet (seq %i)", ssrc->seq);
return packet;
}
// why not? do we have anything? (we should)
int nnodes = g_tree_nnodes(ssrc->packets);
if (G_UNLIKELY(nnodes == 0)) {
dbg("packet queue empty");
return NULL;
}
if (G_LIKELY(nnodes < 10)) { // XXX arbitrary value
dbg("only %i packets in queue - waiting for more", nnodes);
return NULL; // need to wait for more
}
// packet was probably lost. search for the next highest seq
struct tree_searcher ts = { .find_seq = ssrc->seq + 1, .found_seq = -1 };
packet = g_tree_search(ssrc->packets, ssrc_tree_search, &ts);
if (packet) {
// bullseye
dbg("lost packet - returning packet with next seq %i", packet->seq);
return packet;
}
if (G_UNLIKELY(ts.found_seq == -1)) {
// didn't find anything. seq must have wrapped around. retry
// starting from zero
ts.find_seq = 0;
packet = g_tree_search(ssrc->packets, ssrc_tree_search, &ts);
if (packet) {
dbg("lost packet - returning packet with next seq %i (after wrap)", packet->seq);
return packet;
}
if (G_UNLIKELY(ts.found_seq == -1))
abort();
}
// pull out the packet we found
packet = g_tree_lookup(ssrc->packets, GINT_TO_POINTER(ts.found_seq));
if (G_UNLIKELY(packet == NULL))
abort();
dbg("lost multiple packets - returning packet with next highest seq %i", packet->seq);
return packet;
}
// ssrc is locked
static void packet_decode(ssrc_t *ssrc, packet_t *packet) {
// determine payload type and run decoder
@ -191,18 +105,16 @@ static void packet_decode(ssrc_t *ssrc, packet_t *packet) {
static void ssrc_run(ssrc_t *ssrc) {
while (1) {
// see if we have a packet with the correct seq nr in the queue
packet_t *packet = ssrc_next_packet(ssrc);
packet_t *packet = packet_sequencer_next_packet(&ssrc->sequencer);
if (G_UNLIKELY(packet == NULL))
break;
dbg("processing packet seq %i", packet->seq);
g_tree_steal(ssrc->packets, GINT_TO_POINTER(packet->seq));
dbg("processing packet seq %i", packet->p.seq);
packet_decode(ssrc, packet);
ssrc->seq = (packet->seq + 1) & 0xffff;
packet_free(packet);
dbg("packets left in queue: %i", g_tree_nnodes(ssrc->packets));
dbg("packets left in queue: %i", g_tree_nnodes(ssrc->sequencer.packets));
}
pthread_mutex_unlock(&ssrc->lock);
@ -241,31 +153,15 @@ void packet_process(stream_t *stream, unsigned char *buf, unsigned len) {
if (rtp_padding(packet->rtp, &packet->payload))
goto err;
packet->seq = ntohs(packet->rtp->seq_num);
packet->p.seq = ntohs(packet->rtp->seq_num);
unsigned long ssrc_num = ntohl(packet->rtp->ssrc);
log_info_ssrc = ssrc_num;
dbg("packet parsed successfully, seq %u", packet->seq);
dbg("packet parsed successfully, seq %u", packet->p.seq);
// insert into ssrc queue
ssrc_t *ssrc = ssrc_get(stream, ssrc_num);
// check seq for dupes
if (G_UNLIKELY(ssrc->seq == -1)) {
// first packet we see
ssrc->seq = packet->seq;
goto seq_ok;
}
int diff = packet->seq - ssrc->seq;
if (diff >= 0x8000)
goto dupe;
if (diff < 0 && diff > -0x8000)
goto dupe;
// seq ok - fall thru
seq_ok:
if (g_tree_lookup(ssrc->packets, GINT_TO_POINTER(packet->seq)))
if (packet_sequencer_insert(&ssrc->sequencer, &packet->p))
goto dupe;
g_tree_insert(ssrc->packets, GINT_TO_POINTER(packet->seq), packet);
// got a new packet, run the decoder
ssrc_run(ssrc);
@ -273,7 +169,7 @@ seq_ok:
return;
dupe:
dbg("skipping dupe packet (new seq %i prev seq %i)", packet->seq, ssrc->seq);
dbg("skipping dupe packet (new seq %i prev seq %i)", packet->p.seq, ssrc->sequencer.seq);
pthread_mutex_unlock(&ssrc->lock);
packet_free(packet);
log_info_ssrc = 0;

@ -51,12 +51,12 @@ typedef struct stream_s stream_t;
struct packet_s {
seq_packet_t p; // must be first
void *buffer;
// pointers into buffer
struct iphdr *ip;
struct ip6_hdr *ip6;
struct udphdr *udp;
int seq;
struct rtp_header *rtp;
str payload;
@ -69,8 +69,7 @@ struct ssrc_s {
stream_t *stream;
metafile_t *metafile;
unsigned long ssrc;
GTree *packets; // contains packet_t objects
int seq; // next expected seq
packet_sequencer_t sequencer;
decoder_t *decoders[128];
output_t *output;
};

Loading…
Cancel
Save