TT#35413 add support for the 'load limit' ng protocol extension

Change-Id: I3f3897794050e0807f3cddb8044b3aa37b703d9c
changes/44/20944/4
Richard Fuchs 8 years ago
parent 43f2971722
commit efcad1c628

@ -1414,6 +1414,22 @@ Optionally included keys are:
been requested for transcoding. Note that not all codecs support all packetization
intervals.
* `supports`
Contains a list of strings. Each string indicates support for an additional feature
that the controlling SIP proxy supports. Currently defined values are:
* `load limit`
Indicates support for an extension to the *ng* protocol to facilitate certain load
balancing mechanisms. If *rtpengine* is configured with certain session or load
limit options enabled (such as the `max-sessions` option), then normally *rtpengine*
would reply with an error to an `offer` if one of the limits is exceeded. If support
for the `load limit` extension is indicated, then instead of replying with an error,
*rtpengine* responds with the string `load limit` in the `result` key of the response
dictionary. The response dictionary may also contain the optional key `message` with
an explanatory string. No other key is required in the response dictionary.
An example of a complete `offer` request dictionary could be (SDP body abbreviated):

@ -17,6 +17,7 @@
#include "str.h"
#include "control_tcp.h"
#include "control_udp.h"
#include "control_ng.h"
#include "rtp.h"
#include "ice.h"
#include "recording.h"
@ -558,6 +559,10 @@ static void call_ng_flags_replace(struct sdp_ng_flags *out, str *s, void *dummy)
ilog(LOG_WARN, "Unknown 'replace' flag encountered: '" STR_FORMAT "'",
STR_FMT(s));
}
static void call_ng_flags_supports(struct sdp_ng_flags *out, str *s, void *dummy) {
if (!str_cmp(s, "load limit"))
out->supports_load_limit = 1;
}
static void call_ng_flags_codec_list(struct sdp_ng_flags *out, str *s, void *qp) {
str *s_copy;
s_copy = g_slice_alloc(sizeof(*s_copy));
@ -646,6 +651,7 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu
call_ng_flags_list(out, input, "flags", call_ng_flags_flags, NULL);
call_ng_flags_list(out, input, "replace", call_ng_flags_replace, NULL);
call_ng_flags_list(out, input, "supports", call_ng_flags_supports, NULL);
diridx = 0;
if ((list = bencode_dictionary_get_expect(input, "direction", BENCODE_LIST))) {
@ -714,13 +720,38 @@ static void call_ng_free_flags(struct sdp_ng_flags *flags) {
g_queue_clear_full(&flags->codec_transcode, str_slice_free);
}
static enum load_limit_reasons call_offer_session_limit(void) {
enum load_limit_reasons ret = LOAD_LIMIT_NONE;
rwlock_lock_r(&rtpe_config.config_lock);
if (rtpe_config.max_sessions>=0) {
rwlock_lock_r(&rtpe_callhash_lock);
if (g_hash_table_size(rtpe_callhash) -
atomic64_get(&rtpe_stats.foreign_sessions) >= rtpe_config.max_sessions)
{
/* foreign calls can't get rejected
* total_rejected_sess applies only to "own" sessions */
atomic64_inc(&rtpe_totalstats.total_rejected_sess);
atomic64_inc(&rtpe_totalstats_interval.total_rejected_sess);
ilog(LOG_ERROR, "Parallel session limit reached (%i)",rtpe_config.max_sessions);
ret = LOAD_LIMIT_MAX_SESSIONS;
}
rwlock_unlock_r(&rtpe_callhash_lock);
}
rwlock_unlock_r(&rtpe_config.config_lock);
return ret;
}
static const char *call_offer_answer_ng(bencode_item_t *input,
bencode_item_t *output, enum call_opmode opmode, const char* addr,
const endpoint_t *sin)
{
str sdp, fromtag, totag = STR_NULL, callid, viabranch;
str label = STR_NULL;
char *errstr;
const char *errstr;
GQueue parsed = G_QUEUE_INIT;
GQueue streams = G_QUEUE_INIT;
struct call *call;
@ -744,12 +775,24 @@ static const char *call_offer_answer_ng(bencode_item_t *input,
bencode_dictionary_get_str(input, "via-branch", &viabranch);
bencode_dictionary_get_str(input, "label", &label);
if (sdp_parse(&sdp, &parsed))
return "Failed to parse SDP";
call_ng_process_flags(&flags, input);
flags.opmode = opmode;
if (opmode == OP_OFFER) {
enum load_limit_reasons limit = call_offer_session_limit();
if (limit != LOAD_LIMIT_NONE) {
if (!flags.supports_load_limit)
errstr = "Parallel session limit reached"; // legacy protocol
else
errstr = magic_load_limit_strings[limit];
goto out;
}
}
errstr = "Failed to parse SDP";
if (sdp_parse(&sdp, &parsed))
goto out;
if (flags.loop_protect && sdp_is_duplicate(&parsed)) {
ilog(LOG_INFO, "Ignoring message as SDP has already been processed by us");
bencode_dictionary_add_str(output, "sdp", &sdp);
@ -874,25 +917,6 @@ out:
const char *call_offer_ng(bencode_item_t *input, bencode_item_t *output, const char* addr,
const endpoint_t *sin)
{
rwlock_lock_r(&rtpe_config.config_lock);
if (rtpe_config.max_sessions>=0) {
rwlock_lock_r(&rtpe_callhash_lock);
if (g_hash_table_size(rtpe_callhash) -
atomic64_get(&rtpe_stats.foreign_sessions) >= rtpe_config.max_sessions) {
rwlock_unlock_r(&rtpe_callhash_lock);
/* foreign calls can't get rejected
* total_rejected_sess applies only to "own" sessions */
atomic64_inc(&rtpe_totalstats.total_rejected_sess);
atomic64_inc(&rtpe_totalstats_interval.total_rejected_sess);
ilog(LOG_ERROR, "Parallel session limit reached (%i)",rtpe_config.max_sessions);
rwlock_unlock_r(&rtpe_config.config_lock);
return "Parallel session limit reached";
}
rwlock_unlock_r(&rtpe_callhash_lock);
}
rwlock_unlock_r(&rtpe_config.config_lock);
return call_offer_answer_ng(input, output, OP_OFFER, addr, sin);
}

@ -58,6 +58,7 @@ struct sdp_ng_flags {
reset:1,
record_call:1,
loop_protect:1,
supports_load_limit:1,
dtls_off:1,
sdes_off:1,
sdes_unencrypted_srtp:1,

@ -20,6 +20,10 @@ mutex_t rtpe_cngs_lock;
GHashTable *rtpe_cngs_hash;
struct control_ng *rtpe_control_ng;
const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64] = {
[LOAD_LIMIT_MAX_SESSIONS] = "Parallel session limit reached",
};
static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) {
// lock offers
@ -112,7 +116,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
struct control_ng *c = (void *) obj;
bencode_buffer_t bencbuf;
bencode_item_t *dict, *resp;
str cmd, cookie, data, reply, *to_send, callid;
str cmd = STR_NULL, cookie, data, reply, *to_send, callid;
const char *errstr, *resultstr;
struct iovec iov[3];
unsigned int iovlen;
@ -262,11 +266,18 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
goto send_resp;
err_send:
ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data));
bencode_dictionary_add_string(resp, "result", "error");
bencode_dictionary_add_string(resp, "error-reason", errstr);
g_atomic_int_inc(&cur->errors);
cmd = STR_NULL;
if (errstr < magic_load_limit_strings[0] || errstr > magic_load_limit_strings[__LOAD_LIMIT_MAX-1]) {
ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]",
addr, errstr, STR_FMT(&data));
bencode_dictionary_add_string(resp, "result", "error");
bencode_dictionary_add_string(resp, "error-reason", errstr);
g_atomic_int_inc(&cur->errors);
cmd = STR_NULL;
}
else {
bencode_dictionary_add_string(resp, "result", "load limit");
bencode_dictionary_add_string(resp, "message", errstr);
}
send_resp:
bencode_collapse_str(resp, &reply);

@ -35,4 +35,12 @@ extern mutex_t rtpe_cngs_lock;
extern GHashTable *rtpe_cngs_hash;
extern struct control_ng *rtpe_control_ng;
enum load_limit_reasons {
LOAD_LIMIT_NONE = -1,
LOAD_LIMIT_MAX_SESSIONS = 0,
__LOAD_LIMIT_MAX
};
extern const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64];
#endif

@ -53,6 +53,8 @@ GetOptions(
'ptime=i' => \$options{'ptime'},
'flags=s@' => \$options{'flags'},
'codec-options-flat' => \$options{'codec options flag'},
'flags=s@' => \$options{'flags'},
'supports=s@' => \$options{'supports'},
) or die;
my $cmd = shift(@ARGV) or die;
@ -71,7 +73,7 @@ for my $x (split(/,/, 'trust address,symmetric,asymmetric,force,strict source,me
for my $x (split(/,/, 'origin,session connection')) {
defined($options{'replace-' . $x}) and push(@{$packet{replace}}, $x);
}
for my $x (split(/,/, 'rtcp-mux,SDES')) {
for my $x (split(/,/, 'rtcp-mux,SDES,supports')) {
$packet{$x} = $options{$x}
if defined($options{$x}) && ref($options{$x}) eq 'ARRAY';
}

Loading…
Cancel
Save