mirror of https://github.com/sipwise/sems.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
370 lines
12 KiB
370 lines
12 KiB
/*
|
|
* Copyright (C) 2014 Stefan Sayer
|
|
*
|
|
* This file is part of SEMS, a free SIP media server.
|
|
*
|
|
* SEMS is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 2 of the License, or
|
|
* (at your option) any later version. This program is released under
|
|
* the GPL with the additional exemption that compiling, linking,
|
|
* and/or using OpenSSL is allowed.
|
|
*
|
|
* For a license to use the SEMS software under conditions
|
|
* other than those described here, or to purchase support for this
|
|
* software, please contact iptel.org by e-mail at the following addresses:
|
|
* info@iptel.org
|
|
*
|
|
* SEMS is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program; if not, write to the Free Software
|
|
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
*/
|
|
|
|
#include "ModRedis.h"
|
|
#include "log.h"
|
|
#include "AmUtils.h"
|
|
|
|
#include "DSMSession.h"
|
|
#include "AmSession.h"
|
|
#include "AmPlaylist.h"
|
|
#include "DSMCoreModule.h"
|
|
|
|
#include <stdio.h>
|
|
#include <fstream>
|
|
|
|
// Module export hook for DSMRedisModule.
// NOTE(review): SC_EXPORT is defined outside this file; presumably it emits
// the factory symbol the DSM module loader looks up - confirm in DSMModule.h.
SC_EXPORT(DSMRedisModule);
|
|
|
|
/** Constructor; performs no work of its own. */
DSMRedisModule::DSMRedisModule()
{
}
|
|
|
|
/** Destructor; performs no work of its own. */
DSMRedisModule::~DSMRedisModule()
{
}
|
|
|
|
|
|
/**
 * Action factory of the redis module.
 *
 * from_str is split by splitCmd() into a command name and its parameter
 * string; every DEF_CMD line below pairs one command name with the action
 * class implementing it.
 *
 * NOTE(review): DEF_CMD is defined outside this file and presumably refers
 * to the locals `cmd`, `params` and `from_str` by name and returns the new
 * action on a match - keep these identifiers unchanged.
 *
 * @param from_str action string as written in the DSM script
 * @return NULL when none of the DEF_CMD lines returned an action
 */
DSMAction* DSMRedisModule::getAction(const string& from_str) {
  string cmd, params;
  splitCmd(from_str, cmd, params);

  DEF_CMD("redis.connect",       DSMRedisConnectAction);
  DEF_CMD("redis.disconnect",    DSMRedisDisconnectAction);
  DEF_CMD("redis.execCommand",   DSMRedisExecCommandAction);
  DEF_CMD("redis.appendCommand", DSMRedisAppendCommandAction);
  DEF_CMD("redis.getReply",      DSMRedisGetReplyAction);

  return NULL;
}
|
|
|
|
/**
 * Condition factory of the redis module.
 *
 * No condition is implemented at the moment, so NULL is returned for every
 * input. The disabled code kept below sketches two conditions
 * ("redis.hasResult", "redis.connected") that are not compiled in.
 *
 * @param from_str condition string from the DSM script (currently unused)
 * @return always NULL
 */
DSMCondition* DSMRedisModule::getCondition(const string& from_str) {
  /* disabled:
   *
   *   string cmd;
   *   string params;
   *   splitCmd(from_str, cmd, params);
   *
   *   if (cmd == "redis.hasResult") {
   *     return new MyHasResultCondition(params, false);
   *   }
   *
   *   if (cmd == "redis.connected") {
   *     return new MyConnectedCondition(params, true);
   *   }
   */

  return NULL;
}
|
|
|
|
/**
 * Frees the wrapped reply with freeReplyObject(), unless the pointer is
 * NULL (which is the state release() leaves behind).
 */
DSMRedisResult::~DSMRedisResult() {
  if (NULL == result)
    return;

  freeReplyObject(result);
}
|
|
|
|
void DSMRedisResult::release() {
|
|
result = NULL;
|
|
}
|
|
|
|
/**
 * Returns the redis connection object stored in the session's avar map
 * under REDIS_AKEY_CONNECTION.
 *
 * Every failure path reports DSM_ERRNO_REDIS_CONNECTION through SET_ERROR
 * and returns NULL: key missing, stored value not an object, or stored
 * object not a DSMRedisConnection.
 *
 * @param sc_sess DSM session to look the connection up in
 * @return the stored connection, or NULL (session error set)
 */
DSMRedisConnection* getRedisDSMSessionConnection(DSMSession* sc_sess) {
  // nothing stored under the connection key at all
  if (sc_sess->avar.end() == sc_sess->avar.find(REDIS_AKEY_CONNECTION)) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "No connection to redis database");
    return NULL;
  }

  AmObject* stored = NULL;
  try {
    if (!isArgAObject(sc_sess->avar[REDIS_AKEY_CONNECTION])) {
      SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "No connection to redis database (not AmObject)");
      return NULL;
    }
    stored = sc_sess->avar[REDIS_AKEY_CONNECTION].asObject();
  } catch (...) {
    // any exception while accessing the value is reported like a type mismatch
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "No connection to redis database (not AmObject)");
    return NULL;
  }

  // the object must really be one of our connections
  DSMRedisConnection* conn = NULL;
  if (NULL != stored) {
    conn = dynamic_cast<DSMRedisConnection*>(stored);
  }
  if (NULL == conn) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "No connection to database (not a RedisConnection)");
    return NULL;
  }

  return conn;
}
|
|
|
|
/**
 * Like getRedisDSMSessionConnection(), but additionally makes sure the
 * connection is up: if connected() reports false, connect() is attempted.
 *
 * @param sc_sess DSM session to look the connection up in
 * @return the connection if it is (now) connected; NULL if the lookup
 *         failed or connect() did not succeed
 */
DSMRedisConnection* getConnectedRedisDSMSessionConnection(DSMSession* sc_sess) {
  DSMRedisConnection* conn = getRedisDSMSessionConnection(sc_sess);

  if (NULL == conn) {
    // lookup failed
    return NULL;
  }

  if (conn->connected()) {
    return conn;
  }

  // not connected at the moment - try to bring the connection up
  if (conn->connect()) {
    return conn;
  }

  return NULL;
}
|
|
|
|
|
|
/**
 * Returns the result object stored in the session's avar map under
 * REDIS_AKEY_RESULT.
 *
 * Every failure path reports DSM_ERRNO_REDIS_NORESULT through SET_ERROR and
 * returns NULL: key missing, the type assertion or object access threw, or
 * the stored object is not a DSMRedisResult.
 *
 * @param sc_sess DSM session to look the result up in
 * @return the stored result, or NULL (session error set)
 */
DSMRedisResult* getRedisDSMResult(DSMSession* sc_sess) {
  if (sc_sess->avar.end() == sc_sess->avar.find(REDIS_AKEY_RESULT)) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_NORESULT, "No result available");
    return NULL;
  }

  AmObject* stored = NULL;
  try {
    assertArgAObject(sc_sess->avar[REDIS_AKEY_RESULT]);
    stored = sc_sess->avar[REDIS_AKEY_RESULT].asObject();
  } catch (...) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_NORESULT, "Result object has wrong type");
    return NULL;
  }

  DSMRedisResult* result_obj = NULL;
  if (NULL != stored) {
    result_obj = dynamic_cast<DSMRedisResult*>(stored);
  }
  if (NULL == result_obj) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_NORESULT, "Result object has wrong type");
    return NULL;
  }

  return result_obj;
}
|
|
|
|
/**
 * Expands variable references in a query string.
 *
 *   $name  is replaced by the session variable sc_sess->var[name]
 *   #name  is replaced by (*event_params)[name]; left untouched when
 *          event_params is NULL
 *
 * A name ends at the first of: space, tab, ',', '(', ')', '$', '#', ';',
 * single or double quote, or the end of the string. A marker directly
 * preceded by a backslash is treated as escaped and skipped (the backslash
 * itself stays in the result).
 *
 * Inserted values are never re-scanned: after a substitution the search
 * continues behind the inserted text, so a value that itself contains '$'
 * or '#' cannot trigger a second expansion.
 *
 * Note: lookups use operator[] on the maps, so referencing an unknown name
 * creates an empty entry for it.
 *
 * @param q            query template
 * @param sc_sess      session providing the $variables
 * @param event_params event parameters for #references, may be NULL
 * @return the query with all references expanded
 */
string replaceQueryParams(const string& q, DSMSession* sc_sess,
			  map<string,string>* event_params) {
  string res = q;
  size_t repl_pos = 0;

  while (repl_pos < res.length()) {
    const size_t rstart = res.find_first_of("$#", repl_pos);
    if (rstart == string::npos)
      break;

    // unless something is substituted, continue right behind the marker
    repl_pos = rstart + 1;

    if (rstart && res[rstart-1] == '\\') // escaped
      continue;

    size_t rend = res.find_first_of(" ,()$#\t;'\"", rstart+1);
    if (rend == string::npos)
      rend = res.length();

    const string name = res.substr(rstart+1, rend-rstart-1);

    if (res[rstart] == '$') {
      const string value = sc_sess->var[name];
      res.replace(rstart, rend-rstart, value);
      // skip the inserted value so that its content is not expanded again
      repl_pos = rstart + value.length();
    } else if (NULL != event_params) {
      const string value = (*event_params)[name];
      res.replace(rstart, rend-rstart, value);
      repl_pos = rstart + value.length();
    }
    // '#' reference without event parameters: text is kept as is
  }

  return res;
}
|
|
|
|
/**
 * Cuts the leading token off s.
 *
 * The token ends at the first character of s that is contained in sep.
 * The token is returned; s is shortened to what follows the separator
 * character. If no separator character occurs, the whole string is
 * returned and s becomes empty.
 *
 * @param s   [in,out] input; on return the remainder behind the separator
 * @param sep set of separator characters
 * @return the text in front of the first separator character
 */
string skip_till(string& s, string sep) {
  const size_t cut = s.find_first_of(sep);

  // for cut == npos this copies the complete string
  string head = s.substr(0, cut);

  if (cut == string::npos) {
    s.clear();
  } else {
    // drop the token together with the separator character
    s.erase(0, cut + 1);
  }

  return head;
}
|
|
|
|
/**
 * redis.connect - creates a DSMRedisConnection, connects it and stores it
 * in the session under REDIS_AKEY_CONNECTION.
 *
 * The URL is the resolved action argument or, if that is empty, the session
 * variable "config.redis_db_url". Expected form:
 *
 *   redis://tcp:host[;param=value...]    or
 *   redis://unix:host[;param=value...]
 *
 * The host part ends at the first ':' or ';'. Recognised parameters are
 * full_logging and use_transactions (enabled only by the literal value
 * "true"), connect_timeout and port.
 *
 * A malformed URL sets DSM_ERRNO_UNKNOWN_ARG, a failed connect sets
 * DSM_ERRNO_REDIS_CONNECTION; the action stops in both cases. On success
 * the session error is cleared.
 *
 * NOTE(review): the results of the str2int() calls are not checked, so an
 * unparsable number presumably leaves the default in place - confirm.
 */
EXEC_ACTION_START(DSMRedisConnectAction) {
  // URL: explicit argument first, configuration as fallback
  string db_url = resolveVars(arg, sess, sc_sess, event_params);
  if (db_url.empty()) {
    db_url = sc_sess->var["config.redis_db_url"];
  }

  if (db_url.length() < 11 || db_url.compare(0, 8, "redis://") != 0) {
    ERROR("missing correct db_url config or connect parameter - must start with redis://\n");
    SET_ERROR(sc_sess, DSM_ERRNO_UNKNOWN_ARG,
	      "missing correct db_url config or connect parameter - must start with redis://\n");
    EXEC_ACTION_STOP;
  }

  // strip the scheme
  db_url.erase(0, 8);

  // split url - tcp:host;param=value or unix:host;param=value

  const string db_proto = skip_till(db_url, ":");
  const bool is_unix = (db_proto == "unix");
  if (!is_unix && db_proto != "tcp") {
    ERROR("missing correct redis_db_url config or connect parameter - must start with unix or tcp protocol\n");
    SET_ERROR(sc_sess, DSM_ERRNO_UNKNOWN_ARG,
	      "missing correct db_url config or connect parameter - must start with unix or tcp protocol\n");
    EXEC_ACTION_STOP;
  }
  bool unix_socket = is_unix;

  const string db_host = skip_till(db_url, ":;");
  if (db_host.empty()) {
    ERROR("missing correct redis_db_url config or connect parameter - host must be non-empty\n");
    SET_ERROR(sc_sess, DSM_ERRNO_UNKNOWN_ARG,
	      "missing correct db_url config or connect parameter - host must be non-empty\n");
    EXEC_ACTION_STOP;
  }

  // defaults, overridable through the URL parameters
  unsigned int redis_port  = DEFAULT_REDIS_PORT;
  int  connect_timeout     = DEFAULT_REDIS_CONNECT_TIMEOUT;
  bool full_logging        = false;
  bool use_transactions    = false;

  // whatever is left of the URL is a ';' separated list of name=value pairs
  while (!db_url.empty()) {
    string option = skip_till(db_url, ";");
    vector<string> kv = explode(option, "=");
    if (kv.size() != 2) {
      ERROR("missing correct redis_db_url config or connect parameter - "
	    "parameter '%s' must be param=value\n", option.c_str());
      SET_ERROR(sc_sess, DSM_ERRNO_UNKNOWN_ARG,
		"missing correct db_url config or connect parameter - parameter must be param=value\n");
      EXEC_ACTION_STOP;
    }

    const string& key = kv[0];
    const string& val = kv[1];

    if (key == "full_logging") {
      full_logging = (val == "true");
    } else if (key == "use_transactions") {
      use_transactions = (val == "true");
    } else if (key == "connect_timeout") {
      str2int(val, connect_timeout);
    } else if (key == "port") {
      str2int(val, redis_port);
    } else {
      ERROR("unknown redis_db_url config or connect parameter - "
	    "parameter '%s'\n", key.c_str());
      SET_ERROR(sc_sess, DSM_ERRNO_UNKNOWN_ARG, "missing correct db_url config or connect parameter\n");
      EXEC_ACTION_STOP;
    }
  }

  DSMRedisConnection* conn =
    new DSMRedisConnection(db_host, redis_port, unix_socket, full_logging,
			   use_transactions, connect_timeout);

  if (!conn->connect()) {
    // not yet handed over to the session, so it has to be freed here
    delete conn;
    ERROR("Could not connect to redis DB\n");
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "Could not connect to redis DB");
    EXEC_ACTION_STOP;
  }

  // save connection for later use
  AmArg c_arg;
  c_arg.setBorrowedPointer(conn);
  sc_sess->avar[REDIS_AKEY_CONNECTION] = c_arg;

  // for garbage collection
  sc_sess->transferOwnership(conn);

  CLR_ERROR(sc_sess);

} EXEC_ACTION_END;
|
|
|
|
/**
 * redis.disconnect - closes and destroys the session's redis connection.
 *
 * The connection is disconnected, taken back from the session's ownership
 * list, deleted, and its entry removed from avar. If the session holds no
 * connection the action stops (the lookup helper has set the error).
 */
EXEC_ACTION_START(DSMRedisDisconnectAction) {
  DSMRedisConnection* conn = getRedisDSMSessionConnection(sc_sess);
  if (!conn) {
    EXEC_ACTION_STOP;
  }

  conn->disconnect();

  // connection object might be reused - but its safer to create a new one
  sc_sess->releaseOwnership(conn);
  delete conn;

  // drop the (now dangling) borrowed pointer from the session
  sc_sess->avar.erase(REDIS_AKEY_CONNECTION);

  sc_sess->CLR_ERRNO;
} EXEC_ACTION_END;
|
|
|
|
/**
 * Stores a redis reply in a variable map.
 *
 *   string / status  -> dst[varname] = reply text
 *   integer          -> dst[varname] = decimal representation (full
 *                       long long range, no truncation to int)
 *   nil              -> dst[varname] = "nil"
 *   array            -> every element is decoded recursively into
 *                       varname[0], varname[1], ...
 *   error            -> only logged, dst is not modified
 *   any other type   -> logged at debug level, dst is not modified
 *
 * @param dst     variable map receiving the decoded value(s)
 * @param varname name (or name prefix, for arrays) of the variable to set
 * @param reply   reply to decode; NULL is ignored
 */
void decodeRedisResult(VarMapT& dst, const string& varname, redisReply* reply) {
  if (!reply)
    return;

  switch (reply->type) {
  case REDIS_REPLY_STRING:
  case REDIS_REPLY_STATUS:
    dst[varname] = string(reply->str, reply->len);
    break;

  case REDIS_REPLY_INTEGER: {
    // redis integers are 64 bit; format the whole value instead of casting to int
    char int_buf[32];
    snprintf(int_buf, sizeof(int_buf), "%lld", static_cast<long long>(reply->integer));
    dst[varname] = int_buf;
  } break;

  case REDIS_REPLY_NIL:
    dst[varname] = "nil";
    break;

  case REDIS_REPLY_ERROR:
    ERROR("decoding REDIS reply - ERROR type: '%.*s'\n",
	  reply->str ? static_cast<int>(reply->len) : 0,
	  reply->str ? reply->str : "");
    break;

  case REDIS_REPLY_ARRAY: {
    for (size_t i = 0; i < reply->elements; i++) {
      decodeRedisResult(dst, varname+"["+int2str((unsigned int)i)+"]", reply->element[i]);
    }
  } break;

  default:
    DBG("ignoring REDIS reply of unhandled type %d\n", reply->type);
    break;
  }
}
|
|
|
|
/**
 * Common result handling for redis.execCommand and redis.getReply.
 *
 * On DB_E_OK the reply is decoded into the session variables (sc_sess->var)
 * under resultvar, the reply is freed, and the session error is cleared so
 * that an error left over from an earlier action is not mistaken for a
 * failure of this command - matching what the connect, appendCommand and
 * disconnect actions do on success.
 *
 * Every other code is mapped to the corresponding DSM_ERRNO_REDIS_* error;
 * reply is neither read nor freed in that case.
 *
 * NOTE(review): a reply of type REDIS_REPLY_ERROR arriving with DB_E_OK is
 * only logged by decodeRedisResult() and does not set a session error.
 *
 * @param sc_sess   session receiving variables / error state
 * @param res       DB_E_* code returned by the connection
 * @param reply     reply object; ownership is taken when res is DB_E_OK
 * @param resultvar name of the session variable to store the result in
 */
void handleResult(DSMSession* sc_sess, int res, redisReply* reply, const string& resultvar) {
  switch (res) {
  case DB_E_OK: {
    decodeRedisResult(sc_sess->var, resultvar, reply);
    if (NULL != reply) {
      freeReplyObject(reply);
    }
    CLR_ERROR(sc_sess);
  } break;

  case DB_E_CONNECTION: {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "REDIS connection error");
  } return;

  case DB_E_WRITE: {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_WRITE, "REDIS write error");
  } return;

  case DB_E_READ: {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_READ, "REDIS read error");
  } return;

  default: {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_UNKNOWN, "REDIS unknown error");
  } return;
  }
}
|
|
|
|
// NOTE(review): CONST_ACTION_2P is defined outside this file; presumably it
// generates the constructor that splits the argument at '=' into par1/par2.
CONST_ACTION_2P(DSMRedisExecCommandAction, '=', false);

/**
 * redis.execCommand($resultvar=command) - runs one redis command and stores
 * its reply in a session variable.
 *
 * par1 names the result variable (a leading '$' is stripped); par2 is the
 * command, with parameters expanded by replaceParams(). If no connected
 * redis connection is available, DSM_ERRNO_REDIS_CONNECTION is set and the
 * action stops. The outcome of the command is processed by handleResult().
 */
EXEC_ACTION_START(DSMRedisExecCommandAction) {
  string resultvar = par1;
  if (resultvar.length() && resultvar[0]=='$') resultvar=resultvar.substr(1);
  string cmd = replaceParams(par2, sess, sc_sess, event_params);
  DBG("executing redis command $%s='%s'\n", resultvar.c_str(), cmd.c_str());

  DSMRedisConnection* conn = getConnectedRedisDSMSessionConnection(sc_sess);
  if (!conn) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "Not connected to REDIS\n");
    EXEC_ACTION_STOP;
  }

  // start from a defined value: the pointer is handed on to handleResult()
  // even if exec_cmd() fails without assigning it
  redisReply* reply = NULL;
  int res = conn->exec_cmd(cmd.c_str(), reply);

  handleResult(sc_sess, res, reply, resultvar);
} EXEC_ACTION_END;
|
|
|
|
/**
 * redis.appendCommand(command) - hands a command to the connection's
 * append_cmd() without fetching a reply here.
 *
 * The argument is expanded by replaceParams(). If no connected redis
 * connection is available the action stops with
 * DSM_ERRNO_REDIS_CONNECTION. The same error code is set when append_cmd()
 * does not return DB_E_OK; on success the session error is cleared.
 */
EXEC_ACTION_START(DSMRedisAppendCommandAction) {
  string cmd = replaceParams(arg, sess, sc_sess, event_params);
  DBG("appending redis command '%s' - from '%s'\n", cmd.c_str(), arg.c_str());

  DSMRedisConnection* conn = getConnectedRedisDSMSessionConnection(sc_sess);
  if (NULL == conn) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "Not connected to REDIS\n");
    EXEC_ACTION_STOP;
  }

  if (conn->append_cmd(cmd.c_str()) != DB_E_OK) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "Error appending command - no memory?\n");
  } else {
    CLR_ERROR(sc_sess);
  }
} EXEC_ACTION_END;
|
|
|
|
/**
 * redis.getReply($resultvar) - fetches one reply from the connection via
 * get_reply() and stores it in a session variable.
 *
 * The argument names the result variable (a leading '$' is stripped). If no
 * connected redis connection is available, DSM_ERRNO_REDIS_CONNECTION is
 * set and the action stops. The outcome is processed by handleResult().
 */
EXEC_ACTION_START(DSMRedisGetReplyAction) {
  string resultvar = arg;
  if (resultvar.length() && resultvar[0]=='$') resultvar=resultvar.substr(1);
  DBG("getting result for redis command in $%s\n", resultvar.c_str());

  DSMRedisConnection* conn = getConnectedRedisDSMSessionConnection(sc_sess);
  if (!conn) {
    SET_ERROR(sc_sess, DSM_ERRNO_REDIS_CONNECTION, "Not connected to REDIS\n");
    EXEC_ACTION_STOP;
  }

  // start from a defined value: the pointer is handed on to handleResult()
  // even if get_reply() fails without assigning it
  redisReply* reply = NULL;
  int res = conn->get_reply(reply);

  handleResult(sc_sess, res, reply, resultvar);
} EXEC_ACTION_END;
|