From 837c024b5f6b42166b18088bb8cf39cd047a7c00 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 19 Jan 2023 14:34:26 -0500 Subject: [PATCH] MT#56447 support multiple Janus subs in one req According to: { "request" : "join", "ptype" : "subscriber", "room" : , "use_msid" : , "autoupdate" : , "private_id" : , "streams" : [ { "feed" : , "mid" : "" "crossrefid" : "" // Optionally, simulcast or SVC targets (defaults if missing) }, // Other streams to subscribe to ] } { "videoroom" : "attached", "room" : , "streams" : [ { "mindex" : , "mid" : "", "type" : "", "feed_id" : , "feed_mid" : "", "feed_display" : "", "send" : , "ready" : }, // Other streams in the subscription, if any ] } Change-Id: Ieb38d4f562686283457a963334056b27927be974 --- daemon/janus.c | 85 +++++++++- t/auto-daemon-tests-websocket.py | 283 +++++++++++++++++++++++++++++++ 2 files changed, 361 insertions(+), 7 deletions(-) diff --git a/daemon/janus.c b/daemon/janus.c index f570cbcef..876c3af2f 100644 --- a/daemon/janus.c +++ b/daemon/janus.c @@ -335,6 +335,13 @@ static const char *janus_videoroom_join_sub(struct janus_handle *handle, struct } +static void janus_clear_ret_streams(GQueue *q) { + uint64_t *id; + while ((id = g_queue_pop_head(q))) + g_slice_free1(sizeof(*id), id); +} + + static const char *janus_videoroom_join(struct websocket_message *wm, struct janus_session *session, const char *transaction, struct janus_handle *handle, JsonBuilder *builder, JsonReader *reader, const char **successp, @@ -384,7 +391,10 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan if (is_pub && g_hash_table_lookup(room->publishers, &handle->id)) return "User already exists in the room as a publisher"; - uint64_t feed_id = 0; + uint64_t feed_id = 0; // set for single feed IDs, otherwise remains 0 + AUTO_CLEANUP_INIT(GString *feed_ids, __g_string_free, g_string_new("feeds ")); // for log output + AUTO_CLEANUP(GQueue ret_streams, janus_clear_ret_streams) = G_QUEUE_INIT; // return list for multiple subs + if (is_pub) { // random feed ID while (1) { @@ -422,10 +432,46 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan if (ret) return ret; } - else - return "JSON object does not contain 'message.feed' key"; json_reader_end_member(reader); + // handle list of subscriptions if given + if (json_reader_read_member(reader, "streams")) { + *retcode = 456; + if (!json_reader_is_array(reader)) + return "Invalid 'message.streams' key (not an array)"; + int eles = json_reader_count_elements(reader); + if (eles < 0) + return "Invalid 'message.streams' key (invalid array)"; + for (int i = 0; i < eles; i++) { + if (!json_reader_read_element(reader, i)) + return "Invalid 'message.streams' key (cannot read element)"; + if (!json_reader_is_object(reader)) + return "Invalid 'message.streams' key (contains not an object)"; + if (!json_reader_read_member(reader, "feed")) + return "Invalid 'message.streams' key (doesn't contain 'feed')"; + uint64_t fid = jr_str_int(reader); // leave `feed_id` zero + if (!fid) + return "Invalid 'message.streams' key (contains invalid 'feed')"; + const char *ret = janus_videoroom_join_sub(handle, room, retcode, fid, + call, &srcs); + if (ret) + return ret; + json_reader_end_member(reader); + json_reader_end_element(reader); + + g_string_append_printf(feed_ids, "%" PRIu64 ", ", fid); + + uint64_t *fidp = g_slice_alloc(sizeof(*fidp)); + *fidp = fid; + g_queue_push_tail(&ret_streams, fidp); + } + } + json_reader_end_member(reader); + + *retcode = 456; + if (!srcs.length) + return "No feeds to subscribe to given"; + AUTO_CLEANUP_GBUF(dest_handle_buf); dest_handle_buf = g_strdup_printf("%" PRIu64, handle->id); str dest_handle_str; @@ -469,9 +515,15 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan handle->room = room_id; - ilog(LOG_INFO, "Handle %" PRIu64 " has joined room %" PRIu64 " as %s (feed %" PRIu64 ")", + // single or multiple feed IDs? + if (feed_id) + g_string_printf(feed_ids, "feed %" PRIu64, feed_id); + else if (feed_ids->len >= 2) // truncate trailing ", " + g_string_truncate(feed_ids, feed_ids->len - 2); + + ilog(LOG_INFO, "Handle %" PRIu64 " has joined room %" PRIu64 " as %s (%s)", handle->id, room_id, - is_pub ? "publisher" : "subscriber", feed_id); + is_pub ? "publisher" : "subscriber", feed_ids->str); *successp = "event"; @@ -491,8 +543,27 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan json_builder_add_string_value(builder, "attached"); json_builder_set_member_name(builder, "room"); json_builder_add_int_value(builder, room_id); - json_builder_set_member_name(builder, "id"); - json_builder_add_int_value(builder, feed_id); + + // output format: single feed ID or multiple? + if (feed_id) { + json_builder_set_member_name(builder, "id"); + json_builder_add_int_value(builder, feed_id); + } + else { + json_builder_set_member_name(builder, "streams"); + json_builder_begin_array(builder); + uint64_t idx = 0; + for (GList *l = ret_streams.head; l; l = l->next) { + uint64_t *fidp = l->data; + json_builder_begin_object(builder); + json_builder_set_member_name(builder, "mindex"); + json_builder_add_int_value(builder, idx++); + json_builder_set_member_name(builder, "feed_id"); + json_builder_add_int_value(builder, *fidp); + json_builder_end_object(builder); + } + json_builder_end_array(builder); + } } } diff --git a/t/auto-daemon-tests-websocket.py b/t/auto-daemon-tests-websocket.py index 6d889ccf8..2304ce381 100644 --- a/t/auto-daemon-tests-websocket.py +++ b/t/auto-daemon-tests-websocket.py @@ -634,6 +634,289 @@ class TestVideoroom(unittest.TestCase): }, ) + def testVideoroomWebRTCAlt(self): + # alternative usage: publisher == controller, no extra feed_id, no room specified + + (token, session, control_handle, room) = self.startVideoroom() + + # timeout test + eventloop.run_until_complete(asyncio.sleep(3)) + + eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "exists", + "room": room, + }, + "handle_id": control_handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertEqual( + self._res, + { + "janus": "success", + "session_id": session, + "sender": control_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "success", + "room": room, + "exists": True, + }, + }, + }, + ) + + pub_handle = control_handle + + feed = self.createPublisher(token, session, room, pub_handle) + self.assertNotEqual(feed, control_handle) + + # publish as plain RTP + eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "configure", + "audio": True, + "video": True, + }, + "jsep": { + "type": "offer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.3\r\n" + "c=IN IP4 203.0.113.2\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 8000 RTP/AVP 8 0\r\n" + "a=sendonly\r\n" + ), + }, + "handle_id": pub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the event notification + eventloop.run_until_complete(testIJanus(self)) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=- \d+ \d+ IN IP4 203.0.113.1\r\n" + "s=rtpengine.*?\r\n" + "t=0 0\r\n" + "m=audio \d+ RTP/AVP 8\r\n" + "c=IN IP4 203.0.113.1\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=recvonly\r\n" + "a=rtcp:\d+\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": pub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "room": room, + "configured": "ok", + "audio_codec": "PCMA", + }, + }, + "jsep": {"type": "answer", "sdp": sdp}, + }, + ) + + sub_handle = self.createHandle(token, session) + self.assertNotEqual(sub_handle, pub_handle) + self.assertNotEqual(sub_handle, control_handle) + + # subscriber expects full WebRTC attributes + eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "join", + "ptype": "subscriber", + "room": room, + "streams": [ + { "feed": feed }, + ], + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + eventloop.run_until_complete(testIJanus(self)) + self.assertEqual(len(self._res["plugindata"]["data"]["streams"]), 1) + self.assertEqual(feed, self._res["plugindata"]["data"]["streams"][0]["feed_id"]) + self.assertNotEqual(feed, control_handle) + self.assertNotEqual(feed, session) + self.assertNotEqual(feed, room) + self.assertNotEqual(feed, pub_handle) + self.assertNotEqual(feed, sub_handle) + sdp = self._res["jsep"]["sdp"] + self.assertIsInstance(sdp, str) + self.assertRegex( + sdp, + re.compile( + "^v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.3\r\n" + "c=IN IP4 203.0.113.1\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio \d+ UDP/TLS/RTP/SAVPF 8\r\n" + "a=mid:1\r\n" + "a=rtpmap:8 PCMA/8000\r\n" + "a=sendonly\r\n" + "a=rtcp-mux\r\n" + "a=setup:actpass\r\n" + "a=fingerprint:sha-256 .{95}\r\n" + "a=tls-id:[0-9a-f]{32}\r\n" + "a=ice-ufrag:.{8}\r\n" + "a=ice-pwd:.{26}\r\n" + "a=ice-options:trickle\r\n" + "a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n" + "a=end-of-candidates\r\n$", + re.DOTALL, + ), + ) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": sub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "attached", + "room": room, + "streams": [ + { + "mindex": 0, + "feed_id": feed, + }, + ], + }, + }, + "jsep": {"type": "offer", "sdp": sdp}, + }, + ) + + # subscriber #1 answer + eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": {"request": "start", "room": room, "feed": feed}, + "jsep": { + "type": "answer", + "sdp": ( + "v=0\r\n" + "o=x 123 123 IN IP4 203.0.113.2\r\n" + "c=IN IP4 0.0.0.0\r\n" + "s=foobar\r\n" + "t=0 0\r\n" + "m=audio 9 RTP/AVP 8\r\n" + "a=mid:audio\r\n" + "a=ice-ufrag:abcd\r\n" + "a=ice-pwd:WD1pLsdgsdfsdWuEBb0vjyZr\r\n" + "a=ice-options:trickle\r\n" + "a=rtcp-mux\r\n" + "a=recvonly\r\n" + ), + }, + "handle_id": sub_handle, + "session_id": session, + "token": token, + }, + ) + ) + # ack is received first + self.assertEqual(self._res, {"janus": "ack", "session_id": session}) + # followed by the attached event + eventloop.run_until_complete(testIJanus(self)) + self.assertEqual( + self._res, + { + "janus": "event", + "session_id": session, + "sender": sub_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "event", + "started": "ok", + "room": room, + }, + }, + }, + ) + + self.destroyVideoroom(token, session, control_handle, room) + + eventloop.run_until_complete( + testIOJanus( + self, + { + "janus": "message", + "body": { + "request": "exists", + "room": room, + }, + "handle_id": control_handle, + "session_id": session, + "token": token, + }, + ) + ) + self.assertEqual( + self._res, + { + "janus": "success", + "session_id": session, + "sender": control_handle, + "plugindata": { + "plugin": "janus.plugin.videoroom", + "data": { + "videoroom": "success", + "room": room, + "exists": False, + }, + }, + }, + ) + def testVideoroomSDESDTLS(self): (token, session, control_handle, room) = self.startVideoroom()