You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rtpengine/perl/NGCP/Rtpengine/Test.pm

486 lines
12 KiB

package NGCP::Rtpengine::Test;
use strict;
use warnings;
use Socket;
use Socket6;
use IO::Socket;
use IO::Socket::IP;
use Bencode;
use Data::Dumper;
use Net::Interface;
use List::Util;
use IO::Multiplex;
use Time::HiRes qw(time);
use NGCP::Rtpclient::SDP;
use NGCP::Rtpclient::ICE;
use NGCP::Rtpclient::SDES;
use NGCP::Rtpclient::DTLS;
use NGCP::Rtpclient::RTP;
use NGCP::Rtpclient::RTCP;
use NGCP::Rtpengine;
sub new {
my ($class, %args) = @_;
my $self = {};
bless $self, $class;
srand(1234) if $ENV{RTPE_TEST_PSEUDO_RAND};
# detect local interfaces
my (@v4, @v6);
my @intfs = Net::Interface->interfaces();
if ($ENV{RTPE_TEST_V4_ADDRS}) {
@v4 = split(/ /, $ENV{RTPE_TEST_V4_ADDRS});
}
else {
@v4 = map {$_->address(&AF_INET)} @intfs;
@v4 = map {Socket6::inet_ntop(&AF_INET, $_)} @v4;
@v4 = grep {$_ !~ /^127\./} @v4;
}
@v4 = map { { address => $_, sockdomain => &AF_INET } } @v4;
@v4 or die("no IPv4 addresses found");
if ($ENV{RTPE_TEST_V6_ADDRS}) {
@v6 = split(/ /, $ENV{RTPE_TEST_V6_ADDRS});
}
else {
@v6 = map {$_->address(&AF_INET6)} @intfs;
@v6 = map {Socket6::inet_ntop(&AF_INET6, $_)} @v6;
@v6 = grep {$_ !~ /^::|^fe80:/} @v6;
}
@v6 = map { { address => $_, sockdomain => &AF_INET6 } } @v6;
@v6 or die("no IPv6 addresses found");
$self->{v4_addresses} = \@v4;
$self->{v6_addresses} = \@v6;
$self->{all_addresses} = [ @v4, @v6 ];
# supporting objects
$self->{mux} = IO::Multiplex->new();
$self->{mux}->set_callback_object($self);
$self->{media_port} = $args{media_port} // $ENV{RTPE_TEST_MEDIA_PORT} // 2000;
$self->{timers} = [];
$self->{clients} = [];
$self->{control} = NGCP::Rtpengine->new($args{host} // $ENV{RTPENGINE_HOST} // 'localhost',
$args{port} // $ENV{RTPENGINE_PORT} // 2223);
$self->{callid} = rand();
return $self;
};
sub client {
my ($self, %args) = @_;
my $cl = NGCP::Rtpengine::Test::Client->_new($self, %args);
push(@{$self->{clients}}, $cl);
return $cl;
}
sub client_pair {
my ($self, $args_A, $args_B) = @_;
my $a = $self->client(%$args_A);
my $b = $self->client(%$args_B);
$a->media_receiver($b);
$b->media_receiver($a);
return ($a, $b);
}
sub run {
my ($self) = @_;
$self->{mux}->loop();
}
sub stop {
my ($self) = @_;
$self->{mux}->endloop();
for my $cl (@{$self->{clients}}) {
$cl->stop();
}
}
sub timer_once {
my ($self, $delay, $sub) = @_;
push(@{$self->{timers}}, { sub => $sub, when => time() + $delay });
@{$self->{timers}} = sort {$a->{when} <=> $b->{when}} @{$self->{timers}};
}
sub mux_input {
my ($self, $mux, $fh, $input) = @_;
my $peer = $mux->udp_peer($fh);
for my $cl (@{$self->{clients}}) {
$$input eq '' and last;
$cl->_input($fh, $input, $peer);
}
$$input ne '' and die;
}
sub mux_timeout {
my ($self, $mux, $fh) = @_;
$mux->set_timeout($fh, 0.01);
my $now = time();
while (@{$self->{timers}} && $self->{timers}->[0]->{when} <= $now) {
my $t = shift(@{$self->{timers}});
$t->{sub}->();
}
for my $cl (@{$self->{clients}}) {
$cl->_timer();
}
}
package NGCP::Rtpengine::Test::Client;
use Socket;
use Data::Dumper;
sub _new {
my ($class, $parent, %args) = @_;
my $self = {};
bless $self, $class;
$self->{parent} = $parent;
$self->{tag} = rand();
$self->{codecs} = $args{codecs} // [qw(PCMU)];
# create media sockets
my @addresses = @{$parent->{all_addresses}};
@addresses = List::Util::shuffle @addresses;
my (@sockets, @rtp, @rtcp);
# XXX support rtcp-mux and rtcp-less media
for my $address (@addresses) {
($args{sockdomain} && $args{sockdomain} != $address->{sockdomain}) and next;
my $rtp = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp',
LocalHost => $address->{address}, LocalPort => $parent->{media_port})
or die("$address->{address}:$parent->{media_port}");
$parent->{media_port}++;
my $rtcp = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp',
LocalHost => $address->{address}, LocalPort => $parent->{media_port})
or die("$address->{address}:$parent->{media_port}");
$parent->{media_port}++;
push(@sockets, [$rtp, $rtcp]); # component 0 and 1
push(@rtp, $rtp);
push(@rtcp, $rtcp);
$parent->{mux}->add($rtp);
$parent->{mux}->add($rtcp);
$parent->{mux}->set_timeout($rtp, 0.01); # XXX overkill, only need this on one
}
@sockets or die;
$self->{sockets} = \@sockets;
$self->{rtp_sockets} = \@rtp;
$self->{rtcp_sockets} = \@rtcp;
$self->{main_sockets} = $sockets[0]; # for m= and o=
$self->{local_sdp} = NGCP::Rtpclient::SDP->new($self->{main_sockets}->[0]); # no global c=
$self->{component_peers} = []; # keep track of peer source addresses
# default protocol
my $proto = 'RTP/AVP';
$args{sdes} and $proto = 'RTP/SAVP';
$args{dtls} and $proto = 'UDP/TLS/RTP/SAVP';
$args{protocol} and $proto = $args{protocol};
$self->{local_media} = $self->{local_sdp}->add_media(NGCP::Rtpclient::SDP::Media->new(
$self->{main_sockets}->[0], $self->{main_sockets}->[1], # main rtp and rtcp
protocol => $proto,
codecs => $self->{codecs},
));
# XXX support multiple medias
if ($args{sdes}) {
$self->{sdes} = NGCP::Rtpclient::SDES->new(%{$args{sdes_args}});
}
if ($args{dtls}) {
$self->{dtls} = NGCP::Rtpclient::DTLS::Group->new($parent->{mux}, $self, [ \@rtp, \@rtcp ]);
$self->{local_media}->add_attrs($self->{dtls}->encode());
$self->{dtls}->accept(); # XXX support other modes
}
if ($args{ice}) {
$self->{ice} = NGCP::Rtpclient::ICE->new(2, 1); # 2 components, controlling XXX
my $pref = 65535;
for my $s (@sockets) {
$self->{ice}->add_candidate($pref--, 'host', @$s); # 2 components
}
$self->{local_media}->add_attrs($self->{ice}->encode());
}
$self->{media_receive_queues} = [[],[]]; # for each component
$self->{media_packets_sent} = [0,0];
$self->{media_packets_received} = [0,0];
$self->{client_components} = [undef,undef];
$self->{args} = \%args;
# copy args for the RTP client
$self->{rtp_args} = {};
for my $k (qw(packetloss)) {
exists($args{$k}) or next;
$self->{rtp_args}->{$k} = $args{$k};
}
return $self;
}
sub media_receiver {
my ($self, $other) = @_;
$self->{media_receiver} = $other;
}
sub media_to_receive {
my ($self, $component, $s) = @_;
push(@{$self->{media_receive_queues}->[$component]}, $s);
}
sub _packet_send {
my ($self, $component, $s) = @_;
my $local_socket = $self->{main_sockets}->[$component];
my $dest;
if (!$self->{ice}) {
if ($self->{remote_media}) {
$dest = $component == 0 ? $self->{remote_media}->endpoint()
: $self->{remote_media}->rtcp_endpoint();
}
else {
$dest = $self->{component_peers}->[$component]
}
}
else {
($local_socket, $dest) = $self->{ice}->get_send_component($component);
}
if ($self->{srtp}) {
$s = $self->{srtp}->encrypt($component, $s);
}
$local_socket->send($s, 0, $dest);
}
sub _media_send {
my ($self, $component, $s) = @_;
$self->_packet_send($component, $s);
$self->{media_packets_sent}->[$component]++;
$self->{media_receiver} and $self->{media_receiver}->media_to_receive($component, $s);
}
sub dtls_send {
my ($self, $component, $s) = @_;
$self->_packet_send($component, $s);
}
sub rtp_send {
my ($self, $s) = @_;
$self->_media_send(0, $s);
}
sub rtcp_send {
my ($self, $s) = @_;
$self->_media_send(1, $s);
}
sub _default_req_args {
my ($self, $cmd, %args) = @_;
my $req = { command => $cmd, 'call-id' => $self->{parent}->{callid} };
for my $cp (qw(sdp from-tag to-tag ICE transport-protocol address-family label direction codec)) {
$args{$cp} and $req->{$cp} = $args{$cp};
}
for my $cp (@{$args{flags}}) {
push(@{$req->{flags}}, $cp);
}
return $req;
}
sub offer {
my ($self, $other, %args) = @_;
$self->{sdes} and $self->{local_media}->add_attrs($self->{sdes}->encode());
my $sdp_body = $self->{local_sdp}->encode();
# XXX validate SDP
my $req = $self->_default_req_args('offer', 'from-tag' => $self->{tag}, sdp => $sdp_body, %args);
my $out = $self->{parent}->{control}->req($req);
$other->_offered($out);
}
sub _offered {
my ($self, $req) = @_;
my $sdp_body = $req->{sdp} or die;
$self->{remote_sdp_raw} = $sdp_body;
$self->{remote_sdp} = NGCP::Rtpclient::SDP->decode($sdp_body);
# XXX validate SDP
@{$self->{remote_sdp}->{medias}} == 1 or die;
$self->{remote_media} = $self->{remote_sdp}->{medias}->[0];
$self->{local_sdp}->codec_negotiate($self->{remote_sdp});
if ($self->{sdes}) {
$self->{sdes}->decode($self->{remote_media});
$self->{sdes}->offered();
$self->{srtp} = NGCP::Rtpclient::SRTP::Context->new($self->{sdes}->{suite});
}
$self->{ice} and $self->{ice}->decode($self->{remote_media}->decode_ice());
}
sub answer {
my ($self, $other, %args) = @_;
$self->{sdes} and $self->{local_media}->add_attrs($self->{sdes}->encode());
my $sdp_body = $self->{local_sdp}->encode();
# XXX validate SDP
my $req = $self->_default_req_args('answer', 'from-tag' => $other->{tag}, 'to-tag' => $self->{tag},
sdp => $sdp_body, %args);
my $out = $self->{parent}->{control}->req($req);
$other->_answered($out);
}
sub _answered {
my ($self, $req) = @_;
my $sdp_body = $req->{sdp} or die;
$self->{remote_sdp_raw} = $sdp_body;
$self->{remote_sdp} = NGCP::Rtpclient::SDP->decode($sdp_body);
# XXX validate SDP
@{$self->{remote_sdp}->{medias}} == 1 or die;
$self->{remote_media} = $self->{remote_sdp}->{medias}->[0];
$self->{local_sdp}->codec_negotiate($self->{remote_sdp});
if ($self->{sdes}) {
$self->{sdes}->decode($self->{remote_media});
$self->{sdes}->answered();
$self->{srtp} = NGCP::Rtpclient::SRTP::Context->new($self->{sdes}->{suite});
}
$self->{ice} and $self->{ice}->decode($self->{remote_media}->decode_ice());
}
sub teardown {
my ($self, %args) = @_;
my $req = $self->_default_req_args('delete', 'from-tag' => $self->{tag}, %args);
my $out = $self->{parent}->{control}->req($req);
if ($args{dump}) {
my $dumper = Data::Dumper->new([$out]);
$dumper->Sortkeys(1);
print($dumper->Dump);
}
return $out;
}
sub _input {
my ($self, $fh, $input, $peer) = @_;
my $component = $self->_peer_addr_check($fh, $peer);
$self->{dtls} and $self->{dtls}->input($fh, $input, $peer);
$self->{ice} and $self->{ice}->input($fh, $input, $peer);
$$input eq '' and return;
defined($component) or return; # not one of ours
# must be RTP or RTCP input
if (!$self->{args}->{no_data_check}) {
if ($self->{srtp}) {
$$input = $self->{srtp}->decrypt($component, $$input);
}
my $exp = shift(@{$self->{media_receive_queues}->[$component]}) or die;
$$input eq $exp or die unpack('H*', $$input) . ' ne ' . unpack('H*', $exp);
}
else {
@{$self->{media_receive_queues}->[$component]} = ();
}
$self->{media_packets_received}->[$component]++;
$self->{client_components}->[$component] and
$self->{client_components}->[$component]->input($$input);
$$input = '';
}
sub _timer {
my ($self) = @_;
$self->{ice} and $self->{ice}->timer();
$self->{rtp} and $self->{rtp}->timer();
$self->{rtcp} and $self->{rtcp}->timer();
}
sub _peer_addr_check {
my ($self, $fh, $peer) = @_;
for my $sockets (@{$self->{sockets}}) {
for my $component (0, 1) {
if ($fh == $sockets->[$component]) {
$self->{component_peers}->[$component] = $peer;
return $component;
}
}
}
return;
}
sub start_rtp {
my ($self) = @_;
$self->{rtp} and die;
my %args = %{$self->{rtp_args}};
my $send_codec = $self->{local_media}->send_codec();
$args{send_codec} = $send_codec;
$self->{rtp} = NGCP::Rtpclient::RTP->new($self, %args) or die;
$self->{client_components}->[0] = $self->{rtp};
}
sub start_rtcp {
my ($self) = @_;
$self->{rtcp} and die;
$self->{rtcp} = NGCP::Rtpclient::RTCP->new($self, $self->{rtp}) or die;
$self->{client_components}->[1] = $self->{rtcp};
}
sub stop {
my ($self) = @_;
print("media packets sent: @{$self->{media_packets_sent}}\n");
print("media packets received: @{$self->{media_packets_received}}\n");
my @queues = map {scalar(@$_)} @{$self->{media_receive_queues}};
print("media packets outstanding: @queues\n");
}
sub remote_codecs {
my ($self) = @_;
my $list = $self->{remote_media}->{codec_list};
return join(',', map {"$_->{name}/$_->{clockrate}/$_->{channels}"} @$list);
}
sub send_codecs {
my ($self) = @_;
my $list = $self->{local_media}->{codecs_send};
return join(',', map {"$_->{name}/$_->{clockrate}/$_->{channels}"} @$list);
}
1;