diff --git a/perl/NGCP/Rtpclient/RTCP.pm b/perl/NGCP/Rtpclient/RTCP.pm new file mode 100644 index 000000000..9e74acd27 --- /dev/null +++ b/perl/NGCP/Rtpclient/RTCP.pm @@ -0,0 +1,107 @@ +package NGCP::Rtpclient::RTCP; + +use strict; +use warnings; +use Time::HiRes qw(time); +use Math::BigFloat; + +sub new { + my ($class, $cb_obj, $rtp_obj) = @_; + + $rtp_obj or return; + + my $self = {}; + bless $self, $class; + + $self->{cb_obj} = $cb_obj; + $self->{rtp_obj} = $rtp_obj; + + $self->{interval} = 2; # seconds + $self->{next_send} = time() + $self->{interval}; + + return $self; +} + +sub timer { + my ($self) = @_; + + time() < $self->{next_send} and return; + + my $pack = $self->_sr(); + + $self->{cb_obj}->rtcp_send($pack); + + $self->{next_send} = $self->{next_send} + $self->{interval}; +} + +sub input { + my ($self, $packet) = @_; + + my ($vprc, $pt, $len, $rest) = unpack('CCn a*', $packet); + ($vprc & 0xe0) == 0x80 or die; + my $rc = ($vprc & 0x1f); + $rc > 1 and die; + $len++; + $len <<= 2; + $len == length($packet) or die; + + if ($pt == 200) { + my ($ssrc, @sr) = unpack('NNNNNN', $rest); + $self->{last_sr}->{$ssrc} = { received_time => time(), packet => \@sr }; + } +} + +sub _sr { + my ($self) = @_; + + # receiver reports + my $rrs = ''; + my $num_rrs = 0; + my $others = $self->{rtp_obj}->{other_ssrcs}; + my @other_ssrcs = keys(%$others); + scalar(@other_ssrcs) <= 1 or die; + if (my $oss = $other_ssrcs[0]) { + my $ss = $others->{$oss}; + my ($lsr, $dlsr) = (0,0); + my $last_sr = $self->{last_sr}->{$ss->{ssrc}}; + if ($last_sr) { + # ntp timestamp fraction + $lsr = ($last_sr->{packet}->[0] << 16) | ($last_sr->{packet}->[1] >> 16); + $dlsr = (time() - $last_sr->{received_time}) * 65536; + } + # packets lost number + my $lost = $ss->{packets_lost} & 0x7ff; + my $lost_frac = $ss->{lost_last} / ($ss->{received_last} + $ss->{lost_last}); + $lost_frac *= 256; + $lost_frac = int($lost_frac); + $lost_frac < 0 and $lost_frac = 0; + $lost_frac > 255 and $lost_frac = 255; + $lost = $lost | ($lost_frac << 24); + + $rrs .= pack('NNNNNN', $ss->{ssrc}, $ss->{packets_lost}, $ss->{seq}, $ss->{jitter}, $lsr, $dlsr); + $num_rrs++; + } + + # actual sr + my $now = Math::BigFloat->new(time()); + $now->badd(2208988800); + my @parts = $now->dparts(); + my $ints = $parts[0]; + my $frac = $parts[1]; + $frac->bmul(Math::BigFloat->new('0x100000000')); + my $pl = pack("NNNNN", $ints, $frac, + $self->{rtp_obj}->{timestamp}->bstr(), + $self->{rtp_obj}->{packet_count}, $self->{rtp_obj}->{octet_count}); + + $pl .= $rrs; + + my $pack = $self->_header(200, $num_rrs, length($pl)) . $pl; + return $pack; +} + +sub _header { + my ($self, $type, $rc, $length) = @_; + return pack("CCnN", 0x80 | $rc, $type, (($length + 8) >> 2) - 1, $self->{rtp_obj}->{ssrc}); +} + +1; diff --git a/perl/NGCP/Rtpclient/RTP.pm b/perl/NGCP/Rtpclient/RTP.pm index 0f12a55b1..9980b7aac 100644 --- a/perl/NGCP/Rtpclient/RTP.pm +++ b/perl/NGCP/Rtpclient/RTP.pm @@ -4,6 +4,7 @@ use strict; use warnings; use Time::HiRes qw(time); use Math::BigInt; +use Math::BigFloat; sub new { my ($class, $cb_obj) = @_; @@ -20,6 +21,9 @@ sub new { $self->{timestamp} = Math::BigInt->new(int(rand(2**32))); $self->{seq} = rand(2**16); $self->{payload} = 100; + $self->{packet_count} = 0; + $self->{octet_count} = 0; + $self->{other_ssrcs} = {}; return $self; } @@ -41,6 +45,106 @@ sub timer { $self->{timestamp} += $self->{clockrate} / (1.0 / ($self->{ptime} / 1000)); # XXX might be fractional $self->{timestamp} > 0xffffffff and $self->{timestamp} -= Math::BigInt->new('0x100000000'); + + $self->{packet_count}++; + $self->{octet_count} += length($payload); +} + +sub input { + my ($self, $packet) = @_; + + my $now = time(); + + my ($vpxcc, $pt, $seq, $ts, $ssrc, $payload) = unpack("CCnNN a*", $packet); + $vpxcc == 0x80 or die; + $pt == 0 or die; + + my $remote = ($self->{other_ssrcs}->{$ssrc} //= { + ssrc => $ssrc, + packets_received => 0, + packets_lost => 0, + octets_received => 0, + roc => 0, + seq => $seq, # highest seen + jitter => 0, + queue_seq => $seq, # next expected seq -- to detect lost packets + queue => {}, + lost_last => 0, # since last SR/RR + received_last => 0, # since last SR/RR + dupes => 0, + }); + + $remote->{packets_received}++; + $remote->{received_last}++; + $remote->{octets_received} += length($payload); + + # normalize seq using roc + my $extseq = ($remote->{roc} << 16) | $seq; + my $diff = $extseq - $remote->{seq}; + if ($diff < -0x8000) { + $extseq += 0x10000; + } + elsif ($diff >= 0x8000) { + $extseq -= 0x10000; + } + + # update seq/roc if necessary -- highest seq seen + if ($extseq > $remote->{seq}) { + $remote->{seq} = $extseq; + $remote->{roc} = $extseq >> 16; + } + + # check dupes and packet loss + if ($extseq == $remote->{queue_seq}) { + # in sequence and expected + $remote->{queue_seq}++; + # see if we can pull packets out of the queue + while (exists($remote->{queue}->{$remote->{queue_seq}})) { + delete($remote->{queue}->{$remote->{queue_seq}}); + $remote->{queue_seq}++; + } + } + elsif ($extseq < $remote->{queue_seq}) { + $remote->{dupes}++; + } + else { + # ahead of sequence -- queue it up if not a dupe + if (exists($remote->{queue}->{$extseq})) { + $remote->{dupes}++; + } + else { + $remote->{queue}->{$extseq} = $packet; + # see if our "jitter buffer" is full and account for packet loss + my @seqs = keys(%{$remote->{queue}}); + if (@seqs >= 20) { + @seqs = sort {$a <=> $b} (@seqs); + # seek up to the lowest seq in buffer and count each missing + # seq as a lost packet + my $min = $seqs[0]; + $remote->{lost_since} += $min - $remote->{queue_seq}; + $remote->{packets_lost} += $min - $remote->{queue_seq}; + # now unqueue what we have as much as we can + $remote->{queue_seq} = $min; + while (my $qseq = shift(@seqs)) { + $qseq != $remote->{queue_seq} and last; + delete($remote->{queue}->{$qseq}); + $remote->{queue_seq}++; + } + } + } + + } + + # calc jitter + if ($remote->{last_ts} && $remote->{last_seq}) { + my $lt = Math::BigFloat->new($remote->{last_ts}); + $lt->bsub(Math::BigFloat->new($now)); + $lt->bmul($self->{clockrate}); + my $diff = $lt->bstr() - ($remote->{last_seq} - $extseq); + $remote->{jitter} = $remote->{jitter} + (abs($diff) - $remote->{jitter}) / 16; + } + $remote->{last_ts} = $now; + $remote->{last_seq} = $extseq; } 1; diff --git a/perl/NGCP/Rtpclient/SDP.pm b/perl/NGCP/Rtpclient/SDP.pm index 0c377541a..e9b19253f 100644 --- a/perl/NGCP/Rtpclient/SDP.pm +++ b/perl/NGCP/Rtpclient/SDP.pm @@ -233,4 +233,13 @@ sub endpoint { die; } +sub rtcp_endpoint { + my ($self) = @_; + my $conn = $self->rtcp_connection(); + my $port = $self->rtcp_port(); + $conn->{family} == &AF_INET and return pack_sockaddr_in($port, inet_aton($conn->{address})); + $conn->{family} == &AF_INET6 and return pack_sockaddr_in6($port, inet_pton(&AF_INET6, $conn->{address})); + die; +} + 1; diff --git a/perl/NGCP/Rtpengine/Test.pm b/perl/NGCP/Rtpengine/Test.pm index 6a4ac46ab..699a1faa4 100644 --- a/perl/NGCP/Rtpengine/Test.pm +++ b/perl/NGCP/Rtpengine/Test.pm @@ -16,6 +16,7 @@ use NGCP::Rtpclient::SDP; use NGCP::Rtpclient::ICE; use NGCP::Rtpclient::DTLS; use NGCP::Rtpclient::RTP; +use NGCP::Rtpclient::RTCP; use NGCP::Rtpengine; sub new { @@ -144,7 +145,7 @@ sub _new { # XXX support rtcp-mux and rtcp-less media for my $address (@addresses) { - $args{sockdomain} && $args{sockdomain} != $address->{sockdomain} and next; + ($args{sockdomain} && $args{sockdomain} != $address->{sockdomain}) and next; my $rtp = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp', LocalHost => $address->{address}, LocalPort => $parent->{media_port}++) @@ -197,6 +198,7 @@ sub _new { $self->{media_receive_queues} = [[],[]]; # for each component $self->{media_packets_sent} = [0,0]; $self->{media_packets_received} = [0,0]; + $self->{client_components} = [undef,undef]; return $self; } @@ -318,7 +320,7 @@ sub _answered { $self->{ice} and $self->{ice}->decode($self->{remote_media}->decode_ice()); } -sub delete { +sub teardown { my ($self, %args) = @_; my $req = $self->_default_req_args('delete', 'from-tag' => $self->{tag}, %args); @@ -343,12 +345,16 @@ sub _input { $$input eq $exp or die; $self->{media_packets_received}->[$component]++; $$input = ''; + + $self->{client_components}->[$component] and + $self->{client_components}->[$component]->input($exp); } sub _timer { my ($self) = @_; $self->{ice} and $self->{ice}->timer(); $self->{rtp} and $self->{rtp}->timer(); + $self->{rtcp} and $self->{rtcp}->timer(); } sub _peer_addr_check { @@ -369,8 +375,15 @@ sub _peer_addr_check { sub start_rtp { my ($self) = @_; $self->{rtp} and die; - my $dest = $self->{remote_media}->endpoint(); $self->{rtp} = NGCP::Rtpclient::RTP->new($self) or die; + $self->{client_components}->[0] = $self->{rtp}; +} + +sub start_rtcp { + my ($self) = @_; + $self->{rtcp} and die; + $self->{rtcp} = NGCP::Rtpclient::RTCP->new($self, $self->{rtp}) or die; + $self->{client_components}->[1] = $self->{rtcp}; } sub stop { diff --git a/t/test-basic.pl b/t/test-basic.pl index 1ab294f7e..4a1c76b20 100755 --- a/t/test-basic.pl +++ b/t/test-basic.pl @@ -11,12 +11,17 @@ my ($a, $b) = $r->client_pair( {sockdomain => &Socket::AF_INET} ); -$r->timer_once(3, sub { $b->answer($a, ICE => 'remove'); $a->start_rtp(); }); +$r->timer_once(3, sub { + $b->answer($a, ICE => 'remove'); + $a->start_rtp(); + $a->start_rtcp(); + }); $r->timer_once(10, sub { $r->stop(); }); $a->offer($b, ICE => 'remove'); $b->start_rtp(); +$b->start_rtcp(); $r->run(); -$a->delete(); +$a->teardown(); diff --git a/t/test-unidirectional.pl b/t/test-unidirectional.pl index 426e28d4c..584c42b66 100755 --- a/t/test-unidirectional.pl +++ b/t/test-unidirectional.pl @@ -19,4 +19,4 @@ $b->start_rtp(); $r->run(); -$a->delete(); +$a->teardown();