MT#56447 support multiple Janus subs in one req

According to:

{
	"request" : "join",
	"ptype" : "subscriber",
	"room" : <unique ID of the room to subscribe in>,
	"use_msid" : <whether subscriptions should include an msid that references the publisher; false by default>,
	"autoupdate" : <whether a new SDP offer is sent automatically when a subscribed publisher leaves; true by default>,
	"private_id" : <unique ID of the publisher that originated this request; optional, unless mandated by the room configuration>,
	"streams" : [
		{
			"feed" : <unique ID of publisher owning the stream to subscribe to>,
			"mid" : "<unique mid of the publisher stream to subscribe to; optional>"
			"crossrefid" : "<id to map this subscription with entries in streams list; optional>"
			// Optionally, simulcast or SVC targets (defaults if missing)
		},
		// Other streams to subscribe to
	]
}

{
	"videoroom" : "attached",
	"room" : <room ID>,
	"streams" : [
		{
			"mindex" : <unique m-index of this stream>,
			"mid" : "<unique mid of this stream>",
			"type" : "<type of this stream's media (audio|video|data)>",
			"feed_id" : <unique ID of the publisher originating this stream>,
			"feed_mid" : "<unique mid of this publisher's stream>",
			"feed_display" : "<display name of this publisher, if any>",
			"send" : <true|false; whether we configured the stream to relay media>,
			"ready" : <true|false; whether this stream is ready to start sending media (will be false at the beginning)>
		},
		// Other streams in the subscription, if any
	]
}

Change-Id: Ieb38d4f562686283457a963334056b27927be974
pull/1611/head
Richard Fuchs 3 years ago
parent 38c1f239bf
commit 837c024b5f

@ -335,6 +335,13 @@ static const char *janus_videoroom_join_sub(struct janus_handle *handle, struct
}
static void janus_clear_ret_streams(GQueue *q) {
uint64_t *id;
while ((id = g_queue_pop_head(q)))
g_slice_free1(sizeof(*id), id);
}
static const char *janus_videoroom_join(struct websocket_message *wm, struct janus_session *session,
const char *transaction,
struct janus_handle *handle, JsonBuilder *builder, JsonReader *reader, const char **successp,
@ -384,7 +391,10 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan
if (is_pub && g_hash_table_lookup(room->publishers, &handle->id))
return "User already exists in the room as a publisher";
uint64_t feed_id = 0;
uint64_t feed_id = 0; // set for single feed IDs, otherwise remains 0
AUTO_CLEANUP_INIT(GString *feed_ids, __g_string_free, g_string_new("feeds ")); // for log output
AUTO_CLEANUP(GQueue ret_streams, janus_clear_ret_streams) = G_QUEUE_INIT; // return list for multiple subs
if (is_pub) {
// random feed ID
while (1) {
@ -422,10 +432,46 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan
if (ret)
return ret;
}
else
return "JSON object does not contain 'message.feed' key";
json_reader_end_member(reader);
// handle list of subscriptions if given
if (json_reader_read_member(reader, "streams")) {
*retcode = 456;
if (!json_reader_is_array(reader))
return "Invalid 'message.streams' key (not an array)";
int eles = json_reader_count_elements(reader);
if (eles < 0)
return "Invalid 'message.streams' key (invalid array)";
for (int i = 0; i < eles; i++) {
if (!json_reader_read_element(reader, i))
return "Invalid 'message.streams' key (cannot read element)";
if (!json_reader_is_object(reader))
return "Invalid 'message.streams' key (contains not an object)";
if (!json_reader_read_member(reader, "feed"))
return "Invalid 'message.streams' key (doesn't contain 'feed')";
uint64_t fid = jr_str_int(reader); // leave `feed_id` zero
if (!fid)
return "Invalid 'message.streams' key (contains invalid 'feed')";
const char *ret = janus_videoroom_join_sub(handle, room, retcode, fid,
call, &srcs);
if (ret)
return ret;
json_reader_end_member(reader);
json_reader_end_element(reader);
g_string_append_printf(feed_ids, "%" PRIu64 ", ", fid);
uint64_t *fidp = g_slice_alloc(sizeof(*fidp));
*fidp = fid;
g_queue_push_tail(&ret_streams, fidp);
}
}
json_reader_end_member(reader);
*retcode = 456;
if (!srcs.length)
return "No feeds to subscribe to given";
AUTO_CLEANUP_GBUF(dest_handle_buf);
dest_handle_buf = g_strdup_printf("%" PRIu64, handle->id);
str dest_handle_str;
@ -469,9 +515,15 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan
handle->room = room_id;
ilog(LOG_INFO, "Handle %" PRIu64 " has joined room %" PRIu64 " as %s (feed %" PRIu64 ")",
// single or multiple feed IDs?
if (feed_id)
g_string_printf(feed_ids, "feed %" PRIu64, feed_id);
else if (feed_ids->len >= 2) // truncate trailing ", "
g_string_truncate(feed_ids, feed_ids->len - 2);
ilog(LOG_INFO, "Handle %" PRIu64 " has joined room %" PRIu64 " as %s (%s)",
handle->id, room_id,
is_pub ? "publisher" : "subscriber", feed_id);
is_pub ? "publisher" : "subscriber", feed_ids->str);
*successp = "event";
@ -491,8 +543,27 @@ static const char *janus_videoroom_join(struct websocket_message *wm, struct jan
json_builder_add_string_value(builder, "attached");
json_builder_set_member_name(builder, "room");
json_builder_add_int_value(builder, room_id);
json_builder_set_member_name(builder, "id");
json_builder_add_int_value(builder, feed_id);
// output format: single feed ID or multiple?
if (feed_id) {
json_builder_set_member_name(builder, "id");
json_builder_add_int_value(builder, feed_id);
}
else {
json_builder_set_member_name(builder, "streams");
json_builder_begin_array(builder);
uint64_t idx = 0;
for (GList *l = ret_streams.head; l; l = l->next) {
uint64_t *fidp = l->data;
json_builder_begin_object(builder);
json_builder_set_member_name(builder, "mindex");
json_builder_add_int_value(builder, idx++);
json_builder_set_member_name(builder, "feed_id");
json_builder_add_int_value(builder, *fidp);
json_builder_end_object(builder);
}
json_builder_end_array(builder);
}
}
}

@ -634,6 +634,289 @@ class TestVideoroom(unittest.TestCase):
},
)
def testVideoroomWebRTCAlt(self):
# alternative usage: publisher == controller, no extra feed_id, no room specified
(token, session, control_handle, room) = self.startVideoroom()
# timeout test
eventloop.run_until_complete(asyncio.sleep(3))
eventloop.run_until_complete(
testIOJanus(
self,
{
"janus": "message",
"body": {
"request": "exists",
"room": room,
},
"handle_id": control_handle,
"session_id": session,
"token": token,
},
)
)
self.assertEqual(
self._res,
{
"janus": "success",
"session_id": session,
"sender": control_handle,
"plugindata": {
"plugin": "janus.plugin.videoroom",
"data": {
"videoroom": "success",
"room": room,
"exists": True,
},
},
},
)
pub_handle = control_handle
feed = self.createPublisher(token, session, room, pub_handle)
self.assertNotEqual(feed, control_handle)
# publish as plain RTP
eventloop.run_until_complete(
testIOJanus(
self,
{
"janus": "message",
"body": {
"request": "configure",
"audio": True,
"video": True,
},
"jsep": {
"type": "offer",
"sdp": (
"v=0\r\n"
"o=x 123 123 IN IP4 203.0.113.3\r\n"
"c=IN IP4 203.0.113.2\r\n"
"s=foobar\r\n"
"t=0 0\r\n"
"m=audio 8000 RTP/AVP 8 0\r\n"
"a=sendonly\r\n"
),
},
"handle_id": pub_handle,
"session_id": session,
"token": token,
},
)
)
# ack is received first
self.assertEqual(self._res, {"janus": "ack", "session_id": session})
# followed by the event notification
eventloop.run_until_complete(testIJanus(self))
sdp = self._res["jsep"]["sdp"]
self.assertIsInstance(sdp, str)
self.assertRegex(
sdp,
re.compile(
"^v=0\r\n"
"o=- \d+ \d+ IN IP4 203.0.113.1\r\n"
"s=rtpengine.*?\r\n"
"t=0 0\r\n"
"m=audio \d+ RTP/AVP 8\r\n"
"c=IN IP4 203.0.113.1\r\n"
"a=rtpmap:8 PCMA/8000\r\n"
"a=recvonly\r\n"
"a=rtcp:\d+\r\n$",
re.DOTALL,
),
)
self.assertEqual(
self._res,
{
"janus": "event",
"session_id": session,
"sender": pub_handle,
"plugindata": {
"plugin": "janus.plugin.videoroom",
"data": {
"videoroom": "event",
"room": room,
"configured": "ok",
"audio_codec": "PCMA",
},
},
"jsep": {"type": "answer", "sdp": sdp},
},
)
sub_handle = self.createHandle(token, session)
self.assertNotEqual(sub_handle, pub_handle)
self.assertNotEqual(sub_handle, control_handle)
# subscriber expects full WebRTC attributes
eventloop.run_until_complete(
testIOJanus(
self,
{
"janus": "message",
"body": {
"request": "join",
"ptype": "subscriber",
"room": room,
"streams": [
{ "feed": feed },
],
},
"handle_id": sub_handle,
"session_id": session,
"token": token,
},
)
)
# ack is received first
self.assertEqual(self._res, {"janus": "ack", "session_id": session})
# followed by the attached event
eventloop.run_until_complete(testIJanus(self))
self.assertEqual(len(self._res["plugindata"]["data"]["streams"]), 1)
self.assertEqual(feed, self._res["plugindata"]["data"]["streams"][0]["feed_id"])
self.assertNotEqual(feed, control_handle)
self.assertNotEqual(feed, session)
self.assertNotEqual(feed, room)
self.assertNotEqual(feed, pub_handle)
self.assertNotEqual(feed, sub_handle)
sdp = self._res["jsep"]["sdp"]
self.assertIsInstance(sdp, str)
self.assertRegex(
sdp,
re.compile(
"^v=0\r\n"
"o=x 123 123 IN IP4 203.0.113.3\r\n"
"c=IN IP4 203.0.113.1\r\n"
"s=foobar\r\n"
"t=0 0\r\n"
"m=audio \d+ UDP/TLS/RTP/SAVPF 8\r\n"
"a=mid:1\r\n"
"a=rtpmap:8 PCMA/8000\r\n"
"a=sendonly\r\n"
"a=rtcp-mux\r\n"
"a=setup:actpass\r\n"
"a=fingerprint:sha-256 .{95}\r\n"
"a=tls-id:[0-9a-f]{32}\r\n"
"a=ice-ufrag:.{8}\r\n"
"a=ice-pwd:.{26}\r\n"
"a=ice-options:trickle\r\n"
"a=candidate:.{16} 1 UDP 2130706431 203.0.113.1 \d+ typ host\r\n"
"a=end-of-candidates\r\n$",
re.DOTALL,
),
)
self.assertEqual(
self._res,
{
"janus": "event",
"session_id": session,
"sender": sub_handle,
"plugindata": {
"plugin": "janus.plugin.videoroom",
"data": {
"videoroom": "attached",
"room": room,
"streams": [
{
"mindex": 0,
"feed_id": feed,
},
],
},
},
"jsep": {"type": "offer", "sdp": sdp},
},
)
# subscriber #1 answer
eventloop.run_until_complete(
testIOJanus(
self,
{
"janus": "message",
"body": {"request": "start", "room": room, "feed": feed},
"jsep": {
"type": "answer",
"sdp": (
"v=0\r\n"
"o=x 123 123 IN IP4 203.0.113.2\r\n"
"c=IN IP4 0.0.0.0\r\n"
"s=foobar\r\n"
"t=0 0\r\n"
"m=audio 9 RTP/AVP 8\r\n"
"a=mid:audio\r\n"
"a=ice-ufrag:abcd\r\n"
"a=ice-pwd:WD1pLsdgsdfsdWuEBb0vjyZr\r\n"
"a=ice-options:trickle\r\n"
"a=rtcp-mux\r\n"
"a=recvonly\r\n"
),
},
"handle_id": sub_handle,
"session_id": session,
"token": token,
},
)
)
# ack is received first
self.assertEqual(self._res, {"janus": "ack", "session_id": session})
# followed by the attached event
eventloop.run_until_complete(testIJanus(self))
self.assertEqual(
self._res,
{
"janus": "event",
"session_id": session,
"sender": sub_handle,
"plugindata": {
"plugin": "janus.plugin.videoroom",
"data": {
"videoroom": "event",
"started": "ok",
"room": room,
},
},
},
)
self.destroyVideoroom(token, session, control_handle, room)
eventloop.run_until_complete(
testIOJanus(
self,
{
"janus": "message",
"body": {
"request": "exists",
"room": room,
},
"handle_id": control_handle,
"session_id": session,
"token": token,
},
)
)
self.assertEqual(
self._res,
{
"janus": "success",
"session_id": session,
"sender": control_handle,
"plugindata": {
"plugin": "janus.plugin.videoroom",
"data": {
"videoroom": "success",
"room": room,
"exists": False,
},
},
},
)
def testVideoroomSDESDTLS(self):
(token, session, control_handle, room) = self.startVideoroom()

Loading…
Cancel
Save