mirror of https://github.com/sipwise/sems.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1658 lines
64 KiB
1658 lines
64 KiB
#!/usr/bin/env python3
|
|
|
|
import re as _re
|
|
import struct as _struct
|
|
import time
|
|
import sems_tester
|
|
import socket
|
|
|
|
# RTP media ports — outside SEMS relay range (10000-20000) to avoid conflicts
|
|
_RTP_PORT_A = 9002 # leg A media port (advertised in SDP_A)
|
|
_RTP_PORT_B = 9004 # leg B media port (advertised in SDP_B)
|
|
_RTP_PORT_A2 = 9006 # leg A media port for re-INVITE (new origin version)
|
|
_RTP_PORT_B2 = 9008 # leg B media port for re-INVITE
|
|
|
|
# SDP used by leg A in INVITE (plain \n — sendSIP converts to \r\n)
|
|
_SDP_A = (
|
|
"v=0\n"
|
|
"o=- 1000000001 1000000001 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_A} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=rtpmap:8 PCMA/8000\n"
|
|
"a=sendrecv\n"
|
|
)
|
|
|
|
# SDP used by leg B in 200 OK
|
|
_SDP_B = (
|
|
"v=0\n"
|
|
"o=- 2000000001 2000000001 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_B} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=rtpmap:8 PCMA/8000\n"
|
|
"a=sendrecv\n"
|
|
)
|
|
|
|
|
|
def _make_invite(branch, call_id, cseq=1, src_port=57701):
|
|
sdp = _SDP_A.replace("\n", "\r\n")
|
|
return (
|
|
f"INVITE sip:bob@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:{src_port};branch={branch};rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: <sip:bob@voip.sipwise.local>\n"
|
|
f"From: Alice <sip:alice@voip.sipwise.local>;tag=a1b2c3d4\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq} INVITE\n"
|
|
f"Contact: <sip:alice@127.0.0.1:{src_port}>\n"
|
|
f"Content-Type: application/sdp\n"
|
|
f"Content-Length: {len(sdp)}\n"
|
|
"\n"
|
|
+ _SDP_A
|
|
)
|
|
|
|
|
|
def _make_cancel(branch, call_id, cseq=1, src_port=57701):
|
|
return (
|
|
f"CANCEL sip:bob@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:{src_port};branch={branch};rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: <sip:bob@voip.sipwise.local>\n"
|
|
f"From: Alice <sip:alice@voip.sipwise.local>;tag=a1b2c3d4\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq} CANCEL\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
|
|
|
|
def _make_ack(branch, call_id, to_tag, cseq=1, src_port=57701):
|
|
return (
|
|
f"ACK sip:bob@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:{src_port};branch={branch};rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: <sip:bob@voip.sipwise.local>;tag={to_tag}\n"
|
|
f"From: Alice <sip:alice@voip.sipwise.local>;tag=a1b2c3d4\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq} ACK\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
|
|
|
|
def _make_bye(branch, call_id, to_tag, cseq=2, src_port=57701):
|
|
return (
|
|
f"BYE sip:bob@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:{src_port};branch={branch};rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: <sip:bob@voip.sipwise.local>;tag={to_tag}\n"
|
|
f"From: Alice <sip:alice@voip.sipwise.local>;tag=a1b2c3d4\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq} BYE\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
|
|
|
|
def _hdr(msg, name):
|
|
"""Extract a header value from a received SIP message."""
|
|
m = _re.search(name + r": ([^\r\n]+)", msg)
|
|
return m.group(1) if m else ""
|
|
|
|
|
|
def _make_200_ok_invite(invite_msg, sdp=None, contact="<sip:bob@127.0.0.1:5070>"):
|
|
"""Build 200 OK for a received INVITE/re-INVITE.
|
|
sdp=None uses _SDP_B; contact defaults to leg-B's address."""
|
|
via = _hdr(invite_msg, "Via")
|
|
to = _hdr(invite_msg, "To")
|
|
from_ = _hdr(invite_msg, "From")
|
|
call_id = _hdr(invite_msg, "Call-ID")
|
|
cseq = _hdr(invite_msg, "CSeq")
|
|
to_tag = to + ";tag=uas-tag-001" if ";tag=" not in to else to
|
|
use_sdp = sdp if sdp is not None else _SDP_B
|
|
sdp_cr = use_sdp.replace("\n", "\r\n")
|
|
return (
|
|
f"SIP/2.0 200 OK\n"
|
|
f"Via: {via}\n"
|
|
f"To: {to_tag}\n"
|
|
f"From: {from_}\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq}\n"
|
|
f"Contact: {contact}\n"
|
|
f"Content-Type: application/sdp\n"
|
|
f"Content-Length: {len(sdp_cr)}\n"
|
|
"\n"
|
|
+ use_sdp
|
|
)
|
|
|
|
|
|
def _make_provisional(code, reason, invite_msg, sdp=None):
|
|
"""Build 1xx response (UAS side), optionally with SDP body (e.g. 183)."""
|
|
via = _hdr(invite_msg, "Via")
|
|
to = _hdr(invite_msg, "To")
|
|
from_ = _hdr(invite_msg, "From")
|
|
call_id = _hdr(invite_msg, "Call-ID")
|
|
cseq = _hdr(invite_msg, "CSeq")
|
|
if sdp is not None:
|
|
sdp_cr = sdp.replace("\n", "\r\n")
|
|
return (
|
|
f"SIP/2.0 {code} {reason}\n"
|
|
f"Via: {via}\n"
|
|
f"To: {to};tag=uas-tag-001\n"
|
|
f"From: {from_}\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq}\n"
|
|
f"Content-Type: application/sdp\n"
|
|
f"Content-Length: {len(sdp_cr)}\n"
|
|
"\n"
|
|
+ sdp
|
|
)
|
|
return (
|
|
f"SIP/2.0 {code} {reason}\n"
|
|
f"Via: {via}\n"
|
|
f"To: {to};tag=uas-tag-001\n"
|
|
f"From: {from_}\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq}\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
|
|
|
|
def _make_final_uas(code, reason, invite_msg):
|
|
"""Build non-2xx final response from UAS (486, 404, 487, …)."""
|
|
via = _hdr(invite_msg, "Via")
|
|
to = _hdr(invite_msg, "To")
|
|
from_ = _hdr(invite_msg, "From")
|
|
call_id = _hdr(invite_msg, "Call-ID")
|
|
cseq = _hdr(invite_msg, "CSeq")
|
|
return (
|
|
f"SIP/2.0 {code} {reason}\n"
|
|
f"Via: {via}\n"
|
|
f"To: {to};tag=uas-tag-001\n"
|
|
f"From: {from_}\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq}\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
|
|
|
|
def _make_200_ok_for(request_msg):
|
|
"""Build 200 OK mirroring the request headers (for BYE/CANCEL responses)."""
|
|
via = _hdr(request_msg, "Via")
|
|
to = _hdr(request_msg, "To")
|
|
from_ = _hdr(request_msg, "From")
|
|
call_id = _hdr(request_msg, "Call-ID")
|
|
cseq = _hdr(request_msg, "CSeq")
|
|
return (
|
|
f"SIP/2.0 200 OK\n"
|
|
f"Via: {via}\n"
|
|
f"To: {to}\n"
|
|
f"From: {from_}\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq}\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
|
|
|
|
# SDPs for hold (re-INVITE)
|
|
_SDP_SENDONLY_A = (
|
|
"v=0\n"
|
|
"o=- 1000000002 1000000002 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_A2} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=sendonly\n"
|
|
)
|
|
_SDP_RECVONLY_B = (
|
|
"v=0\n"
|
|
"o=- 2000000002 2000000002 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_B2} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=recvonly\n"
|
|
)
|
|
_SDP_SENDONLY_B = (
|
|
"v=0\n"
|
|
"o=- 2000000002 2000000002 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_B2} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=sendonly\n"
|
|
)
|
|
_SDP_RECVONLY_A = (
|
|
"v=0\n"
|
|
"o=- 1000000002 1000000002 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_A2} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=recvonly\n"
|
|
)
|
|
# Unhold SDPs (sendrecv, version incremented)
|
|
_SDP_SENDRECV_A2 = (
|
|
"v=0\n"
|
|
"o=- 1000000003 1000000003 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_A2} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=sendrecv\n"
|
|
)
|
|
_SDP_SENDRECV_B2 = (
|
|
"v=0\n"
|
|
"o=- 2000000003 2000000003 IN IP4 127.0.0.1\n"
|
|
"s=-\n"
|
|
"c=IN IP4 127.0.0.1\n"
|
|
"t=0 0\n"
|
|
f"m=audio {_RTP_PORT_B2} RTP/AVP 0 8\n"
|
|
"a=rtpmap:0 PCMU/8000\n"
|
|
"a=sendrecv\n"
|
|
)
|
|
|
|
|
|
def _make_reinvite(branch, call_id, to_tag, cseq, sdp, src_port=57701):
|
|
"""Build a re-INVITE from leg A within an established dialog."""
|
|
sdp_cr = sdp.replace("\n", "\r\n")
|
|
return (
|
|
f"INVITE sip:bob@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:{src_port};branch={branch};rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: <sip:bob@voip.sipwise.local>;tag={to_tag}\n"
|
|
f"From: Alice <sip:alice@voip.sipwise.local>;tag=a1b2c3d4\n"
|
|
f"Call-ID: {call_id}\n"
|
|
f"CSeq: {cseq} INVITE\n"
|
|
f"Contact: <sip:alice@127.0.0.1:{src_port}>\n"
|
|
f"Content-Type: application/sdp\n"
|
|
f"Content-Length: {len(sdp_cr)}\n"
|
|
"\n"
|
|
+ sdp
|
|
)
|
|
|
|
|
|
def _to_tag(msg):
|
|
m = _re.search(r"To:[^\r\n]*tag=([A-Za-z0-9\-_\.]+)", msg)
|
|
return m.group(1) if m else ""
|
|
|
|
|
|
def _sdp_media_port(msg):
|
|
"""Return the first m=audio port from a SIP message body."""
|
|
m = _re.search(r"m=audio (\d+)", msg)
|
|
return int(m.group(1)) if m else None
|
|
|
|
|
|
def _make_rtp(seq=1, ts=0, ssrc=1, payload_type=0):
|
|
"""Build a minimal 12-byte RTP header packet."""
|
|
return _struct.pack("!BBHII",
|
|
0x80, # V=2, P=0, X=0, CC=0
|
|
payload_type,
|
|
seq,
|
|
ts,
|
|
ssrc,
|
|
)
|
|
|
|
|
|
class TestB2B(sems_tester.TestCase):
|
|
_config_base = "b2b"
|
|
_sip_port = 5062
|
|
_uas_port = 5070
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
cls._uas_sock = cls.makeUASSocket(cls._uas_port)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
cls._uas_sock.close()
|
|
super().tearDownClass()
|
|
|
|
def setUp(self):
|
|
"""Non-blocking drain to remove stale datagrams that arrived between
|
|
tearDown of the previous test and the start of this one. Prevents
|
|
recvB2BINVITE from picking up a retransmission from a previous dialog
|
|
and returning a wrong b2b_cid for the current test."""
|
|
self._uas_sock.settimeout(0)
|
|
try:
|
|
while True:
|
|
self._uas_sock.recv(4096)
|
|
except BlockingIOError:
|
|
pass
|
|
finally:
|
|
self._uas_sock.settimeout(3)
|
|
|
|
def tearDown(self):
|
|
"""Drain leftover messages from the shared UAS socket between tests.
|
|
Uses 1.5*T1 timeout to catch SIP retransmissions that SEMS may still
|
|
be generating on a loaded CI host when the next test starts."""
|
|
self._uas_sock.settimeout(1.5)
|
|
try:
|
|
while True:
|
|
try:
|
|
self._uas_sock.recv(4096)
|
|
except TimeoutError:
|
|
break
|
|
finally:
|
|
self._uas_sock.settimeout(3)
|
|
|
|
def recvSIPSkipRetrans(self, sock, skip_startswith, b2b_cid=None):
|
|
"""Receive SIP, discarding messages that start with skip_startswith or
|
|
(when b2b_cid is given) belong to a different B2B dialog."""
|
|
for _ in range(10):
|
|
msg = self.recvSIP(sock)
|
|
if msg.startswith(skip_startswith):
|
|
continue
|
|
if b2b_cid is not None and b2b_cid not in msg:
|
|
continue
|
|
return msg
|
|
raise AssertionError("No matching SIP message in recvSIPSkipRetrans")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 1: Full call — leg A sends BYE
|
|
# ------------------------------------------------------------------
|
|
def testBasicCallLegAHangsUp(self):
|
|
src_port = 57701
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-001"
|
|
call_id = "test-b2b-call-001@127.0.0.1"
|
|
|
|
# Leg A → INVITE → SEMS
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
|
|
# SEMS → 100 Trying → Leg A
|
|
trying = self.recvSIP(leg_a)
|
|
self.assertSIP(trying,
|
|
"^SIP/2\\.0 100 [^\r]+\n"
|
|
"Via: SIP/2\\.0/UDP 127\\.0\\.0\\.1:57701;branch=z9hG4bK-b2b-001[^\r]*\n"
|
|
"To: <sip:bob@voip\\.sipwise\\.local>\n"
|
|
"From: Alice <sip:alice@voip\\.sipwise\\.local>;tag=a1b2c3d4\n"
|
|
"Call-ID: test-b2b-call-001@127\\.0\\.0\\.1\n"
|
|
"CSeq: 1 INVITE\n"
|
|
"Content-Length: 0\n"
|
|
)
|
|
|
|
# SEMS → INVITE (B2B) → Leg B
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.assertSIP(b2b_invite,
|
|
"^INVITE sip:bob@voip\\.sipwise\\.local SIP/2\\.0\n"
|
|
".*"
|
|
"Content-Type: application/sdp\n"
|
|
".*"
|
|
"m=audio \\d+ RTP/AVP"
|
|
)
|
|
|
|
# Leg B → 180 Ringing → SEMS (also stops INVITE retransmissions)
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# SEMS → 180 Ringing → Leg A
|
|
ringing = self.recvSIP(leg_a)
|
|
self.assertSIP(ringing,
|
|
"^SIP/2\\.0 180 Ringing\n"
|
|
"Via: SIP/2\\.0/UDP 127\\.0\\.0\\.1:57701;branch=z9hG4bK-b2b-001[^\r]*\n"
|
|
"To: <sip:bob@voip\\.sipwise\\.local>;tag=[A-Za-z0-9\\-_\\.]+\n"
|
|
"From: Alice <sip:alice@voip\\.sipwise\\.local>;tag=a1b2c3d4\n"
|
|
"Call-ID: test-b2b-call-001@127\\.0\\.0\\.1\n"
|
|
"CSeq: 1 INVITE\n"
|
|
".*Content-Length: 0\n"
|
|
)
|
|
|
|
# Leg B → 200 OK (SDP) → SEMS
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
|
|
# SEMS → 200 OK (SDP) → Leg A
|
|
ok_a = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_a,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
"Via: SIP/2\\.0/UDP 127\\.0\\.0\\.1:57701;branch=z9hG4bK-b2b-001[^\r]*\n"
|
|
"To: <sip:bob@voip\\.sipwise\\.local>;tag=[A-Za-z0-9\\-_\\.]+\n"
|
|
"From: Alice <sip:alice@voip\\.sipwise\\.local>;tag=a1b2c3d4\n"
|
|
"Call-ID: test-b2b-call-001@127\\.0\\.0\\.1\n"
|
|
"CSeq: 1 INVITE\n"
|
|
".*"
|
|
"Content-Type: application/sdp\n"
|
|
".*"
|
|
"m=audio \\d+ RTP/AVP"
|
|
)
|
|
|
|
to_tag = _to_tag(ok_a)
|
|
|
|
# Leg A → ACK → SEMS
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-001-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
|
|
# SEMS → ACK → Leg B
|
|
ack_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(ack_b,
|
|
"^ACK sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ ACK\n"
|
|
)
|
|
|
|
# Leg A → BYE → SEMS
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-001-bye", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
|
|
# SEMS → BYE → Leg B
|
|
bye_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(bye_b,
|
|
"^BYE sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ BYE\n"
|
|
)
|
|
|
|
# Leg B → 200 OK → SEMS
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
|
|
# SEMS → 200 OK → Leg A
|
|
ok_bye = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_bye,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
"Via: SIP/2\\.0/UDP 127\\.0\\.0\\.1:57701;branch=z9hG4bK-b2b-001-bye[^\r]*\n"
|
|
".*"
|
|
"CSeq: 2 BYE\n"
|
|
)
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 2: Full call with media flow — leg B sends BYE
|
|
# ------------------------------------------------------------------
|
|
def testBasicCallLegBHangsUp(self):
|
|
src_port = 57702
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-002"
|
|
call_id = "test-b2b-call-002@127.0.0.1"
|
|
|
|
# Bind media sockets before INVITE so their Unix paths exist
|
|
rtp_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_a.settimeout(3)
|
|
rtp_a.bind(("127.0.0.1", _RTP_PORT_A))
|
|
|
|
rtp_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_b.settimeout(3)
|
|
rtp_b.bind(("127.0.0.1", _RTP_PORT_B))
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # drain 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # drain 180
|
|
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
ok_a = self.recvSIP(leg_a)
|
|
to_tag = _to_tag(ok_a)
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-002-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK on leg B
|
|
|
|
# ---- Media flow (RTP_Direct: SEMS passes SDP through unchanged) ----
|
|
# In b2b_invite SEMS forwarded leg A's SDP → leg B sends RTP to _RTP_PORT_A
|
|
# In ok_a SEMS forwarded leg B's SDP → leg A sends RTP to _RTP_PORT_B
|
|
dest_from_b = _sdp_media_port(b2b_invite) # port leg B should send to
|
|
dest_from_a = _sdp_media_port(ok_a) # port leg A should send to
|
|
|
|
self.assertEqual(dest_from_b, _RTP_PORT_A)
|
|
self.assertEqual(dest_from_a, _RTP_PORT_B)
|
|
|
|
# Leg B → Leg A
|
|
pkt_b2a = _make_rtp(seq=1, ts=160, ssrc=0xB0B0B0B0)
|
|
rtp_b.sendto(pkt_b2a, ("127.0.0.1", dest_from_b))
|
|
self.assertEqual(rtp_a.recv(256), pkt_b2a)
|
|
|
|
# Leg A → Leg B
|
|
pkt_a2b = _make_rtp(seq=1, ts=160, ssrc=0xA0A0A0A0)
|
|
rtp_a.sendto(pkt_a2b, ("127.0.0.1", dest_from_a))
|
|
self.assertEqual(rtp_b.recv(256), pkt_a2b)
|
|
|
|
rtp_a.close()
|
|
rtp_b.close()
|
|
|
|
# --- Leg B sends BYE ---
|
|
cid_b2b = _hdr(b2b_invite, "Call-ID")
|
|
from_b2b = _hdr(b2b_invite, "From")
|
|
bye_from_b = (
|
|
f"BYE sip:alice@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:5070;branch=z9hG4bK-uas-bye-002;rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: {from_b2b}\n"
|
|
f"From: <sip:bob@voip.sipwise.local>;tag=uas-tag-001\n"
|
|
f"Call-ID: {cid_b2b}\n"
|
|
f"CSeq: 10 BYE\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
self.sendToSIP(bye_from_b, sems_addr, self._uas_sock)
|
|
|
|
# SEMS → BYE → Leg A
|
|
bye_a = self.recvSIP(leg_a)
|
|
self.assertSIP(bye_a,
|
|
"^BYE sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ BYE\n"
|
|
".*"
|
|
"Call-ID: test-b2b-call-002@127\\.0\\.0\\.1\n"
|
|
)
|
|
|
|
# Leg A → 200 OK for BYE
|
|
self.sendSIP(_make_200_ok_for(bye_a), leg_a)
|
|
|
|
# SEMS → 200 OK → Leg B
|
|
ok_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(ok_b,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
".*"
|
|
"CSeq: 10 BYE\n"
|
|
)
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 3: Early CANCEL after 180 Ringing
|
|
# ------------------------------------------------------------------
|
|
def testEarlyCancel(self):
|
|
src_port = 57703
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-003"
|
|
call_id = "test-b2b-call-003@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
|
|
# Leg A → CANCEL
|
|
self.sendSIP(_make_cancel(branch, call_id, src_port=src_port), leg_a)
|
|
|
|
# SEMS → 200 OK for CANCEL → Leg A
|
|
ok_cancel = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_cancel,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
".*"
|
|
"CSeq: 1 CANCEL\n"
|
|
)
|
|
|
|
# SEMS → CANCEL → Leg B
|
|
cancel_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(cancel_b,
|
|
"^CANCEL sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ CANCEL\n"
|
|
)
|
|
|
|
# Leg B → 200 OK for CANCEL
|
|
self.sendToSIP(_make_200_ok_for(cancel_b), sems_addr, self._uas_sock)
|
|
|
|
# Leg B → 487 Request Terminated
|
|
self.sendToSIP(_make_final_uas(487, "Request Terminated", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# SEMS → ACK → Leg B (for 487)
|
|
ack_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(ack_b,
|
|
"^ACK sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ ACK\n"
|
|
)
|
|
|
|
# SEMS → 487 → Leg A
|
|
err_a = self.recvSIP(leg_a)
|
|
self.assertSIP(err_a,
|
|
"^SIP/2\\.0 487 [^\r]+\n"
|
|
".*"
|
|
"Call-ID: test-b2b-call-003@127\\.0\\.0\\.1\n"
|
|
".*"
|
|
"CSeq: 1 INVITE\n"
|
|
)
|
|
|
|
# Leg A → ACK (for 487)
|
|
self.sendSIP(
|
|
_make_ack("z9hG4bK-b2b-003-ack", call_id, _to_tag(err_a),
|
|
src_port=src_port), leg_a)
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 4: Callee returns 486 Busy Here
|
|
# ------------------------------------------------------------------
|
|
def testCalleeBusy(self):
|
|
src_port = 57704
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-004"
|
|
call_id = "test-b2b-call-004@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
|
|
# Send 100 Trying first to stop INVITE retransmissions
|
|
self.sendToSIP(_make_provisional(100, "Trying", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# Leg B → 486 Busy Here
|
|
self.sendToSIP(_make_final_uas(486, "Busy Here", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# SEMS → ACK → Leg B (skip any retransmitted INVITE)
|
|
ack_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(ack_b,
|
|
"^ACK sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ ACK\n"
|
|
)
|
|
|
|
# SEMS → 486 → Leg A
|
|
busy_a = self.recvSIP(leg_a)
|
|
self.assertSIP(busy_a,
|
|
"^SIP/2\\.0 486 Busy Here\n"
|
|
".*"
|
|
"Call-ID: test-b2b-call-004@127\\.0\\.0\\.1\n"
|
|
".*"
|
|
"CSeq: 1 INVITE\n"
|
|
)
|
|
|
|
# Leg A → ACK
|
|
self.sendSIP(
|
|
_make_ack("z9hG4bK-b2b-004-ack", call_id, _to_tag(busy_a),
|
|
src_port=src_port), leg_a)
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 5: Callee returns 404 Not Found
|
|
# ------------------------------------------------------------------
|
|
def testCalleeNotFound(self):
|
|
src_port = 57705
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-005"
|
|
call_id = "test-b2b-call-005@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
|
|
# Send 100 Trying first to stop INVITE retransmissions
|
|
self.sendToSIP(_make_provisional(100, "Trying", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# Leg B → 404 Not Found
|
|
self.sendToSIP(_make_final_uas(404, "Not Found", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# SEMS → ACK → Leg B (skip any retransmitted INVITE)
|
|
ack_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(ack_b,
|
|
"^ACK sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ ACK\n"
|
|
)
|
|
|
|
# SEMS → 404 → Leg A
|
|
notfound_a = self.recvSIP(leg_a)
|
|
self.assertSIP(notfound_a,
|
|
"^SIP/2\\.0 404 Not Found\n"
|
|
".*"
|
|
"Call-ID: test-b2b-call-005@127\\.0\\.0\\.1\n"
|
|
".*"
|
|
"CSeq: 1 INVITE\n"
|
|
)
|
|
|
|
# Leg A → ACK
|
|
self.sendSIP(
|
|
_make_ack("z9hG4bK-b2b-005-ack", call_id, _to_tag(notfound_a),
|
|
src_port=src_port), leg_a)
|
|
|
|
leg_a.close()
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 6: CANCEL before any provisional (no 180 from callee)
|
|
# ------------------------------------------------------------------
|
|
def testEarlyCancelBeforeRinging(self):
|
|
src_port = 57706
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-006"
|
|
call_id = "test-b2b-call-006@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
|
|
# Leg B acknowledges INVITE (stops retransmissions) but doesn't ring yet
|
|
self.sendToSIP(_make_provisional(100, "Trying", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# Drain any buffered INVITE retransmissions and give SEMS time to process
|
|
# the 100 Trying before CANCEL arrives. Without this, on a loaded system
|
|
# SEMS may see CANCEL before 100 Trying and skip forwarding CANCEL to leg B
|
|
# (RFC 3261 §9.1 forbids CANCEL before any provisional is received).
|
|
self._uas_sock.settimeout(0.1)
|
|
try:
|
|
while True:
|
|
self.recvSIP(self._uas_sock)
|
|
except TimeoutError:
|
|
pass
|
|
finally:
|
|
self._uas_sock.settimeout(3)
|
|
|
|
# Leg A → CANCEL (before any 180)
|
|
self.sendSIP(_make_cancel(branch, call_id, src_port=src_port), leg_a)
|
|
|
|
# SEMS → 200 OK for CANCEL → Leg A
|
|
ok_cancel = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_cancel,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
".*"
|
|
"CSeq: 1 CANCEL\n"
|
|
)
|
|
|
|
# SEMS → CANCEL → Leg B
|
|
cancel_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(cancel_b,
|
|
"^CANCEL sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ CANCEL\n"
|
|
)
|
|
|
|
# Leg B → 200 OK for CANCEL → SEMS
|
|
self.sendToSIP(_make_200_ok_for(cancel_b), sems_addr, self._uas_sock)
|
|
|
|
# Leg B → 487 Request Terminated → SEMS
|
|
self.sendToSIP(_make_final_uas(487, "Request Terminated", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# SEMS → ACK → Leg B (for 487)
|
|
ack_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(ack_b,
|
|
"^ACK sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ ACK\n"
|
|
)
|
|
|
|
# SEMS → 487 → Leg A
|
|
err_a = self.recvSIP(leg_a)
|
|
self.assertSIP(err_a,
|
|
"^SIP/2\\.0 487 [^\r]+\n"
|
|
".*"
|
|
"Call-ID: test-b2b-call-006@127\\.0\\.0\\.1\n"
|
|
".*"
|
|
"CSeq: 1 INVITE\n"
|
|
)
|
|
|
|
# Leg A → ACK (for 487)
|
|
self.sendSIP(
|
|
_make_ack("z9hG4bK-b2b-006-ack", call_id, _to_tag(err_a),
|
|
src_port=src_port), leg_a)
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 7: 183 Session Progress with SDP (early media)
|
|
# ------------------------------------------------------------------
|
|
def testEarlyMedia(self):
|
|
src_port = 57707
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-007"
|
|
call_id = "test-b2b-call-007@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
|
|
# Leg B → 183 Session Progress with SDP → SEMS
|
|
self.sendToSIP(_make_provisional(183, "Session Progress", b2b_invite,
|
|
sdp=_SDP_B),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# SEMS → 183 with SDP → Leg A
|
|
prog_a = self.recvSIP(leg_a)
|
|
self.assertSIP(prog_a,
|
|
"^SIP/2\\.0 183 [^\r]+\n"
|
|
".*"
|
|
"Call-ID: test-b2b-call-007@127\\.0\\.0\\.1\n"
|
|
".*"
|
|
"Content-Type: application/sdp\n"
|
|
".*"
|
|
"m=audio \\d+ RTP/AVP"
|
|
)
|
|
|
|
# Leg B → 200 OK → SEMS
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
|
|
# SEMS → 200 OK → Leg A
|
|
ok_a = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_a,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
".*"
|
|
"Call-ID: test-b2b-call-007@127\\.0\\.0\\.1\n"
|
|
".*"
|
|
"m=audio \\d+ RTP/AVP"
|
|
)
|
|
to_tag = _to_tag(ok_a)
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-007-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK on leg B
|
|
|
|
# Leg A → BYE → SEMS
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-007-bye", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
bye_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
ok_bye = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_bye, "^SIP/2\\.0 200 OK\n.*CSeq: 2 BYE\n")
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 8: re-INVITE hold from leg A (sendonly → recvonly)
|
|
# ------------------------------------------------------------------
|
|
def testReInviteHoldLegA(self):
|
|
src_port = 57708
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-008"
|
|
call_id = "test-b2b-call-008@127.0.0.1"
|
|
|
|
# --- Initial call setup ---
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
ok_a = self.recvSIP(leg_a)
|
|
to_tag = _to_tag(ok_a)
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-008-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK on leg B
|
|
|
|
# --- re-INVITE: leg A puts call on hold (sendonly) ---
|
|
self.sendSIP(_make_reinvite("z9hG4bK-b2b-008-ri", call_id, to_tag,
|
|
cseq=2, sdp=_SDP_SENDONLY_A,
|
|
src_port=src_port), leg_a)
|
|
|
|
# SEMS → re-INVITE → Leg B (relay)
|
|
ri_b = self.recvSIPSkipRetrans(self._uas_sock, "ACK", b2b_cid)
|
|
self.assertSIP(ri_b,
|
|
"^INVITE sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"a=sendonly"
|
|
)
|
|
|
|
# Leg B → 200 OK (recvonly) → SEMS
|
|
self.sendToSIP(_make_200_ok_invite(ri_b, sdp=_SDP_RECVONLY_B),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# SEMS → 200 OK → Leg A (drain 100 Trying SEMS sent for the re-INVITE)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
ok_ri_a = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_ri_a,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
".*"
|
|
"a=recvonly"
|
|
)
|
|
|
|
# Leg A → ACK for re-INVITE 200 OK
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-008-ri-ack", call_id, to_tag,
|
|
cseq=2, src_port=src_port), leg_a)
|
|
|
|
# SEMS → ACK → Leg B
|
|
ack_ri_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(ack_ri_b,
|
|
"^ACK sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ ACK\n"
|
|
)
|
|
|
|
# --- Tear down (leg A, cseq=3 after two INVITEs) ---
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-008-bye", call_id, to_tag,
|
|
cseq=3, src_port=src_port), leg_a)
|
|
bye_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(bye_b, "^BYE sip:[^\r]+ SIP/2\\.0\n.*CSeq: \\d+ BYE\n")
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
ok_bye = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_bye, "^SIP/2\\.0 200 OK\n.*CSeq: 3 BYE\n")
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 9: re-INVITE hold from leg B (sendonly → recvonly)
|
|
# ------------------------------------------------------------------
|
|
def testReInviteHoldLegB(self):
|
|
src_port = 57709
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-009"
|
|
call_id = "test-b2b-call-009@127.0.0.1"
|
|
|
|
# --- Initial call setup ---
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
ok_a = self.recvSIP(leg_a)
|
|
to_tag = _to_tag(ok_a)
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-009-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK on leg B
|
|
|
|
# --- re-INVITE: leg B puts call on hold (sendonly) ---
|
|
# Build re-INVITE in the B2B dialog (leg B → SEMS)
|
|
cid_b2b = b2b_cid
|
|
# In B2B dialog: b2b_invite From = SEMS's B-leg local tag (= To from leg B's view)
|
|
to_b2b = _hdr(b2b_invite, "From")
|
|
sdp_cr = _SDP_SENDONLY_B.replace("\n", "\r\n")
|
|
reinvite_from_b = (
|
|
f"INVITE sip:alice@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:5070;branch=z9hG4bK-uas-ri-009;rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: {to_b2b}\n"
|
|
f"From: <sip:bob@voip.sipwise.local>;tag=uas-tag-001\n"
|
|
f"Call-ID: {cid_b2b}\n"
|
|
f"CSeq: 2 INVITE\n"
|
|
f"Contact: <sip:bob@127.0.0.1:5070>\n"
|
|
f"Content-Type: application/sdp\n"
|
|
f"Content-Length: {len(sdp_cr)}\n"
|
|
"\n"
|
|
+ _SDP_SENDONLY_B
|
|
)
|
|
self.sendToSIP(reinvite_from_b, sems_addr, self._uas_sock)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain 100 Trying SEMS sends for the re-INVITE
|
|
|
|
# SEMS → re-INVITE → Leg A
|
|
ri_a = self.recvSIP(leg_a)
|
|
self.assertSIP(ri_a,
|
|
"^INVITE sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
f"Call-ID: {call_id}\n"
|
|
".*"
|
|
"a=sendonly"
|
|
)
|
|
|
|
# Leg A → 200 OK (recvonly) → SEMS (Contact must match leg_a's bound port)
|
|
self.sendSIP(_make_200_ok_invite(ri_a, sdp=_SDP_RECVONLY_A,
|
|
contact=f"<sip:alice@127.0.0.1:{src_port}>"), leg_a)
|
|
|
|
# SEMS → 200 OK → Leg B
|
|
ok_ri_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(ok_ri_b,
|
|
"^SIP/2\\.0 200 OK\n"
|
|
".*"
|
|
"a=recvonly"
|
|
)
|
|
|
|
# Leg B → ACK for re-INVITE 200 OK → SEMS
|
|
ack_from_b = (
|
|
f"ACK sip:alice@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:5070;branch=z9hG4bK-uas-ri-ack-009;rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: {to_b2b}\n"
|
|
f"From: <sip:bob@voip.sipwise.local>;tag=uas-tag-001\n"
|
|
f"Call-ID: {cid_b2b}\n"
|
|
f"CSeq: 2 ACK\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
self.sendToSIP(ack_from_b, sems_addr, self._uas_sock)
|
|
|
|
# SEMS → ACK → Leg A
|
|
ack_ri_a = self.recvSIP(leg_a)
|
|
self.assertSIP(ack_ri_a,
|
|
"^ACK sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
"CSeq: \\d+ ACK\n"
|
|
)
|
|
|
|
# --- Tear down (leg B sends BYE) ---
|
|
bye_from_b = (
|
|
f"BYE sip:alice@voip.sipwise.local SIP/2.0\n"
|
|
f"Via: SIP/2.0/UDP 127.0.0.1:5070;branch=z9hG4bK-uas-bye-009;rport\n"
|
|
f"Max-Forwards: 70\n"
|
|
f"To: {to_b2b}\n"
|
|
f"From: <sip:bob@voip.sipwise.local>;tag=uas-tag-001\n"
|
|
f"Call-ID: {cid_b2b}\n"
|
|
f"CSeq: 10 BYE\n"
|
|
f"Content-Length: 0\n"
|
|
"\n"
|
|
)
|
|
self.sendToSIP(bye_from_b, sems_addr, self._uas_sock)
|
|
|
|
bye_a = self.recvSIP(leg_a)
|
|
self.assertSIP(bye_a,
|
|
"^BYE sip:[^\r]+ SIP/2\\.0\n"
|
|
".*"
|
|
f"Call-ID: {call_id}\n"
|
|
)
|
|
self.sendSIP(_make_200_ok_for(bye_a), leg_a)
|
|
|
|
ok_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(ok_b, "^SIP/2\\.0 200 OK\n.*CSeq: 10 BYE\n")
|
|
|
|
leg_a.close()
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 10: INVITE retransmission — SEMS deduplicates, does not
|
|
# forward the retransmit to leg B
|
|
# ------------------------------------------------------------------
|
|
def testInviteRetransmission(self):
|
|
src_port = 57710
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-010"
|
|
call_id = "test-b2b-call-010@127.0.0.1"
|
|
|
|
invite = _make_invite(branch, call_id, src_port=src_port)
|
|
self.sendSIP(invite, leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
|
|
# Leg B → 100 Trying → SEMS (stops SEMS's own B2B INVITE retransmit timer)
|
|
self.sendToSIP(_make_provisional(100, "Trying", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
# Leg A retransmits the INVITE (same Via branch = same transaction)
|
|
self.sendSIP(invite, leg_a)
|
|
|
|
# SEMS should resend its last provisional (100 Trying / 100 Connecting) to leg A
|
|
retrans_resp = self.recvSIP(leg_a)
|
|
self.assertSIP(retrans_resp, "^SIP/2\\.0 100 [^\r]+\n")
|
|
|
|
# SEMS must NOT forward the retransmission to leg B
|
|
self._uas_sock.settimeout(0.2)
|
|
try:
|
|
while True:
|
|
try:
|
|
extra = self.recvSIP(self._uas_sock)
|
|
except socket.timeout:
|
|
break # no (more) messages — expected
|
|
if b2b_cid in extra:
|
|
self.fail("SEMS forwarded INVITE retransmission to leg B: " + repr(extra[:80]))
|
|
# else: stale message from a previous test — ignore
|
|
finally:
|
|
self._uas_sock.settimeout(3)
|
|
|
|
# Complete the call normally
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
ok_a = self.recvSIP(leg_a)
|
|
to_tag = _to_tag(ok_a)
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-010-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK
|
|
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-010-bye", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
bye_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
ok_bye = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_bye, "^SIP/2\\.0 200 OK\n.*CSeq: 2 BYE\n")
|
|
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 11: Callee 480 Temporarily Unavailable
|
|
# ------------------------------------------------------------------
|
|
def testCallee480(self):
|
|
src_port = 57711
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-011"
|
|
call_id = "test-b2b-call-011@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(100, "Trying", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.sendToSIP(_make_final_uas(480, "Temporarily Unavailable", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
ack_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(ack_b, "^ACK sip:[^\r]+ SIP/2\\.0\n.*CSeq: \\d+ ACK\n")
|
|
|
|
err_a = self.recvSIP(leg_a)
|
|
self.assertSIP(err_a,
|
|
"^SIP/2\\.0 480 [^\r]+\n"
|
|
".*Call-ID: test-b2b-call-011@127\\.0\\.0\\.1\n"
|
|
".*CSeq: 1 INVITE\n"
|
|
)
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-011-ack", call_id, _to_tag(err_a),
|
|
src_port=src_port), leg_a)
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 12: Callee 408 Request Timeout
|
|
# ------------------------------------------------------------------
|
|
def testCallee408(self):
|
|
src_port = 57712
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-012"
|
|
call_id = "test-b2b-call-012@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(100, "Trying", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.sendToSIP(_make_final_uas(408, "Request Timeout", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
ack_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(ack_b, "^ACK sip:[^\r]+ SIP/2\\.0\n.*CSeq: \\d+ ACK\n")
|
|
|
|
err_a = self.recvSIP(leg_a)
|
|
self.assertSIP(err_a,
|
|
"^SIP/2\\.0 \\d+ [^\r]+\n"
|
|
".*Call-ID: test-b2b-call-012@127\\.0\\.0\\.1\n"
|
|
".*CSeq: 1 INVITE\n"
|
|
)
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-012-ack", call_id, _to_tag(err_a),
|
|
src_port=src_port), leg_a)
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 13: Callee 503 Service Unavailable
|
|
# ------------------------------------------------------------------
|
|
def testCallee503(self):
|
|
src_port = 57713
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-013"
|
|
call_id = "test-b2b-call-013@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(100, "Trying", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.sendToSIP(_make_final_uas(503, "Service Unavailable", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
|
|
ack_b = self.recvSIPSkipRetrans(self._uas_sock, "INVITE", b2b_cid)
|
|
self.assertSIP(ack_b, "^ACK sip:[^\r]+ SIP/2\\.0\n.*CSeq: \\d+ ACK\n")
|
|
|
|
err_a = self.recvSIP(leg_a)
|
|
self.assertSIP(err_a,
|
|
"^SIP/2\\.0 \\d+ [^\r]+\n"
|
|
".*Call-ID: test-b2b-call-013@127\\.0\\.0\\.1\n"
|
|
".*CSeq: 1 INVITE\n"
|
|
)
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-013-ack", call_id, _to_tag(err_a),
|
|
src_port=src_port), leg_a)
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 14: re-INVITE hold then unhold from leg A
|
|
# ------------------------------------------------------------------
|
|
def testReInviteUnhold(self):
|
|
src_port = 57714
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-014"
|
|
call_id = "test-b2b-call-014@127.0.0.1"
|
|
|
|
# --- Initial call setup ---
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
ok_a = self.recvSIP(leg_a)
|
|
to_tag = _to_tag(ok_a)
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-014-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK on leg B
|
|
|
|
# --- re-INVITE: leg A holds (sendonly, cseq=2) ---
|
|
self.sendSIP(_make_reinvite("z9hG4bK-b2b-014-hold", call_id, to_tag,
|
|
cseq=2, sdp=_SDP_SENDONLY_A,
|
|
src_port=src_port), leg_a)
|
|
ri_b = self.recvSIPSkipRetrans(self._uas_sock, "ACK", b2b_cid)
|
|
self.assertSIP(ri_b, "^INVITE sip:[^\r]+ SIP/2\\.0\n.*a=sendonly")
|
|
|
|
self.sendToSIP(_make_200_ok_invite(ri_b, sdp=_SDP_RECVONLY_B),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 100 Trying for re-INVITE
|
|
ok_hold_a = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_hold_a, "^SIP/2\\.0 200 OK\n.*a=recvonly")
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-014-hold-ack", call_id, to_tag,
|
|
cseq=2, src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK on leg B
|
|
|
|
# --- re-INVITE: leg A unholds (sendrecv, cseq=3) ---
|
|
self.sendSIP(_make_reinvite("z9hG4bK-b2b-014-unhold", call_id, to_tag,
|
|
cseq=3, sdp=_SDP_SENDRECV_A2,
|
|
src_port=src_port), leg_a)
|
|
ri_unhold_b = self.recvSIPSkipRetrans(self._uas_sock, "ACK", b2b_cid)
|
|
self.assertSIP(ri_unhold_b, "^INVITE sip:[^\r]+ SIP/2\\.0\n.*a=sendrecv")
|
|
|
|
self.sendToSIP(_make_200_ok_invite(ri_unhold_b, sdp=_SDP_SENDRECV_B2),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 100 Trying for re-INVITE
|
|
ok_unhold_a = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_unhold_a, "^SIP/2\\.0 200 OK\n.*a=sendrecv")
|
|
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-014-unhold-ack", call_id, to_tag,
|
|
cseq=3, src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK on leg B
|
|
|
|
# --- Tear down (cseq=4 after three INVITEs) ---
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-014-bye", call_id, to_tag,
|
|
cseq=4, src_port=src_port), leg_a)
|
|
bye_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.assertSIP(bye_b, "^BYE sip:[^\r]+ SIP/2\\.0\n.*CSeq: \\d+ BYE\n")
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
ok_bye = self.recvSIP(leg_a)
|
|
self.assertSIP(ok_bye, "^SIP/2\\.0 200 OK\n.*CSeq: 4 BYE\n")
|
|
|
|
leg_a.close()
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 15: CANCEL/200 OK race — leg A cancels while leg B answers
|
|
# ------------------------------------------------------------------
|
|
def testEarlyCancelRace(self):
|
|
src_port = 57715
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-015"
|
|
call_id = "test-b2b-call-015@127.0.0.1"
|
|
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
|
|
# Send 200 OK from leg B first so SEMS establishes the call, then send CANCEL.
|
|
# This tests the case where CANCEL arrives after the call is already established.
|
|
# sems-pbx only handles CANCEL for an established call via explicit BYE from leg A;
|
|
# it does not reliably BYE leg B if 200 OK INVITE arrives at an already-stopped session.
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
time.sleep(0.3) # allow SEMS to process 200 OK INVITE and establish the call
|
|
self.sendSIP(_make_cancel(branch, call_id, src_port=src_port), leg_a)
|
|
|
|
# SEMS → response to CANCEL → leg A.
|
|
# If SEMS processed leg B's 200 OK INVITE before the CANCEL it may
|
|
# forward that 200 OK to leg A first; ACK it and keep waiting.
|
|
# On a loaded machine SEMS may reply to CANCEL with 481 (INVITE
|
|
# transaction already completed) rather than 200 OK — accept either.
|
|
ok_cancel = None
|
|
got_200_invite = False
|
|
to_tag_invite = None
|
|
leg_a.settimeout(1)
|
|
try:
|
|
for _ in range(30):
|
|
try:
|
|
msg = self.recvSIP(leg_a)
|
|
except TimeoutError:
|
|
if got_200_invite:
|
|
# Already ACKed established call; CANCEL response may not
|
|
# arrive (SEMS may have already sent 481 and we missed it).
|
|
break
|
|
continue
|
|
if "CSeq: 1 CANCEL" in msg:
|
|
# SEMS responded to CANCEL (200 OK or error such as 481)
|
|
if msg.startswith("SIP/2.0 200"):
|
|
ok_cancel = msg
|
|
break
|
|
if msg.startswith("SIP/2.0 200") and "CSeq: 1 INVITE" in msg:
|
|
# Race: SEMS forwarded leg B's 200 OK before processing CANCEL.
|
|
# ACK it so SEMS can continue with teardown.
|
|
got_200_invite = True
|
|
to_tag_invite = _to_tag(msg)
|
|
self.sendSIP(
|
|
_make_ack(branch + "-ack", call_id, to_tag_invite,
|
|
src_port=src_port),
|
|
leg_a,
|
|
)
|
|
finally:
|
|
leg_a.settimeout(3)
|
|
|
|
if not got_200_invite:
|
|
self.assertIsNotNone(ok_cancel, "SEMS did not send 200 OK for CANCEL to leg A")
|
|
self.assertSIP(ok_cancel, "^SIP/2\\.0 200 OK\n.*CSeq: 1 CANCEL\n")
|
|
|
|
if got_200_invite:
|
|
# The call was briefly established before CANCEL arrived.
|
|
# Send BYE from leg A to trigger teardown — do NOT wait for 200 OK BYE
|
|
# yet: SEMS relays BYE to leg B and blocks until leg B replies.
|
|
# Reading leg_a before _uas_sock would deadlock both sides.
|
|
self.sendSIP(
|
|
_make_bye(branch + "-bye", call_id, to_tag_invite, cseq=2,
|
|
src_port=src_port),
|
|
leg_a,
|
|
)
|
|
# SEMS relays BYE to leg B. Receive it and respond FIRST — only then
|
|
# will SEMS send 200 OK BYE back to leg A.
|
|
bye_b = None
|
|
self._uas_sock.settimeout(1)
|
|
try:
|
|
for _ in range(30):
|
|
try:
|
|
msg = self.recvSIP(self._uas_sock)
|
|
except TimeoutError:
|
|
continue
|
|
if msg.startswith("BYE") and b2b_cid in msg:
|
|
bye_b = msg
|
|
break
|
|
# stale BYE/ACK/other from a previous test — consume silently
|
|
finally:
|
|
self._uas_sock.settimeout(3)
|
|
self.assertIsNotNone(bye_b, "SEMS did not send BYE to leg B after cancel race")
|
|
self.assertSIP(bye_b, "^BYE sip:[^\r]+ SIP/2\\.0\n.*CSeq: \\d+ BYE\n")
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
# SEMS unblocked — now read 200 OK BYE from leg A
|
|
bye_ok_a = self.recvSIP(leg_a)
|
|
self.assertSIP(bye_ok_a, "^SIP/2\\.0 200 OK\n.*CSeq: 2 BYE\n")
|
|
else:
|
|
# Non-race path: SEMS sent CANCEL to leg B — respond so it can proceed.
|
|
self._uas_sock.settimeout(1)
|
|
try:
|
|
for _ in range(10):
|
|
try:
|
|
msg = self.recvSIP(self._uas_sock)
|
|
except TimeoutError:
|
|
break
|
|
if msg.startswith("CANCEL") and b2b_cid in msg:
|
|
self.sendToSIP(_make_200_ok_for(msg), sems_addr, self._uas_sock)
|
|
break
|
|
finally:
|
|
self._uas_sock.settimeout(3)
|
|
# SEMS → 487 Request Terminated → leg A
|
|
err_a = self.recvSIP(leg_a)
|
|
self.assertSIP(err_a,
|
|
"^SIP/2\\.0 487 [^\r]+\n"
|
|
f".*Call-ID: {call_id}\n"
|
|
".*CSeq: 1 INVITE\n"
|
|
)
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-015-ack", call_id, _to_tag(err_a),
|
|
src_port=src_port), leg_a)
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 16+17: Two concurrent calls — teardown of one must not
|
|
# affect the other
|
|
# ------------------------------------------------------------------
|
|
def testConcurrentCalls(self):
|
|
src_port_1, src_port_2 = 57716, 57717
|
|
leg_a1 = self.makeUACSocket(src_port_1)
|
|
leg_a2 = self.makeUACSocket(src_port_2)
|
|
call_id_1 = "test-b2b-call-016@127.0.0.1"
|
|
call_id_2 = "test-b2b-call-017@127.0.0.1"
|
|
|
|
def setup_call(leg_a, invite_branch, call_id, ack_branch, src_port):
|
|
"""Establish a call through ACK. Returns (to_tag, b2b_call_id, sems_addr)."""
|
|
self.sendSIP(_make_invite(invite_branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
b2b_inv, addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_call_id = _hdr(b2b_inv, "Call-ID")
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_inv), addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
self.sendToSIP(_make_200_ok_invite(b2b_inv), addr, self._uas_sock)
|
|
ok = self.recvSIP(leg_a)
|
|
tag = _to_tag(ok)
|
|
self.sendSIP(_make_ack(ack_branch, call_id, tag, src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_call_id) # drain ACK on leg B
|
|
return tag, b2b_call_id, addr
|
|
|
|
to_tag_1, cid_b2b_1, sems_addr = setup_call(
|
|
leg_a1, "z9hG4bK-b2b-016", call_id_1, "z9hG4bK-b2b-016-ack", src_port_1)
|
|
to_tag_2, cid_b2b_2, _ = setup_call(
|
|
leg_a2, "z9hG4bK-b2b-017", call_id_2, "z9hG4bK-b2b-017-ack", src_port_2)
|
|
|
|
# Tear down call 1 — must not touch call 2
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-016-bye", call_id_1, to_tag_1,
|
|
src_port=src_port_1), leg_a1)
|
|
bye_b1 = self.recvSIPForCall(self._uas_sock, cid_b2b_1)
|
|
# Verify BYE belongs to call 1 (B2B Call-ID)
|
|
self.assertSIP(bye_b1,
|
|
f"^BYE sip:[^\r]+ SIP/2\\.0\n.*Call-ID: {cid_b2b_1}\n")
|
|
self.sendToSIP(_make_200_ok_for(bye_b1), sems_addr, self._uas_sock)
|
|
self.assertSIP(self.recvSIP(leg_a1), "^SIP/2\\.0 200 OK\n.*CSeq: 2 BYE\n")
|
|
|
|
# Call 2 must still be alive: verify by tearing it down successfully
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-017-bye", call_id_2, to_tag_2,
|
|
src_port=src_port_2), leg_a2)
|
|
bye_b2 = self.recvSIPForCall(self._uas_sock, cid_b2b_2)
|
|
self.assertSIP(bye_b2,
|
|
f"^BYE sip:[^\r]+ SIP/2\\.0\n.*Call-ID: {cid_b2b_2}\n")
|
|
self.sendToSIP(_make_200_ok_for(bye_b2), sems_addr, self._uas_sock)
|
|
self.assertSIP(self.recvSIP(leg_a2), "^SIP/2\\.0 200 OK\n.*CSeq: 2 BYE\n")
|
|
|
|
leg_a1.close()
|
|
leg_a2.close()
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 18: RTP port update after re-INVITE
|
|
# initial RTP on PORT_A/B → re-INVITE changes to PORT_A2/B2
|
|
# → packets must flow on new ports
|
|
# ------------------------------------------------------------------
|
|
def testRtpAfterReInvite(self):
|
|
src_port = 57718
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-018"
|
|
call_id = "test-b2b-call-018@127.0.0.1"
|
|
|
|
# Bind all four media sockets before signaling so Unix paths exist
|
|
rtp_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_a.settimeout(3); rtp_a.bind(("127.0.0.1", _RTP_PORT_A))
|
|
rtp_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_b.settimeout(3); rtp_b.bind(("127.0.0.1", _RTP_PORT_B))
|
|
rtp_a2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_a2.settimeout(3); rtp_a2.bind(("127.0.0.1", _RTP_PORT_A2))
|
|
rtp_b2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_b2.settimeout(3); rtp_b2.bind(("127.0.0.1", _RTP_PORT_B2))
|
|
|
|
# --- Initial call setup ---
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
b2b_invite, sems_addr = self.recvB2BINVITE(self._uas_sock)
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
ok_a = self.recvSIP(leg_a)
|
|
to_tag = _to_tag(ok_a)
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-018-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK
|
|
|
|
# Verify initial RTP (PORT_A ↔ PORT_B)
|
|
self.assertEqual(_sdp_media_port(b2b_invite), _RTP_PORT_A)
|
|
self.assertEqual(_sdp_media_port(ok_a), _RTP_PORT_B)
|
|
pkt1 = _make_rtp(seq=1, ts=160, ssrc=0x11111111)
|
|
rtp_b.sendto(pkt1, ("127.0.0.1", _RTP_PORT_A))
|
|
self.assertEqual(rtp_a.recv(256), pkt1)
|
|
rtp_a.sendto(pkt1, ("127.0.0.1", _RTP_PORT_B))
|
|
self.assertEqual(rtp_b.recv(256), pkt1)
|
|
|
|
# --- re-INVITE: leg A updates ports to A2 (sendrecv) ---
|
|
self.sendSIP(_make_reinvite("z9hG4bK-b2b-018-ri", call_id, to_tag,
|
|
cseq=2, sdp=_SDP_SENDRECV_A2,
|
|
src_port=src_port), leg_a)
|
|
ri_b = self.recvSIPSkipRetrans(self._uas_sock, "ACK", b2b_cid)
|
|
self.sendToSIP(_make_200_ok_invite(ri_b, sdp=_SDP_SENDRECV_B2),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 100 Trying for re-INVITE
|
|
ok_ri_a = self.recvSIP(leg_a)
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-018-ri-ack", call_id, to_tag,
|
|
cseq=2, src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK
|
|
|
|
# Verify RTP now flows on new ports (PORT_A2 ↔ PORT_B2)
|
|
self.assertEqual(_sdp_media_port(ri_b), _RTP_PORT_A2)
|
|
self.assertEqual(_sdp_media_port(ok_ri_a), _RTP_PORT_B2)
|
|
pkt2 = _make_rtp(seq=2, ts=320, ssrc=0x22222222)
|
|
rtp_b2.sendto(pkt2, ("127.0.0.1", _RTP_PORT_A2))
|
|
self.assertEqual(rtp_a2.recv(256), pkt2)
|
|
rtp_a2.sendto(pkt2, ("127.0.0.1", _RTP_PORT_B2))
|
|
self.assertEqual(rtp_b2.recv(256), pkt2)
|
|
|
|
rtp_a.close(); rtp_b.close(); rtp_a2.close(); rtp_b2.close()
|
|
|
|
# BYE (cseq=3 after two INVITEs)
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-018-bye", call_id, to_tag,
|
|
cseq=3, src_port=src_port), leg_a)
|
|
bye_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
self.assertSIP(self.recvSIP(leg_a), "^SIP/2\\.0 200 OK\n.*CSeq: 3 BYE\n")
|
|
leg_a.close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scenario 19: RTP during hold (a=sendonly from leg A)
|
|
# sendonly: A→B flows on hold ports; B does not send (recvonly)
|
|
# In RTP_Direct mode SEMS does not filter by direction — direction
|
|
# enforcement is left to endpoints; this test verifies the active
|
|
# direction (A→B) and that the SDP direction flags are preserved.
|
|
# ------------------------------------------------------------------
|
|
def testRtpDirectionHold(self):
|
|
src_port = 57719
|
|
leg_a = self.makeUACSocket(src_port)
|
|
branch = "z9hG4bK-b2b-019"
|
|
call_id = "test-b2b-call-019@127.0.0.1"
|
|
|
|
# Only need hold-phase sockets
|
|
rtp_a2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_a2.settimeout(3); rtp_a2.bind(("127.0.0.1", _RTP_PORT_A2))
|
|
rtp_b2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
rtp_b2.settimeout(3); rtp_b2.bind(("127.0.0.1", _RTP_PORT_B2))
|
|
|
|
# --- Setup call ---
|
|
self.sendSIP(_make_invite(branch, call_id, src_port=src_port), leg_a)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
b2b_invite = sems_addr = None
|
|
for _ in range(10):
|
|
msg, addr = self.recvFromSIP(self._uas_sock)
|
|
if msg.startswith("INVITE"):
|
|
b2b_invite, sems_addr = msg, addr
|
|
break
|
|
self.assertIsNotNone(b2b_invite, "SEMS did not forward INVITE to leg B")
|
|
b2b_cid = _hdr(b2b_invite, "Call-ID")
|
|
self.sendToSIP(_make_provisional(180, "Ringing", b2b_invite),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 180
|
|
self.sendToSIP(_make_200_ok_invite(b2b_invite), sems_addr, self._uas_sock)
|
|
ok_a = self.recvSIP(leg_a)
|
|
to_tag = _to_tag(ok_a)
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-019-ack", call_id, to_tag,
|
|
src_port=src_port), leg_a)
|
|
# Wait for SEMS to forward the ACK to leg B, skipping stale messages.
|
|
# Must receive the ACK before sending re-INVITE to avoid a 491 rejection.
|
|
for _ in range(10):
|
|
msg = self.recvSIP(self._uas_sock)
|
|
if msg.startswith("ACK") and b2b_cid in msg:
|
|
break
|
|
|
|
# --- Hold: leg A sendonly (PORT_A2), leg B recvonly (PORT_B2) ---
|
|
self.sendSIP(_make_reinvite("z9hG4bK-b2b-019-hold", call_id, to_tag,
|
|
cseq=2, sdp=_SDP_SENDONLY_A,
|
|
src_port=src_port), leg_a)
|
|
ri_b = self.recvSIPSkipRetrans(self._uas_sock, "ACK", b2b_cid)
|
|
# SEMS must relay a=sendonly to leg B unchanged
|
|
self.assertSIP(ri_b, "^INVITE sip:[^\r]+ SIP/2\\.0\n.*a=sendonly")
|
|
self.sendToSIP(_make_200_ok_invite(ri_b, sdp=_SDP_RECVONLY_B),
|
|
sems_addr, self._uas_sock)
|
|
self.recvSIP(leg_a) # 100 Trying
|
|
ok_ri_a = self.recvSIP(leg_a)
|
|
# SEMS must relay a=recvonly back to leg A unchanged
|
|
self.assertSIP(ok_ri_a, "^SIP/2\\.0 200 OK\n.*a=recvonly")
|
|
self.sendSIP(_make_ack("z9hG4bK-b2b-019-hold-ack", call_id, to_tag,
|
|
cseq=2, src_port=src_port), leg_a)
|
|
self.recvSIPForCall(self._uas_sock, b2b_cid) # drain ACK
|
|
|
|
# sendonly → leg A sends to PORT_B2 (from recvonly answer); leg B receives
|
|
dest_a_to_b = _sdp_media_port(ok_ri_a)
|
|
self.assertEqual(dest_a_to_b, _RTP_PORT_B2)
|
|
pkt_a2b = _make_rtp(seq=1, ts=160, ssrc=0xA0A0A0A0)
|
|
rtp_a2.sendto(pkt_a2b, ("127.0.0.1", dest_a_to_b))
|
|
self.assertEqual(rtp_b2.recv(256), pkt_a2b)
|
|
|
|
# recvonly → leg B does not send; PORT_A2 stays silent
|
|
# (In RTP_Direct mode SEMS does not intercept; this verifies no
|
|
# accidental traffic arrives from the test harness itself.)
|
|
dest_b_to_a = _sdp_media_port(ri_b)
|
|
self.assertEqual(dest_b_to_a, _RTP_PORT_A2)
|
|
rtp_a2.settimeout(0.2)
|
|
with self.assertRaises(TimeoutError):
|
|
rtp_a2.recv(256)
|
|
rtp_a2.settimeout(3)
|
|
|
|
rtp_a2.close(); rtp_b2.close()
|
|
|
|
# BYE (cseq=3)
|
|
self.sendSIP(_make_bye("z9hG4bK-b2b-019-bye", call_id, to_tag,
|
|
cseq=3, src_port=src_port), leg_a)
|
|
bye_b = self.recvSIPForCall(self._uas_sock, b2b_cid)
|
|
self.sendToSIP(_make_200_ok_for(bye_b), sems_addr, self._uas_sock)
|
|
self.assertSIP(self.recvSIP(leg_a), "^SIP/2\\.0 200 OK\n.*CSeq: 3 BYE\n")
|
|
leg_a.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sems_tester.main()
|