-- store offline stanzas to DB
-- Copyright (C) 2016 Sipwise GmbH
--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
local mod_sql = module:require("sql");
local params = module:get_option("sql", {});
local engine = mod_sql:create_engine(params);
local sipwise_offline = module:shared("sipwise_offline");
local serialize = require "util.serialization".serialize;
local deserialize = require "util.serialization".deserialize;
local st = require "util.stanza";
local datetime = require "util.datetime";
local ipairs = ipairs;
local jid_split = require "util.jid".split;

-- NOTE(review): this query uses the unqualified table name while the other
-- three use "prosody.sipwise_offline" -- confirm both resolve to the same table.
local count_query=[[ SELECT id FROM sipwise_offline WHERE domain = ? AND username = ?; ]]
local load_query =[[ SELECT stanza FROM prosody.sipwise_offline WHERE domain = ? AND username = ?; ]]
local store_query=[[ INSERT INTO prosody.sipwise_offline (domain, username, stanza) VALUES (?,?,?); ]]
local delete_query=[[ DELETE FROM prosody.sipwise_offline WHERE domain = ?
AND username = ?; ]]

-- Reconnect to DB if necessary.
-- A missing connection handle is treated the same as a failed ping, so the
-- check never indexes a nil `engine.conn`.
local function reconect_check()
	if not engine.conn or not engine.conn:ping() then
		engine.conn = nil;
		engine:connect();
		engine:execute("SET NAMES 'utf8mb4' COLLATE 'utf8mb4_unicode_ci';");
	end
end

-- Number of stanzas currently stored for node@host.
-- Exposed through the shared "sipwise_offline" table.
-- @param node username part of the JID
-- @param host domain part of the JID
-- @return count of rows yielded by count_query
function sipwise_offline.get_num(node, host)
	reconect_check();
	local res = 0;
	for _ in engine:select(count_query, host, node) do
		res = res + 1;
	end
	return res;
end

-- Fetch the serialized stanzas stored for node@host.
-- @return array holding the first column of every row yielded by load_query
local function load_db(node, host)
	reconect_check();
	local out = {};
	for row in engine:select(load_query, host, node) do
		table.insert(out, row[1]);
	end
	return out;
end

-- Serialize `stanza` and insert it for node@host, then commit.
-- @return whatever engine:insert returned
local function store_db(node, host, stanza)
	reconect_check();
	local res = engine:insert(store_query, host, node, serialize(stanza));
	engine.conn:commit();
	return res;
end

-- Remove every stanza stored for node@host and commit.
local function delete_db(node, host)
	reconect_check();
	engine:delete(delete_query, host, node);
	engine.conn:commit();
end

--- http://xmpp.org/extensions/xep-0160.html#types
-- Decide whether a message is kept for offline delivery:
-- "normal" messages (also the default when no type is set) always are,
-- "chat" messages only when they carry a <body/>; every other type is not.
-- @return boolean
local function should_store(stanza)
	local body = stanza:get_child("body");
	local stanza_type = stanza.attr.type or "normal";
	return stanza_type == 'normal'
		or (stanza_type == 'chat' and body ~= nil);
end

-- save stanza
-- Handler for "message/offline/handle". The recipient is taken from the
-- stanza's `to` attribute, falling back to the originating session when the
-- attribute is absent.
-- @return true always (see the closing comment)
local function handle_offline(event)
	local origin, stanza = event.origin, event.stanza;
	local to = stanza.attr.to;
	local node, host;
	-- captured before the temporary stamp attributes are added below
	local stanza_info = stanza:top_tag();

	if to then
		node, host = jid_split(to)
	else
		node, host = origin.username, origin.host;
	end

	if should_store(stanza) then
		-- The timestamps travel with the stored copy as plain attributes and
		-- are removed from the live stanza once store_db has returned.
		stanza.attr.stamp = datetime.datetime();
		stanza.attr.stamp_legacy = datetime.legacy();
		local res = store_db(node, host, st.preserialize(stanza));
		if not res or res.__affected ~= 1 then
			module:log("error", "store_db failed for %s", stanza_info);
		else
			module:log("debug", "stored: %s", stanza_info);
		end
		stanza.attr.stamp, stanza.attr.stamp_legacy = nil, nil;
	else
		module:log("debug", "stanza[%s] not for offline, not stored",
			stanza_info);
	end
	-- we should be the last
	return true;
end

-- load stanzas and send
-- Handler for "message/offline/broadcast": deliver every stanza stored for
-- the session's user, then delete the stored rows.
-- Each stanza gets a XEP-0203 <delay/> and a legacy XEP-0091 <x/> element
-- built from the stamp attributes saved with it; those attributes are
-- stripped again before sending.
-- A row that cannot be decoded is logged and skipped, so it cannot abort
-- delivery of the remaining stanzas; it is still removed by the delete below.
-- @return true always (see the closing comment)
local function broadcast_offline(event)
	local origin = event.origin;
	local node, host = origin.username, origin.host;
	local data = load_db(node, host);

	for _, raw in ipairs(data) do
		local stanza = st.deserialize(deserialize(raw));
		if stanza then
			stanza:tag("delay", {
				xmlns = "urn:xmpp:delay",
				from = host,
				stamp = stanza.attr.stamp
			}):up(); -- XEP-0203
			stanza:tag("x", {
				xmlns = "jabber:x:delay",
				from = host,
				stamp = stanza.attr.stamp_legacy
			}):up(); -- XEP-0091 (deprecated)
			stanza.attr.stamp, stanza.attr.stamp_legacy = nil, nil;
			module:log("debug", "send: %s", stanza:top_tag());
			origin.send(stanza);
		else
			module:log("error",
				"skipping undecodable offline stanza for %s@%s", node, host);
		end
	end

	if #data > 0 then
		delete_db(node, host);
		module:log("debug", "delete all stanzas for %s@%s", node, host);
	end
	-- we should be the last
	return true;
end

-- Register both offline-message handlers with priority 1 when the module loads.
function module.load()
	module:hook("message/offline/handle", handle_offline, 1);
	module:hook("message/offline/broadcast", broadcast_offline, 1);
end