diff --git a/plugins/mod_mam/mod_mam.lua b/plugins/mod_mam/mod_mam.lua index dbdd92a..d7f4f61 100644 --- a/plugins/mod_mam/mod_mam.lua +++ b/plugins/mod_mam/mod_mam.lua @@ -37,21 +37,7 @@ if global_default_policy ~= "roster" then global_default_policy = module:get_option_boolean("default_archive_policy", global_default_policy); end -local archive_store = "archive2"; -local archive = assert(module:open_store(archive_store, "archive")); - -if archive.name == "null" or not archive.find then - if not archive.find then - module:log("debug", "Attempt to open archive storage returned a valid driver but it does not seem to implement the storage API"); - module:log("debug", "mod_%s does not support archiving", archive._provided_by or archive.name and "storage_"..archive.name.."(?)" or ""); - else - module:log("debug", "Attempt to open archive storage returned null driver"); - end - module:log("debug", "See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information"); - module:log("info", "Using in-memory fallback archive driver"); - archive = module:require "fallback_archive"; -end - +local archive = module:require "sipwise_archive"; local cleanup; -- Handle prefs. diff --git a/plugins/mod_mam/sipwise_archive.lib.lua b/plugins/mod_mam/sipwise_archive.lib.lua new file mode 100644 index 0000000..f772ea6 --- /dev/null +++ b/plugins/mod_mam/sipwise_archive.lib.lua @@ -0,0 +1,165 @@ +-- luacheck: ignore 212/self +local uuid = require "util.uuid".generate; +local archive_store = { _provided_by = "mam"; name = "fallback"; }; + +local serialize = require "util.serialization".serialize; +local deserialize = require "util.serialization".deserialize; +local st = require "util.stanza"; + +local mod_sql = module:require("sql"); +local params = module:get_option("sql", {}); +local engine = mod_sql:create_engine(params); +engine:execute("SET NAMES 'utf8' COLLATE 'utf8_bin';"); +local log = require "util.logger".init("sipwise_archive"); +local ut_tostring = require "util.table".table.tostring; + +local store_query=[[ +INSERT INTO `sipwise_mam` (`username`, `key`, `stanza`, `epoch`, `with`) +VALUES (?,UuidToBin(?),?,?,?); +]] + +local delete_query=[[ +DELETE FROM `sipwise_mam` +WHERE `username` = ?; +]] + +local delete_query_extra=[[ +DELETE FROM `sipwise_mam` +WHERE `username` = ? AND `epoch` <= ?; +]] + +local select_key_query=[[ +SELECT id FROM `sipwise_mam` +WHERE `key` = UuidToBin(?) +]] + +local select_query_base=[[ +SELECT UuidFromBin(`key`),`stanza`,`epoch`,`with` FROM `sipwise_mam` +WHERE `username` = ? +]] + +-- Reconnect to DB if necessary +local function reconect_check() + if not engine.conn:ping() then + engine.conn = nil; + log("debug", "DDBB reconecting"); + engine:connect(); + end +end + +local function load_db(query, _params) + local res; + reconect_check(); + log("debug", "query[%s]", query); + log("debug", "_params[%s]", ut_tostring(_params)); + res = engine:select(query, unpack(_params)); + local out = {}; + for row in res do + table.insert(out, {row[1], row[2], row[3], row[4]}); + end + return out; +end + +local function key_get_id(key) + local res; + reconect_check(); + res = engine:select(select_key_query, key); + local out = {}; + for row in res do + table.insert(out, row[1]); + end + return out[1]; +end + +local function key_in_db(key) + local res = key_get_id(key); + if res then + return true; + else + return false; + end +end + +function archive_store:append(username, key, value, when, with) + reconect_check(); + if not key or key_in_db(key) then + key = uuid(); + end + engine:insert(store_query, username, key, serialize(st.preserialize(value)), + when, with); + engine.conn:commit(); +end + +function archive_store:find(username, query) + local qstart, qend, qwith = -math.huge, math.huge; + local qlimit, qid; + local db_query = select_query_base; + local _params = { username, }; + local i, values = 0; + + if query then + if query.reverse then + if query.before then + qid = key_get_id(query.before); + end + elseif query.after then + qid = key_get_id(query.after); + end + qwith = query.with; + qlimit = query.limit; + qstart = query.start or qstart; + qend = query["end"] or qend; + end + + if qwith then + db_query = db_query.." AND `with` = ?"; + table.insert(_params, qwith); + end + if qid then + if query.reverse then + db_query = db_query.." AND `id` < ?"; + else + db_query = db_query.." AND `id` > ?"; + end + table.insert(_params, qid); + end + db_query = db_query.." AND (`epoch` >= ? AND `epoch` <= ?)"; + table.insert(_params, qstart); + table.insert(_params, qend); + db_query = db_query.." ORDER BY `epoch`"; + if query.reverse then + db_query = db_query.." DESC"; + end + if qlimit then + db_query = db_query.." LIMIT ?"; + table.insert(_params, qlimit); + end + db_query = db_query..";" + values = load_db(db_query, _params); + + return function () + i = i + 1; + if values[i] then + return values[i][1], deserialize(values[i][2]), values[i][3], values[i][4]; + end + end +end + +function archive_store:delete(username, query) + if not query or next(query) == nil then + -- no specifics, delete everything + reconect_check(); + engine:delete(delete_query, username); + engine.conn:commit(); + return true; + end + + local qend = query["end"] or math.huge; + + reconect_check(); + engine:delete(delete_query_extra, username, qend); + engine.conn:commit(); + return true; +end + +return archive_store;