MT#56447 support multiple websocket connections

... in websocket test script

Change-Id: I5c7a11340ae1b656607ea44e258e267ad0c4ebff
pull/1642/head
Richard Fuchs 3 years ago
parent 829b37e8d9
commit d3400de517

@ -17,7 +17,7 @@ from websockets import connect
eventloop = None
async def get_ws(cls, proto):
async def make_ws(cls, proto):
from platform import python_version
from websockets import __version__
@ -27,48 +27,67 @@ async def get_ws(cls, proto):
raise unittest.SkipTest(msg)
for _ in range(1, 300):
try:
cls._ws = await connect("ws://127.0.0.1:9191/", subprotocols=[proto])
break
conn = await connect("ws://127.0.0.1:9191/", subprotocols=[proto])
return conn
except FileNotFoundError:
await asyncio.sleep(0.1)
async def testIO(self, msg):
await self._ws.send(msg)
self._res = await asyncio.wait_for(self._ws.recv(), timeout=10)
async def get_ws(cls, proto, num=1):
cls._ws = []
for _ in range(num):
conn = await make_ws(cls, proto)
cls._ws.append(conn)
async def testIOJson(self, msg):
await self._ws.send(json.dumps(msg))
self._res = await asyncio.wait_for(self._ws.recv(), timeout=10)
async def get_more_ws(cls, proto, num=1):
for _ in range(num):
conn = await make_ws(cls, proto)
cls._ws.append(conn)
async def close_ws(cls):
for conn in cls._ws:
await conn.close()
cls._ws.clear()
async def testIO(self, msg, conn_num=0):
await self._ws[conn_num].send(msg)
self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10)
async def testIOJson(self, msg, conn_num=0):
await self._ws[conn_num].send(json.dumps(msg))
self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10)
self._res = json.loads(self._res)
async def testIJson(self):
self._res = await asyncio.wait_for(self._ws.recv(), timeout=10)
async def testIJson(self, conn_num=0):
self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10)
self._res = json.loads(self._res)
async def testIJanus(self):
self._res = await asyncio.wait_for(self._ws.recv(), timeout=10)
async def testIJanus(self, conn_num=0):
self._res = await asyncio.wait_for(self._ws[conn_num].recv(), timeout=10)
self._res = json.loads(self._res)
self.assertEqual(self._res["transaction"], self._trans)
del self._res["transaction"]
async def testIOJanus(self, msg):
async def testIOJanus(self, msg, conn_num=0):
trans = str(uuid.uuid4())
msg["transaction"] = trans
self._trans = trans
await self._ws.send(json.dumps(msg))
await testIJanus(self)
await self._ws[conn_num].send(json.dumps(msg))
await testIJanus(self, conn_num)
async def testOJanus(self, msg):
async def testOJanus(self, msg, conn_num=0):
trans = str(uuid.uuid4())
msg["transaction"] = trans
self._trans = trans
await self._ws.send(json.dumps(msg))
await self._ws[conn_num].send(json.dumps(msg))
class TestWSEcho(unittest.TestCase):
@ -78,7 +97,7 @@ class TestWSEcho(unittest.TestCase):
@classmethod
def tearDownClass(cls):
eventloop.run_until_complete(cls._ws.close())
eventloop.run_until_complete(close_ws(cls))
def testEcho(self):
eventloop.run_until_complete(testIO(self, b"foobar"))
@ -96,7 +115,7 @@ class TestWSCli(unittest.TestCase):
@classmethod
def tearDownClass(cls):
eventloop.run_until_complete(cls._ws.close())
eventloop.run_until_complete(close_ws(cls))
def testListNumsessions(self):
# race condition here if this runs at the same as the janus test (creates call)
@ -120,7 +139,7 @@ class TestWSJanus(unittest.TestCase):
@classmethod
def tearDownClass(cls):
eventloop.run_until_complete(cls._ws.close())
eventloop.run_until_complete(close_ws(cls))
def testPing(self):
eventloop.run_until_complete(
@ -164,14 +183,19 @@ class TestWSJanus(unittest.TestCase):
class TestVideoroom(unittest.TestCase):
@classmethod
def setUpClass(cls):
eventloop.run_until_complete(get_ws(cls, "janus-protocol"))
cls.maxDiff = None
cls._ws = []
@classmethod
def tearDownClass(cls):
eventloop.run_until_complete(cls._ws.close())
eventloop.run_until_complete(close_ws(cls))
def startSession(self):
self.maxDiff = None
def startSession(self, conn_num=0):
# make sure we have a matching connection
if conn_num >= len(self._ws):
eventloop.run_until_complete(
get_more_ws(self, "janus-protocol", conn_num - len(self._ws) + 1)
)
token = str(uuid.uuid4())
@ -183,6 +207,7 @@ class TestVideoroom(unittest.TestCase):
"token": token,
"admin_secret": "dfgdfgdvgLyATjHPvckg",
},
conn_num,
)
)
self.assertEqual(
@ -199,6 +224,7 @@ class TestVideoroom(unittest.TestCase):
"token": token,
"admin_secret": "dfgdfgdvgLyATjHPvckg",
},
conn_num,
)
)
session = self._res["data"]["id"]
@ -208,6 +234,9 @@ class TestVideoroom(unittest.TestCase):
return (token, session)
def startVideoroom(self):
# start fresh
self.closeConns()
(token, session) = self.startSession()
handle = self.createHandle(token, session)
@ -280,7 +309,10 @@ class TestVideoroom(unittest.TestCase):
},
)
def createHandle(self, token, session):
def closeConns(self):
eventloop.run_until_complete(close_ws(self))
def createHandle(self, token, session, conn_num=0):
eventloop.run_until_complete(
testIOJanus(
self,
@ -291,6 +323,7 @@ class TestVideoroom(unittest.TestCase):
"token": token,
"opaque_id": None,
},
conn_num,
)
)
handle = self._res["data"]["id"]
@ -303,7 +336,7 @@ class TestVideoroom(unittest.TestCase):
return handle
def createPublisher(self, token, session, room, handle, pubs=[]):
def createPublisher(self, token, session, room, handle, pubs=[], conn_num=0):
eventloop.run_until_complete(
testIOJanus(
self,
@ -314,12 +347,13 @@ class TestVideoroom(unittest.TestCase):
"session_id": session,
"token": token,
},
conn_num,
)
)
# ack is received first
self.assertEqual(self._res, {"janus": "ack", "session_id": session})
# followed by the joined event
eventloop.run_until_complete(testIJanus(self))
eventloop.run_until_complete(testIJanus(self, conn_num))
feed = self._res["plugindata"]["data"]["id"]
self.assertIsInstance(feed, int)
self.assertNotEqual(feed, session)

Loading…
Cancel
Save