@ -2,10 +2,10 @@
--
-- Copyright (C) 2010-2015 Matthew Wild
-- Copyright (C) 2010 Waqas Hussain
-- Copyright (C) 2012-20 15 Kim Alvefur
-- Copyright (C) 2012-20 2 1 Kim Alvefur
-- Copyright (C) 2012 Thijs Alkemade
-- Copyright (C) 2014 Florian Zeitz
-- Copyright (C) 2016-20 19 Thilo Molitor
-- Copyright (C) 2016-20 20 Thilo Molitor
--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
@ -17,7 +17,7 @@ local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+
local uuid_generate = require " util.uuid " . generate ;
local jid = require " util.jid " ;
local t_ insert, t_ remove = table.insert , table.remove ;
local t_ remove = table.remove ;
local math_min = math.min ;
local math_max = math.max ;
local os_time = os.time ;
@ -26,6 +26,7 @@ local add_filter = require "util.filters".add_filter;
local timer = require " util.timer " ;
local datetime = require " util.datetime " ;
local xmlns_mam2 = " urn:xmpp:mam:2 " ;
local xmlns_sm2 = " urn:xmpp:sm:2 " ;
local xmlns_sm3 = " urn:xmpp:sm:3 " ;
local xmlns_errors = " urn:ietf:params:xml:ns:xmpp-stanzas " ;
@ -34,18 +35,19 @@ local xmlns_delay = "urn:xmpp:delay";
local sm2_attr = { xmlns = xmlns_sm2 } ;
local sm3_attr = { xmlns = xmlns_sm3 } ;
local resume_timeout = module : get_option_number ( " smacks_hibernation_time " , 3 00) ;
local s2s_smacks = module : get_option_boolean ( " smacks_enabled_s2s " , fals e) ;
local resume_timeout = module : get_option_number ( " smacks_hibernation_time " , 6 00) ;
local s2s_smacks = module : get_option_boolean ( " smacks_enabled_s2s " , tru e) ;
local s2s_resend = module : get_option_boolean ( " smacks_s2s_resend " , false ) ;
local max_unacked_stanzas = module : get_option_number ( " smacks_max_unacked_stanzas " , 0 ) ;
local delayed_ack_timeout = module : get_option_number ( " smacks_max_ack_delay " , 60 ) ;
local max_inactive_unacked_stanzas = module : get_option_number ( " smacks_max_inactive_unacked_stanzas " , 256 ) ;
local delayed_ack_timeout = module : get_option_number ( " smacks_max_ack_delay " , 30 ) ;
local max_hibernated_sessions = module : get_option_number ( " smacks_max_hibernated_sessions " , 10 ) ;
local max_old_sessions = module : get_option_number ( " smacks_max_old_sessions " , 10 ) ;
local core_process_stanza = prosody.core_process_stanza ;
local sessionmanager = require " core.sessionmanager " ;
assert ( max_hibernated_sessions > 0 , " smacks_max_hibernated_sessions must be greater than 0 " ) ;
assert ( max_old_sessions > 0 , " smacks_ old_sessions must be greater than 0" ) ;
assert ( max_old_sessions > 0 , " smacks_ max_ old_sessions must be greater than 0" ) ;
local c2s_sessions = module : shared ( " /*/c2s/sessions " ) ;
@ -110,18 +112,18 @@ local function stoppable_timer(delay, callback)
end ) ;
if timer and timer.stop then return timer ; end -- new prosody api includes stop() function
return {
stop = function ( ) stopped = true end ;
stop = function ( self ) stopped = true end ;
timer ;
} ;
end
local function delayed_ack_function ( session )
local function delayed_ack_function ( session , stanza )
-- fire event only if configured to do so and our session is not already hibernated or destroyed
if delayed_ack_timeout > 0 and session.awaiting_ack
and not session.hibernating and not session.destroyed then
session.log ( " debug " , " Firing event 'smacks-ack-delayed', queue = %d " ,
session.outgoing_stanza_queue and # session.outgoing_stanza_queue or 0 ) ;
module : fire_event ( " smacks-ack-delayed " , { origin = session , queue = session.outgoing_stanza_queue }) ;
module : fire_event ( " smacks-ack-delayed " , { origin = session , queue = session.outgoing_stanza_queue , stanza = stanza }) ;
end
session.delayed_ack_timer = nil ;
end
@ -157,16 +159,20 @@ module:hook("s2s-stream-features",
end
end ) ;
local function request_ack_if_needed ( session , force , reason )
local function request_ack_if_needed ( session , force , reason , stanza )
local queue = session.outgoing_stanza_queue ;
local expected_h = session.last_acknowledged_stanza + # queue ;
-- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
local max_unacked = max_unacked_stanzas ;
if session.state == " inactive " then
max_unacked = max_inactive_unacked_stanzas ;
end
if session.awaiting_ack == nil and not session.hibernating then
-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
-- further requests until a higher h-value would be expected.
-- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h));
if ( # queue > max_unacked _stanzas and expected_h ~= session.last_requested_h ) or force then
if ( # queue > max_unacked and expected_h ~= session.last_requested_h ) or force then
session.log ( " debug " , " Queuing <r> (in a moment) from %s - #queue=%d " , reason , # queue ) ;
session.awaiting_ack = false ;
session.awaiting_ack_timer = stoppable_timer ( 1e-06 , function ( )
@ -175,13 +181,14 @@ local function request_ack_if_needed(session, force, reason)
if not session.awaiting_ack and not session.hibernating and not session.destroyed then
session.log ( " debug " , " Sending <r> (inside timer, before send) from %s - #queue=%d " , reason , # queue ) ;
( session.sends2s or session.send ) ( st.stanza ( " r " , { xmlns = session.smacks } ) )
if session.destroyed then return end -- sending something can trigger destruction
session.awaiting_ack = true ;
-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
session.last_requested_h = session.last_acknowledged_stanza + # queue ;
session.log ( " debug " , " Sending <r> (inside timer, after send) from %s - #queue=%d " , reason , # queue ) ;
if not session.delayed_ack_timer then
session.delayed_ack_timer = stoppable_timer ( delayed_ack_timeout , function ( )
delayed_ack_function ( session );
delayed_ack_function ( session , nil ); -- we don't know if this is the only new stanza in the queue
end ) ;
end
end
@ -193,15 +200,22 @@ local function request_ack_if_needed(session, force, reason)
-- and there isn't already a timer for this event running.
-- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
-- would not trigger this event (again).
if # queue > max_unacked _stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then
if # queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
session.log ( " debug " , " Calling delayed_ack_function directly (still waiting for ack) " ) ;
delayed_ack_function ( session );
delayed_ack_function ( session , stanza ); -- this is the only new stanza in the queue --> provide it to other modules
end
end
local function outgoing_stanza_filter ( stanza , session )
local is_stanza = stanza.attr and not stanza.attr . xmlns and not stanza.name : find " : " ;
if is_stanza and not stanza._cached then -- Stanza in default stream namespace
-- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's
-- supposed to be nil.
-- However, when using mod_smacks with mod_websocket, then mod_websocket's
-- stanzas/out filter can get called before this one and adds the xmlns.
local is_stanza = stanza.attr and
( not stanza.attr . xmlns or stanza.attr . xmlns == ' jabber:client ' )
and not stanza.name : find " : " ;
if is_stanza and not stanza._cached then
local queue = session.outgoing_stanza_queue ;
local cached_stanza = st.clone ( stanza ) ;
cached_stanza._cached = true ;
@ -216,11 +230,11 @@ local function outgoing_stanza_filter(stanza, session)
queue [ # queue + 1 ] = cached_stanza ;
if session.hibernating then
session.log ( " debug " , " hibernating , stanza queued" ) ;
session.log ( " debug " , " hibernating since %s , stanza queued" , datetime.datetime ( session.hibernating ) ) ;
module : fire_event ( " smacks-hibernation-stanza-queued " , { origin = session , queue = queue , stanza = cached_stanza } ) ;
return nil ;
end
request_ack_if_needed ( session , false , " outgoing_stanza_filter " );
request_ack_if_needed ( session , false , " outgoing_stanza_filter " , stanza );
end
return stanza ;
end
@ -249,7 +263,7 @@ local function wrap_session_out(session, resume)
session.resumption_token = nil ;
end
-- send out last ack as per revision 1.5.2 of XEP-0198
if session.smacks and session.conn then
if session.smacks and session.conn and session.handled_stanza_count then
( session.sends2s or session.send ) ( st.stanza ( " a " , { xmlns = session.smacks , h = string.format ( " %d " , session.handled_stanza_count ) } ) ) ;
end
return session_close ( ... ) ;
@ -340,7 +354,7 @@ function handle_r(origin, stanza, xmlns_sm)
-- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
local expected_h = origin.last_acknowledged_stanza + # origin.outgoing_stanza_queue ;
if # origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
request_ack_if_needed ( origin , true , " piggybacked by handle_r " );
request_ack_if_needed ( origin , true , " piggybacked by handle_r " , nil );
end
return true ;
end
@ -373,6 +387,8 @@ function handle_a(origin, stanza)
for i = 1 , # queue do
origin.log ( " debug " , " Q item %d: %s " , i , tostring ( queue [ i ] ) ) ;
end
origin : close { condition = " undefined-condition " ; text = " Client acknowledged more stanzas than sent by server " ; } ;
return ;
end
for i = 1 , math_min ( handled_stanza_count , # queue ) do
@ -382,7 +398,7 @@ function handle_a(origin, stanza)
origin.log ( " debug " , " #queue = %d " , # queue ) ;
origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count ;
request_ack_if_needed ( origin , false , " handle_a " )
request_ack_if_needed ( origin , false , " handle_a " , nil )
return true ;
end
module : hook_stanza ( xmlns_sm2 , " a " , handle_a ) ;
@ -393,25 +409,68 @@ module:hook_stanza(xmlns_sm3, "a", handle_a);
-- and won't slow non-198 sessions). We can also then remove the .handled flag
-- on stanzas
function handle_unacked_stanzas ( session )
local function handle_unacked_stanzas ( session )
local queue = session.outgoing_stanza_queue ;
local error_attr = { type = " cancel " } ;
if # queue > 0 then
session.outgoing_stanza_queue = { } ;
for i = 1 , # queue do
if not module : fire_event ( " delivery/failure " , { session = session , stanza = queue [ i ] } ) then
local reply = st.reply ( queue [ i ] ) ;
if reply.attr . to ~= session.full_jid then
reply.attr . type = " error " ;
reply : tag ( " error " , error_attr )
: tag ( " recipient-unavailable " , { xmlns = " urn:ietf:params:xml:ns:xmpp-stanzas " } ) ;
core_process_stanza ( session , reply ) ;
if queue [ i ] . attr.type ~= " error " then
local reply = st.reply ( queue [ i ] ) ;
if reply.attr . to ~= session.full_jid then
reply.attr . type = " error " ;
reply : tag ( " error " , error_attr )
: tag ( " recipient-unavailable " , { xmlns = " urn:ietf:params:xml:ns:xmpp-stanzas " } ) ;
core_process_stanza ( session , reply ) ;
end
end
end
end
end
end
-- don't send delivery errors for messages which will be delivered by mam later on
-- check if stanza was archived --> this will allow us to send back errors for stanzas not archived
-- because the user configured the server to do so ("no-archive"-setting for one special contact for example)
local function get_stanza_id ( stanza , by_jid )
for tag in stanza : childtags ( " stanza-id " , " urn:xmpp:sid:0 " ) do
if tag.attr . by == by_jid then
return tag.attr . id ;
end
end
return nil ;
end
module : hook ( " delivery/failure " , function ( event )
local session , stanza = event.session , event.stanza ;
-- Only deal with authenticated (c2s) sessions
if session.username then
if stanza.name == " message " and stanza.attr . xmlns == nil and
( stanza.attr . type == " chat " or ( stanza.attr . type or " normal " ) == " normal " ) then
-- don't store messages in offline store if they are mam results
local mam_result = stanza : get_child ( " result " , xmlns_mam2 ) ;
if mam_result ~= nil then
return true ; -- stanza already "handled", don't send an error and don't add it to offline storage
end
-- do nothing here for normal messages and don't send out "message delivery errors",
-- because messages are already in MAM at this point (no need to frighten users)
local stanza_id = get_stanza_id ( stanza , jid.bare ( session.full_jid ) ) ;
if session.mam_requested and stanza_id ~= nil then
session.log ( " debug " , " mod_smacks delivery/failure returning true for mam-handled stanza: mam-archive-id=%s " , tostring ( stanza_id ) ) ;
return true ; -- stanza handled, don't send an error
end
-- store message in offline store, if this client does not use mam *and* was the last client online
local sessions = prosody.hosts [ module.host ] . sessions [ session.username ] and
prosody.hosts [ module.host ] . sessions [ session.username ] . sessions or nil ;
if sessions and next ( sessions ) == session.resource and next ( sessions , session.resource ) == nil then
local ok = module : fire_event ( " message/offline/handle " , { origin = session , username = session.username , stanza = stanza } ) ;
session.log ( " debug " , " mod_smacks delivery/failuere returning %s for offline-handled stanza " , tostring ( ok ) ) ;
return ok ; -- if stanza was handled, don't send an error
end
end
end
end ) ;
module : hook ( " pre-resource-unbind " , function ( event )
local session , err = event.session , event.error ;
if session.smacks then
@ -442,16 +501,16 @@ module:hook("pre-resource-unbind", function (event)
and session.hibernating == hibernate_time then
-- wait longer if the timeout isn't reached because push was enabled for this session
-- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients
-- wait for an additional resume_timeout seconds if no push occur ed since hibernation at all
-- wait for an additional resume_timeout seconds if no push occur r ed since hibernation at all
local current_time = os_time ( ) ;
local timeout_start = math_max ( session.hibernating , session.first_hibernated_push or session.hibernating ) ;
if session.push_identifier ~= nil and not session.first_hibernated_push then
session.log ( " debug " , " No push happened since hibernation started, hibernating session for up to %d extra seconds " , resume_timeout ) ;
return resume_timeout ;
end
if current_time- timeout_start < resume_timeout and session.push_identifier ~= nil then
session.log ( " debug " , " A push happened since hibernation started, hibernating session for up to %d extra seconds " , current_time- timeout_start ) ;
return current_time- timeout_start ; -- time left to wait
if session.push_identifier ~= nil and current_time- timeout_start < resume_timeout then
session.log ( " debug " , " A push happened since hibernation started, hibernating session for up to %d extra seconds " , resume_timeout- ( current_time- timeout_start ) ) ;
return resume_timeout- ( current_time- timeout_start ) ; -- time left to wait
end
session.log ( " debug " , " Destroying session for hibernating too long " ) ;
session_registry.set ( session.username , session.resumption_token , nil ) ;
@ -531,6 +590,7 @@ function handle_resume(session, stanza, xmlns_sm)
c2s_sessions [ conn ] = nil ;
conn : close ( ) ;
end
local migrated_session_log = session.log ;
original_session.ip = session.ip ;
original_session.conn = session.conn ;
original_session.send = session.send ;
@ -558,18 +618,20 @@ function handle_resume(session, stanza, xmlns_sm)
-- Ok, we need to re-send any stanzas that the client didn't see
-- ...they are what is now left in the outgoing stanza queue
-- We have to use the send of "session" because we don't want to add our resent stanzas
-- to the outgoing queue again
local queue = original_session.outgoing_stanza_queue ;
original_session.log ( " debug " , " #queue = %d " , # queue ) ;
session.log( " debug " , " resending all unacked stanzas that are still queued after resume, #queue = %d" , # queue ) ;
for i = 1 , # queue do
original_ session.send( queue [ i ] ) ;
session.send( queue [ i ] ) ;
end
original_ session.log( " debug " , " #queue = %d -- after sen d" , # queue ) ;
session.log( " debug " , " all stanzas resent, now disabling send() in this migrated session, #queue = % d" , # queue ) ;
function session . send ( stanza )
session.log( " warn " , " Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s " , tostring ( stanza ) ) ;
migrated_session_log( " error " , " Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s " , tostring ( stanza ) ) ;
return false ;
end
module : fire_event ( " smacks-hibernation-end " , { origin = session , resumed = original_session , queue = queue } ) ;
request_ack_if_needed ( original_session , true , " handle_resume " );
request_ack_if_needed ( original_session , true , " handle_resume " , nil );
else
module : log ( " warn " , " Client %s@%s[%s] tried to resume stream for %s@%s[%s] " ,
session.username or " ? " , session.host or " ? " , session.type ,
@ -582,6 +644,23 @@ end
module : hook_stanza ( xmlns_sm2 , " resume " , function ( session , stanza ) return handle_resume ( session , stanza , xmlns_sm2 ) ; end ) ;
module : hook_stanza ( xmlns_sm3 , " resume " , function ( session , stanza ) return handle_resume ( session , stanza , xmlns_sm3 ) ; end ) ;
module : hook ( " csi-client-active " , function ( event )
if event.origin . smacks then
request_ack_if_needed ( event.origin , true , " csi-active " , nil ) ;
end
end ) ;
module : hook ( " csi-flushing " , function ( event )
local session = event.session ;
if session.smacks then
if not session.awaiting_ack and not session.hibernating and not session.destroyed then
session.log ( " debug " , " Sending <r> (csi-flushing) " ) ;
session.awaiting_ack = true ; -- The send() call may invoke this event again, so set this first
( session.sends2s or session.send ) ( st.stanza ( " r " , { xmlns = session.smacks } ) )
end
end
end ) ;
local function handle_read_timeout ( event )
local session = event.session ;
if session.smacks then
@ -600,7 +679,7 @@ local function handle_read_timeout(event)
session.awaiting_ack = true ;
if not session.delayed_ack_timer then
session.delayed_ack_timer = stoppable_timer ( delayed_ack_timeout , function ( )
delayed_ack_function ( session );
delayed_ack_function ( session , nil );
end ) ;
end
return true ;