json-rpc v2.0 module.

transport over netstrings/tcp. still experimental. 

git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@1826 8eb893ce-cfd4-0310-b710-fb5ebe64c474
sayer/1.4-spce2.6
Stefan Sayer 15 years ago
parent 41c2e7ca67
commit 03bf1e1b15

@ -3,7 +3,7 @@ COREPATH ?= ../core
include $(COREPATH)/../Makefile.defs
exclude_app_modules += mp3 examples py_sems xmlrpc2di gateway
exclude_app_modules += mp3 examples py_sems xmlrpc2di gateway jsonrpc
ifneq ($(USE_MONITORING), yes)
exclude_app_modules += monitoring

@ -30,6 +30,8 @@
#include "AmMediaProcessor.h"
#include "DSM.h"
#include "../apps/jsonrpc/JsonRPCEvents.h" // todo!
DSMCall::DSMCall(const DSMScriptConfig& config,
AmPromptCollection* prompts,
DSMStateDiagramCollection& diags,
@ -243,6 +245,74 @@ void DSMCall::process(AmEvent* event)
engine.runEvent(this, DSMCondition::PlaylistSeparator, &params);
}
// todo: give modules the possibility to define/process events
JsonRpcEvent* jsonrpc_ev = dynamic_cast<JsonRpcEvent*>(event);
if (jsonrpc_ev) {
DBG("received jsonrpc event\n");
JsonRpcResponseEvent* resp_ev =
dynamic_cast<JsonRpcResponseEvent*>(jsonrpc_ev);
if (resp_ev) {
map<string, string> params;
params["ev_type"] = "JsonRpcResponse";
params["id"] = resp_ev->response.id;
params["is_error"] = resp_ev->response.is_error ?
"true":"false";
if (isArgStruct(resp_ev->response.data)) {
AmArg::ValueStruct::const_iterator it = resp_ev->response.data.begin();
while (it != resp_ev->response.data.end()) {
if (isArgCStr(it->second))
params[it->first] = it->second.asCStr();
else
params[it->first] = AmArg::print(it->second);
it++;
}
} else if (isArgArray(resp_ev->response.data)) {
vector<string> strs = resp_ev->response.data.asStringVector();
for (vector<string>::iterator it =
strs.begin(); it != strs.end(); it++) {
params[int2str(it-strs.begin())] = *it;
}
}
engine.runEvent(this, DSMCondition::JsonRpcResponse, &params);
return;
}
JsonRpcRequestEvent* req_ev =
dynamic_cast<JsonRpcRequestEvent*>(jsonrpc_ev);
if (req_ev) {
map<string, string> params;
params["ev_type"] = "JsonRpcRequest";
params["is_notify"] = req_ev->isNotification() ?
"true" : "false";
params["method"] = req_ev->method;
if (!req_ev->id.empty())
params["id"] = req_ev->id;
if (isArgStruct(req_ev->params)) {
AmArg::ValueStruct::const_iterator it = req_ev->params.begin();
while (it != req_ev->params.end()) {
if (isArgCStr(it->second))
params[it->first] = it->second.asCStr();
else
params[it->first] = AmArg::print(it->second);
it++;
}
} else if (isArgArray(req_ev->params)) {
vector<string> strs = req_ev->params.asStringVector();
for (vector<string>::iterator it =
strs.begin(); it != strs.end(); it++) {
params[int2str(it-strs.begin())] = *it;
}
}
engine.runEvent(this, DSMCondition::JsonRpcRequest, &params);
return;
}
}
AmB2BCallerSession::process(event);
}

@ -75,7 +75,10 @@ class DSMCondition
B2BOtherReply,
B2BOtherBye,
DSMException
DSMException,
JsonRpcResponse,
JsonRpcRequest
};
bool invert;

@ -0,0 +1,115 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "JsonRPC.h"
JsonRPCServerModule* JsonRPCServerModule::_instance = NULL;
int JsonRPCServerModule::port = DEFAULT_JSONRPC_SERVER_PORT;
int JsonRPCServerModule::threads = DEFAULT_JSONRPC_SERVER_THREADS;
EXPORT_PLUGIN_CLASS_FACTORY(JsonRPCServerModule, MOD_NAME)
JsonRPCServerModule* JsonRPCServerModule::instance()
{
if(_instance == NULL){
_instance = new JsonRPCServerModule(MOD_NAME);
}
return _instance;
}
JsonRPCServerModule::JsonRPCServerModule(const string& mod_name)
: AmDynInvokeFactory(mod_name)
{
}
JsonRPCServerModule::~JsonRPCServerModule() {
}
int JsonRPCServerModule::onLoad() {
return instance()->load();
}
int JsonRPCServerModule::load() {
AmConfigReader cfg;
if(cfg.loadFile(AmConfig::ModConfigPath +
string(MOD_NAME ".conf"))) {
INFO("no '%s' configuration file present. using default values\n",
(AmConfig::ModConfigPath + string(MOD_NAME ".conf")).c_str());
} else {
port = cfg.getParameterInt("jsonrpc_port", DEFAULT_JSONRPC_SERVER_PORT);
threads = cfg.getParameterInt("server_threads", DEFAULT_JSONRPC_SERVER_THREADS);
}
DBG("using server port %d\n", port);
DBG("using %d server threads\n", threads);
DBG("starting server loop thread\n");
server_loop = new JsonRPCServerLoop();
server_loop->start();
return 0;
}
void JsonRPCServerModule::invoke(const string& method,
const AmArg& args, AmArg& ret) {
if (method == "execRpc"){
args.assertArrayFmt("sssisis"); // evq_link, notificationReceiver, requestReceiver,
// flags(i), host, port (i), method, [params]
if (args.size() > 7) {
if (!isArgArray(args.get(7)) && !isArgStruct(args.get(7))) {
ERROR("internal error: params to JSON-RPC must be struct or array\n");
throw AmArg::TypeMismatchException();
}
}
execRpc(args, ret);
// sendRequestList(args, ret);
} else if(method == "_list"){
ret.push(AmArg("execRpc"));
// ret.push(AmArg("newConnection"));
// ret.push(AmArg("sendRequest"));
// ret.push(AmArg("sendRequestList"));
} else
throw AmDynInvoke::NotImplemented(method);
}
void JsonRPCServerModule::execRpc(const AmArg& args, AmArg& ret) {
AmArg none_params;
AmArg& params = none_params;
if (args.size()>7)
params = args.get(7);
JsonRPCServerLoop::execRpc(// evq_link, notification_link, request_link
args.get(0).asCStr(), args.get(1).asCStr(),
args.get(2).asCStr(),
// flags
args.get(3).asInt(),
// host, port, method
args.get(4).asCStr(),
args.get(5).asInt(), args.get(6).asCStr(),
params, ret);
}

@ -0,0 +1,67 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _JSON_RPC_H_
#define _JSON_RPC_H_
#include "AmApi.h"
#include "RpcServerLoop.h"
#define DEFAULT_JSONRPC_SERVER_PORT 7080
#define DEFAULT_JSONRPC_SERVER_THREADS 5
class JsonRPCServerModule
: public AmDynInvokeFactory,
public AmDynInvoke
{
static JsonRPCServerModule* _instance;
int load();
JsonRPCServerLoop* server_loop;
// DI methods
void execRpc(const AmArg& args, AmArg& ret);
public:
JsonRPCServerModule(const string& mod_name);
~JsonRPCServerModule();
int onLoad();
// DI factory
AmDynInvoke* getInstance() { return instance(); }
// DI API
static JsonRPCServerModule* instance();
void invoke(const string& method,
const AmArg& args, AmArg& ret);
// configuration
static int port;
static int threads;
};
#endif

@ -0,0 +1,95 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _JsonRPCEvents_h_
#define _JsonRPCEvents_h_
#include "AmEvent.h"
#include "AmArg.h"
#include <string>
using std::string;
struct JsonRpcEvent
: public AmEvent {
JsonRpcEvent()
: AmEvent(122) { }
virtual ~JsonRpcEvent() { }
};
struct JsonRpcResponse {
string id;
AmArg data;
bool is_error;
JsonRpcResponse(bool is_error, string id, AmArg data)
: is_error(is_error), id(id), data(data) { }
JsonRpcResponse(bool is_error, string id)
: is_error(is_error), id(id) { }
~JsonRpcResponse() { }
};
struct JsonRpcResponseEvent
: public JsonRpcEvent {
JsonRpcResponse response;
JsonRpcResponseEvent(bool is_error, string id, AmArg data)
: response(is_error, id, data)
{ }
JsonRpcResponseEvent(bool is_error, string id)
: response(is_error, id)
{ }
~JsonRpcResponseEvent() { }
};
struct JsonRpcRequestEvent
: public JsonRpcEvent {
string method;
string id;
AmArg params;
// notification without parameters
JsonRpcRequestEvent(string method)
: method(method) { }
// notification with parameters
JsonRpcRequestEvent(string method, AmArg params)
: method(method), params(params) { }
// request without parameters
JsonRpcRequestEvent(string method, string id)
: method(method), id(id) { }
// request with parameters
JsonRpcRequestEvent(string method, string id, AmArg params)
: method(method), id(id), params(params) { }
bool isNotification() { return id.empty(); }
};
#endif // _JsonRPCEvents_h_

@ -0,0 +1,308 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "JsonRPCServer.h"
#include "RpcPeer.h"
#include "JsonRPCEvents.h"
#include "jsonArg.h"
#include "AmEventDispatcher.h"
#include "AmPlugIn.h"
#include "log.h"
#include "AmApi.h"
#include "AmSession.h"
#include "AmUtils.h"
int JsonRpcServer::createRequest(const string& evq_link, const string& method, AmArg& params, JsonrpcNetstringsConnection* peer,
bool is_notification) {
AmArg rpc_params;
rpc_params["jsonrpc"] = "2.0";
rpc_params["method"] = method;
rpc_params["params"] = params;
if (!is_notification) {
peer->req_id++;
string req_id = int2str(peer->req_id);
rpc_params["id"] = req_id;
peer->replyReceivers[req_id] = evq_link;
}
string rpc_params_json = arg2json(rpc_params);
if (rpc_params_json.length() > MAX_RPC_MSG_SIZE) {
ERROR("internal error: message exceeded MAX_RPC_MSG_SIZE (%d)\n",
MAX_RPC_MSG_SIZE);
return -3;
}
DBG("RPC message: >>%.*s<<\n", rpc_params_json.length(), rpc_params_json.c_str());
memcpy(peer->msgbuf, rpc_params_json.c_str(), rpc_params_json.length());
peer->msg_size = rpc_params_json.length();
// set peer connection up for sending
peer->msg_recv = false;
return 0;
}
int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
JsonrpcPeerConnection* peer) {
DBG("parsing message ...\n");
// const char* txt = "{\"jsonrpc\": \"2.0\", \"result\": 19, \"id\": 1}";
AmArg rpc_params;
if (!json2arg(msgbuf, rpc_params)) {
INFO("Error parsing message '%.*s'\n", (int)msg_size, msgbuf);
return -1;
}
if (!rpc_params.hasMember("jsonrpc") || strcmp(rpc_params["jsonrpc"].asCStr(), "2.0")) {
INFO("wrong json-rpc version received; only 2.0 supported!\n");
return -2; // todo: check value, reply with error?
}
bool is_request = (rpc_params.hasMember("method") && isArgCStr(rpc_params["method"]));
if (!is_request) {
// todo: move this to peer->processReply(rpc_params);
// process reply
if (!rpc_params.hasMember("id") || !isArgCStr(rpc_params["id"])
|| rpc_params["id"] == "") {
INFO("Error parsing jsonrpc message: no id in reply!\n");
return -2;// todo: check value, reply with error?
}
string id = rpc_params["id"].asCStr();
std::map<std::string, std::string>::iterator rep_recv_q =
peer->replyReceivers.find(id);
if (rep_recv_q == peer->replyReceivers.end()) {
DBG("received repy for unknown request");
*msg_size = 0;
if (peer->flags && JsonrpcPeerConnection::FL_CLOSE_WRONG_REPLY) {
INFO("closing connection after unknown reply id %s received\n", id.c_str());
return -2;
}
return 0;
}
JsonRpcResponseEvent* resp_ev = NULL;
if (rpc_params.hasMember("result")) {
resp_ev = new JsonRpcResponseEvent(false, id, rpc_params["result"]);
} else {
if (!rpc_params.hasMember("error")) {
INFO("protocol error: reply does not have error nor result!\n");
return -2;
}
resp_ev = new JsonRpcResponseEvent(true, id, rpc_params["error"]);
}
bool posted = AmEventDispatcher::instance()->
post(rep_recv_q->second, resp_ev);
if (!posted) {
DBG("receiver event queue does not exist (any more)\n");
peer->replyReceivers.erase(rep_recv_q);
*msg_size = 0;
if (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_REPLYLINK) {
INFO("closing connection where reply link missing");
return -2;
}
return 0;
}
DBG("successfully posted reply to event queue\n");
peer->replyReceivers.erase(rep_recv_q);
// don't send a reply
*msg_size = 0;
return 0;
}
string id;
if (rpc_params.hasMember("id") && isArgCStr(rpc_params["id"])) {
id = rpc_params["id"].asCStr();
} else {
DBG("received notification\n");
}
// send directly to event queue
if ((id.empty() && !peer->notificationReceiver.empty()) ||
(!id.empty() && !peer->requestReceiver.empty())) {
// don't send a reply
*msg_size = 0;
string dst_evqueue = id.empty() ?
peer->notificationReceiver.c_str() : peer->requestReceiver.c_str();
DBG("directly passing %s to event queue '%s'\n",
id.empty() ? "notification":"request",
dst_evqueue.c_str());
AmArg none_params;
AmArg& params = none_params;
if (rpc_params.hasMember("params")) {
params = rpc_params["params"];
}
JsonRpcRequestEvent* notification_ev =
new JsonRpcRequestEvent(rpc_params["method"].asCStr(),
id, params);
bool posted = AmEventDispatcher::instance()->
post(dst_evqueue, notification_ev);
if (!posted) {
DBG("%s receiver event queue '%s' does not exist (any more)\n",
id.empty() ? "notification":"request",
dst_evqueue.c_str());
delete notification_ev;
if (id.empty() && (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_NOTIF_RECV)) {
INFO("closing connection on missing notification receiver queue\n");
return -1; // todo: reply error?
}
if (!id.empty() && (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_REQUEST_RECV)) {
INFO("closing connection on missing request receiver queue\n");
return -1; // todo: reply error?
}
} else {
DBG("successfully posted %s to event queue '%s'\n",
id.empty() ? "notification":"request",
dst_evqueue.c_str());
}
return 0;
}
AmArg rpc_res;
execRpc(rpc_params, rpc_res);
// rpc_res["error"] = AmArg(); // Undef/null
// rpc_res["id"] = rpc_params["id"];
string res_s = arg2json(rpc_res);
if (res_s.length() > MAX_RPC_MSG_SIZE) {
ERROR("internal error: reply exceeded MAX_RPC_MSG_SIZE (%d)\n",
MAX_RPC_MSG_SIZE);
return -3;
}
DBG("RPC result: >>%.*s<<\n", res_s.length(), res_s.c_str());
memcpy(msgbuf, res_s.c_str(), res_s.length());
*msg_size = res_s.length();
return 0;
}
/** rpc_params must contain "method" member as string */
void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) {
try {
AmArg none_params;
AmArg& params = none_params;
if (rpc_params.hasMember("params")) {
params = rpc_params["params"];
}
string method = rpc_params["method"].asCStr();
size_t dot_pos = method.find('.');
if (dot_pos == string::npos || dot_pos == method.length()) {
throw JsonRpcError(-32601, "Method not found",
"use module.method as rpc method name");
}
string factory = method.substr(0, method.find('.'));
string fact_meth = method.substr(method.find('.')+1);
try {
if (factory == "core") {
runCoreMethod(fact_meth, params, rpc_res["result"]);
rpc_res["id"] = rpc_params["id"];
rpc_res["error"] = AmArg(); // Undef/null
rpc_res["jsonrpc"] = "2.0";
return;
}
// todo: direct_export
DBG("searching for factory '%s' method '%s'\n",
factory.c_str(), fact_meth.c_str());
AmDynInvokeFactory* fact =
AmPlugIn::instance()->getFactory4Di(factory);
if (fact==NULL) {
throw JsonRpcError(-32601, "Method not found",
"module not loaded");
}
AmDynInvoke* di_inst = fact->getInstance();
if(!di_inst) {
throw JsonRpcError(-32601, "Method not found",
"failed to instanciate module");
}
di_inst->invoke(fact_meth, params, rpc_res["result"]);
} catch (const AmDynInvoke::NotImplemented& ni) {
INFO("not implemented DI function '%s'\n",
ni.what.c_str());
throw JsonRpcError(-32601, "Method not found",
"function unknown in module");
} catch (const AmArg::OutOfBoundsException& oob) {
INFO("out of bounds in RPC DI call\n");
throw JsonRpcError(-32602, "Invalid params",
"out of bounds in function call");
} catch (const AmArg::TypeMismatchException& oob) {
INFO("type mismatch in RPC DI call\n");
throw JsonRpcError(-32602, "Invalid params",
"parameters type mismatch in function call");
} catch (...) {
ERROR("unexpected Exception in RPC DI call\n");
throw JsonRpcError(-32000, "Server error",
"unexpected Exception");
}
// todo: notification!
rpc_res["id"] = rpc_params["id"];
rpc_res["jsonrpc"] = "2.0";
} catch (const JsonRpcError& e) {
INFO("got JsonRpcError core %d message '%s'\n",
e.code, e.message.c_str());
rpc_res["error"] = AmArg();
rpc_res["error"]["code"] = e.code;
rpc_res["error"]["message"] = e.message;
rpc_res["error"]["data"] = e.data;
// todo: notification!
rpc_res["id"] = rpc_params["id"];
rpc_res["jsonrpc"] = "2.0";
return;
}
rpc_res["error"] = AmArg(); // Undef/null
}
void JsonRpcServer::runCoreMethod(const string& method, const AmArg& params, AmArg& res) {
if (method == "calls") {
res[0] = (int)AmSession::getSessionNum();
} else if (method == "set_loglevel") {
assertArgArray(params);
assertArgInt(params[0]);
log_level = params[0].asInt();
DBG("set log_level to %d\n", log_level);
} else if (method == "get_loglevel") {
res[0] = log_level;
DBG("get_log_level returns %d\n", log_level);
} else {
throw JsonRpcError(-32601, "Method not found",
"function unknown in core");
}
}

@ -0,0 +1,59 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _JsonRPCServer_h_
#define _JsonRPCServer_h_
#include "AmArg.h"
#include <string>
using std::string;
struct JsonrpcPeerConnection;
struct JsonrpcNetstringsConnection;
struct JsonRpcError {
int code;
string message;
AmArg data;
JsonRpcError(int code, string message, AmArg data)
: code(code), message(message), data(data) { }
~JsonRpcError() { }
};
class JsonRpcServer {
static void execRpc(const AmArg& rpc_params, AmArg& rpc_res);
static void runCoreMethod(const string& method, const AmArg& params, AmArg& res);
public:
static int processMessage(char* msgbuf, unsigned int* msg_size,
JsonrpcPeerConnection* peer);
static int createRequest(const string& evq_link, const string& method, AmArg& params,
JsonrpcNetstringsConnection* peer, bool is_notification = false);
};
#endif // _JsonRPCServer_h_

@ -0,0 +1,7 @@
plug_in_name = jsonrpc
module_ldflags = -lev
module_cflags = -DMOD_NAME=\"$(plug_in_name)\" -fno-strict-aliasing
COREPATH ?=../../core
include $(COREPATH)/plug-in/Makefile.app_module

@ -0,0 +1,309 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "RpcPeer.h"
#include "log.h"
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string>
using std::string;
#include "AmUtils.h"
JsonrpcNetstringsConnection::JsonrpcNetstringsConnection()
: fd(0), msg_size(0), rcvd_size(0), in_msg(false), msg_recv(true)
{
}
JsonrpcNetstringsConnection::~JsonrpcNetstringsConnection() {
}
int JsonrpcNetstringsConnection::connect(const string& host, int port, string& res_str) {
struct sockaddr_in sa;
if (!populate_sockaddr_in_from_name(host, &sa)) {
res_str = "populate_sockaddr_in_from_name failed\n";
return 300;
}
fd = socket(PF_INET, SOCK_STREAM, 0);
sa.sin_port = htons(port);
sa.sin_family = PF_INET;
int flags = fcntl(fd, F_GETFL);
if (flags < 0) {
::close(fd);
res_str = "error setting socket non-blocking";
return 300;
}
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) {
::close(fd);
res_str = "error setting socket non-blocking";
return 300;
}
if (::connect(fd, (const struct sockaddr *)&sa,
sizeof(sa)) ==-1 && errno != EINPROGRESS) {
::close(fd);
res_str = "error connecting to "+host+": "+ strerror(errno);
return 300;
}
fd_set wfds;
struct timeval tv;
int retval;
FD_ZERO(&wfds);
FD_SET(fd, &wfds);
/* Wait up to five seconds. */
tv.tv_sec = 5;
tv.tv_usec = 0;
while (true) {
retval = select(fd+1, NULL, &wfds, NULL, &tv);
if (retval<0 && errno == EINTR)
continue;
if (retval<0) {
res_str = "error waiting for connect: "+string(strerror(errno));
::close(fd);
return 300;
}
if (retval==0) {
res_str = "connect to "+host+" timed out";
::close(fd);
return 300;
}
break;
}
int optval;
socklen_t optval_len = sizeof(int);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &optval, &optval_len)) {
res_str = "error in connect: "+string(strerror(errno));
::close(fd);
return 300;
}
if (optval) {
res_str = "error in connect ("+int2str(optval)+")";
::close(fd);
return 300;
}
return 0;
}
void JsonrpcNetstringsConnection::resetRead() {
in_msg = false;
msg_size = 0;
rcvd_size = 0;
msg_recv = true;
}
int JsonrpcNetstringsConnection::netstringsRead() {
if (!in_msg) {
while (true) {
if (rcvd_size > MAX_NS_LEN_SIZE) {
DBG("closing connection [%p/%d]: oversize length\n", this, fd);
close();
return REMOVE;
}
// reading length
ssize_t r=read(fd,&msgbuf[rcvd_size],1);
if (!r) {
DBG("closing connection [%p/%d] on peer hangup\n", this, fd);
close();
return REMOVE;
}
if ((r<0 && errno == EAGAIN) ||
(r<0 && errno == EWOULDBLOCK))
return CONTINUE;
if (r != 1) {
INFO("socket error on connection [%p/%d]: %s\n",
this, fd, strerror(errno));
close();
return REMOVE;
}
// DBG("received '%c'\n", msgbuf[rcvd_size]);
if (msgbuf[rcvd_size] == ':') {
msgbuf[rcvd_size] = '\0';
if (str2i(std::string(msgbuf, rcvd_size), msg_size)) {
ERROR("Protocol error decoding size '%s'\n", msgbuf);
close();
return REMOVE;
}
// received len - switch to receive msg mode
in_msg = true;
rcvd_size = read(fd,msgbuf,msg_size+1);
// DBG("received '%.*s'\n", rcvd_size, msgbuf);
if (rcvd_size == msg_size+1) {
if (msgbuf[msg_size] == ',')
return DISPATCH;
INFO("Protocol error on connection [%p/%d]: netstring not terminated with ','\n",
this, fd);
close();
return REMOVE;
}
if (!rcvd_size) {
DBG("closing connection [%p/%d] on peer hangup\n", this, fd);
close();
return REMOVE;
}
if ((rcvd_size<0 && errno == EAGAIN) ||
(rcvd_size<0 && errno == EWOULDBLOCK))
return CONTINUE; // necessary?
if (rcvd_size<0) {
INFO("socket error on connection [%p/%d]: %s\n",
this, fd, strerror(errno));
close();
return REMOVE;
}
return CONTINUE;
}
if (msgbuf[rcvd_size] < '0' || msgbuf[rcvd_size] > '9') {
INFO("Protocol error on connection [%p/%d]: invalid character in size\n",
this, fd);
close();
return REMOVE;
}
rcvd_size++;
}
} else {
ssize_t r = read(fd,msgbuf+rcvd_size,msg_size-rcvd_size+1);
if (r>0) {
rcvd_size += r;
DBG("msgbuf='%.*s'\n", msg_size+1,msgbuf);
if (rcvd_size == msg_size+1) {
DBG("msg_size = %d, rcvd_size = %d, <%c> \n", msg_size, rcvd_size, msgbuf[msg_size-1]);
if (msgbuf[msg_size] == ',')
return DISPATCH;
INFO("Protocol error on connection [%p/%d]: netstring not terminated with ','\n",
this, fd);
close();
return REMOVE;
}
return CONTINUE;
}
if (!r) {
DBG("closing connection [%p/%d] on peer hangup\n", this, fd);
close();
return REMOVE;
}
if ((r<0 && errno == EAGAIN) ||
(r<0 && errno == EWOULDBLOCK))
return CONTINUE; // necessary?
INFO("socket error on connection [%p/%d]: %s\n",
this, fd, strerror(errno));
close();
return REMOVE;
}
}
int JsonrpcNetstringsConnection::netstringsBlockingWrite() {
if (msg_size<0) {
close();
return REMOVE;
}
if (msg_size==0)
return CONTINUE;
// write size to snd_size
string msg_size_s = int2str(msg_size);
if (msg_size_s.length()>MAX_NS_LEN_SIZE) {
ERROR("too large return message size len\n");
close();
return REMOVE;
}
char* ns_begin = msgbuf-(msg_size_s.length()+1);
memcpy(ns_begin,
msg_size_s.c_str(),
msg_size_s.length());
ns_begin[msg_size_s.length()]=':';
msgbuf[msg_size]=',';
rcvd_size = 0;
size_t ns_total_len = msg_size+msg_size_s.length()+2;
while (rcvd_size != ns_total_len) {
size_t written = send(fd, &ns_begin[rcvd_size], ns_total_len - rcvd_size, MSG_NOSIGNAL);
if ((written<0 && (errno==EAGAIN || errno==EWOULDBLOCK)) ||
written==0) {
usleep(SEND_SLEEP);
continue;
}
if (written<0) {
if (errno == ECONNRESET) {
DBG("closing connection [%p/%d] on peer hangup\n", this, fd);
close();
return REMOVE;
}
INFO("error on connection [%p/%d]: %s\n", this, fd, strerror(errno));
close();
return REMOVE;
}
rcvd_size+=written;
}
rcvd_size = 0;
msg_size = 0;
return CONTINUE;
}
void JsonrpcNetstringsConnection::close() {
if (fd>0) {
shutdown(fd, SHUT_RDWR);
::close(fd);
}
}
bool JsonrpcNetstringsConnection::messagePending() {
return msg_size != 0;
}
bool JsonrpcNetstringsConnection::messageIsRecv() {
return msg_recv;
}

@ -0,0 +1,115 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _RpcPeer_h_
#define _RpcPeer_h_
#include <ev.h>
#include <stdlib.h>
#define MAX_RPC_MSG_SIZE 20*1024*1024 // 20k
#define MAX_NS_LEN_SIZE 10
#define SEND_SLEEP 10000 // 10 ms send retry
#include <map>
#include <string>
struct JsonrpcPeerConnection {
// event queue keys that should receive the reply
// to requests sent on that connection
std::map<std::string, std::string> replyReceivers;
// if present, notifications will be sent
// to that event queue directly
std::string notificationReceiver;
// if present, requests will be sent
// to that event queue directly
std::string requestReceiver;
int flags;
enum {
FL_CLOSE_ALWAYS = 1, // always close connection after request processed
FL_CLOSE_WRONG_REPLY = 2, // close connection if reply with unknown ID received
FL_CLOSE_NO_REPLYLINK = 4, // close connection if reply queue for a request missing
FL_CLOSE_NO_REQUEST_RECV = 8, // close connection if reques queue missing
FL_CLOSE_NO_NOTIF_RECV = 16 // close connection if notification queue missing
};
JsonrpcPeerConnection() {
req_id = rand()%1024;
}
int req_id;
virtual ~JsonrpcPeerConnection() { }
};
struct JsonrpcNetstringsConnection
: public JsonrpcPeerConnection
{
int fd;
ev_io ev_write;
ev_io ev_read;
char snd_size[MAX_NS_LEN_SIZE+1];
char msgbuf[MAX_RPC_MSG_SIZE];
unsigned int msg_size;
unsigned int rcvd_size;
bool in_msg;
bool msg_recv;
JsonrpcNetstringsConnection();
~JsonrpcNetstringsConnection();
int connect(const std::string& host, int port, std::string& res_str);
void close();
enum {
CONTINUE = 0,
REMOVE,
DISPATCH
} ReadResult;
/** @returns ReadResult */
int netstringsRead();
/**
blocking write: blocks until message in msgbuf
with size msg_size is written
*/
int netstringsBlockingWrite();
void resetRead();
bool messagePending();
bool messageIsRecv();
};
#endif

@ -0,0 +1,272 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "RpcServerLoop.h"
#include "JsonRPCServer.h"
#include "JsonRPC.h"
#include "log.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <err.h>
#include <stddef.h>
#include "RpcPeer.h"
ev_io ev_accept;
ev_async JsonRPCServerLoop::async_w;
struct ev_loop* JsonRPCServerLoop::loop = 0;
JsonRPCServerLoop* JsonRPCServerLoop::_instance = NULL;
JsonRPCServerLoop* JsonRPCServerLoop::instance() {
if (_instance == NULL) {
_instance = new JsonRPCServerLoop();
}
return _instance;
}
int
setnonblock(int fd)
{
int flags;
flags = fcntl(fd, F_GETFL);
if (flags < 0)
return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0)
return -1;
return 0;
}
// todo: use write_cb
// static void write_cb(struct ev_loop *loop, struct ev_io *w, int revents)
// {
// struct JsonrpcClientConnection *cli=
// ((struct JsonrpcClientConnection*) (((char*)w) - offsetof(struct JsonrpcClientConnection,ev_write)));
// if (revents & EV_WRITE){
// ssize_t written = write(cli->fd,superjared,strlen(superjared));
// if (written != strlen(superjared)) {
// ERROR("writing response\n");
// }
// ev_io_stop(EV_A_ w);
// }
// close(cli->fd);
// delete cli;
// }
static void read_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
struct JsonrpcNetstringsConnection *cli= ((struct JsonrpcNetstringsConnection*) (((char*)w) -
offsetof(JsonrpcNetstringsConnection,ev_read)));
// int r=0;
// char rbuff[1024];
if (revents & EV_READ){
int res = cli->netstringsRead();
switch (res) {
case JsonrpcNetstringsConnection::CONTINUE:
ev_io_start(loop,&cli->ev_read); return;
case JsonrpcNetstringsConnection::REMOVE:
ev_io_stop(EV_A_ w); delete cli; return;
case JsonrpcNetstringsConnection::DISPATCH: {
ev_io_stop(EV_A_ w);
JsonRPCServerLoop::dispatchServerEvent(new JsonServerEvent(cli));
} return;
}
// todo: put into reader thread
//r=read(cli->fd,&rbuff,1024);
return;
}
// put back to read loop
// ev_io_start(loop,&cli->ev_read);
// ev_io_stop(EV_A_ w);
// ev_io_init(&cli->ev_write,write_cb,cli->fd,EV_WRITE);
// ev_io_start(loop,&cli->ev_write);
}
void JsonRPCServerLoop::dispatchServerEvent(AmEvent* ev) {
instance()->threadpool.dispatch(ev);
}
static void accept_cb(struct ev_loop *loop, struct ev_io *w, int revents)
{
int client_fd;
struct JsonrpcNetstringsConnection *a_client;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
client_fd = accept(w->fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd == -1) {
return;
}
a_client = new JsonrpcNetstringsConnection();
a_client->fd=client_fd;
if (setnonblock(a_client->fd) < 0) {
ERROR("failed to set client socket to non-blocking");
return;
}
ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ);
ev_io_start(loop,&a_client->ev_read);
}
static void async_cb (EV_P_ ev_async *w, int revents)
{
JsonRPCServerLoop::_processEvents();
}
void JsonRPCServerLoop::_processEvents() {
instance()->processEvents();
}
void JsonRPCServerLoop::process(AmEvent* ev) {
DBG("processing event in server loop\n");
JsonServerEvent* server_event=dynamic_cast<JsonServerEvent*>(ev);
if (server_event==NULL) {
ERROR("wrong event type received\n");
return;
}
JsonrpcNetstringsConnection* a_client = server_event->conn;
a_client->resetRead();
ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ);
ev_io_start(loop,&a_client->ev_read);
}
JsonRPCServerLoop::JsonRPCServerLoop()
: AmEventQueue(this)
{
loop = ev_default_loop (0);
// one thread is started here so that
// in app initialization code, there is already
// a server thread available to receive events
DBG("starting one server thread for startup requests...\n");
threadpool.addThreads(1);
}
JsonRPCServerLoop::~JsonRPCServerLoop() {
}
void JsonRPCServerLoop::run() {
DBG("adding %d more server threads \n",
JsonRPCServerModule::threads - 1);
threadpool.addThreads(JsonRPCServerModule::threads - 1);
INFO("running server loop; listening on port %d\n",
JsonRPCServerModule::port);
int listen_fd;
struct sockaddr_in listen_addr;
int reuseaddr_on = 1;
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
err(1, "listen failed");
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
sizeof(reuseaddr_on)) == -1)
err(1, "setsockopt failed");
memset(&listen_addr, 0, sizeof(listen_addr));
listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons(JsonRPCServerModule::port);
if (bind(listen_fd, (struct sockaddr *)&listen_addr,
sizeof(listen_addr)) < 0) {
ERROR("bind failed\n");
return;
}
if (listen(listen_fd,5) < 0) {
ERROR("listen failed\n");
return;
}
if (setnonblock(listen_fd) < 0) {
ERROR("failed to set server socket to non-blocking\n");
return;
}
ev_io_init(&ev_accept,accept_cb,listen_fd,EV_READ);
ev_io_start(loop,&ev_accept);
ev_async_init (&async_w, async_cb);
ev_async_start (EV_A_ &async_w);
INFO("running event loop\n");
ev_loop (loop, 0);
INFO("event loop finished\n");
}
void JsonRPCServerLoop::on_stop() {
INFO("todo\n");
}
void JsonRPCServerLoop::returnConnection(JsonrpcNetstringsConnection* conn) {
DBG("returning connection %p\n", conn);
instance()->postEvent(new JsonServerEvent(conn));
ev_async_send(loop, &async_w);
}
void JsonRPCServerLoop::execRpc(const string& evq_link,
const string& notificationReceiver,
const string& requestReceiver,
int flags,
const string& host,
int port, const string& method,
AmArg& params,
AmArg& ret) {
JsonrpcNetstringsConnection* peer = new JsonrpcNetstringsConnection();
peer->flags = flags;
peer->notificationReceiver = notificationReceiver;
peer->requestReceiver = requestReceiver;
string res_str;
int res = peer->connect(host, port, res_str);
if (res) {
ret.push(400);
ret.push("Error in connect: "+res_str);
delete peer;
return;
}
if (JsonRpcServer::createRequest(evq_link, method, params, peer)) {
ret.push(400);
ret.push("Error creating request message");
}
JsonRPCServerLoop::dispatchServerEvent(new JsonServerEvent(peer));
ret.push(200);
ret.push("OK");
}

@ -0,0 +1,71 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _RpcServerLoop_h_
#define _RpcServerLoop_h_
#include <ev.h>
#include "AmEvent.h"
#include "AmEventQueue.h"
#include "AmThread.h"
#include "RpcPeer.h"
#include "RpcServerThread.h"
#include "AmArg.h"
class JsonRPCServerLoop
: public AmThread, public AmEventQueue, public AmEventHandler
{
RpcServerThreadpool threadpool;
static ev_async async_w;
static struct ev_loop *loop;
static JsonRPCServerLoop* _instance;
public:
JsonRPCServerLoop();
~JsonRPCServerLoop();
static JsonRPCServerLoop* instance();
static void returnConnection(JsonrpcNetstringsConnection* conn);
static void dispatchServerEvent(AmEvent* ev);
static void _processEvents();
static void execRpc(const string& evq_link,
const string& notificationReceiver,
const string& requestReceiver,
int flags,
const string& host,
int port, const string& method,
AmArg& params,
AmArg& ret);
void run();
void on_stop();
void process(AmEvent* ev);
};
#endif

@ -0,0 +1,146 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "RpcServerThread.h"
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include "RpcPeer.h"
#include "RpcServerLoop.h"
#include "jsonArg.h"
#include "JsonRPCServer.h"
#include "log.h"
RpcServerThread::RpcServerThread()
: AmEventQueue(this) {
}
RpcServerThread::~RpcServerThread() {
}
void RpcServerThread::run() {
while (true) {
waitForEvent();
processEvents();
}
}
void RpcServerThread::on_stop() {
INFO("TODO: stop server thread\n");
}
void RpcServerThread::process(AmEvent* event) {
JsonServerEvent* server_event = dynamic_cast<JsonServerEvent*>(event);
if (server_event == NULL) {
ERROR("invalid event to process\n");
return;
}
JsonrpcNetstringsConnection* conn = server_event->conn;
bool processed_message = false;
int res = 0;
if (conn->messagePending() && conn->messageIsRecv()) {
processed_message = true;
DBG("processing message >%.*s<\n", conn->msg_size, conn->msgbuf);
res = JsonRpcServer::processMessage(conn->msgbuf, &conn->msg_size,
conn);
if (res<0) {
INFO("error processing message - closing connection\n");
conn->close();
delete conn;
return;
}
conn->msg_recv = false;
}
if (conn->messagePending() && !conn->messageIsRecv()) {
res = conn->netstringsBlockingWrite();
if (res == JsonrpcNetstringsConnection::REMOVE) {
delete conn;
return;
}
}
if (processed_message &&
(conn->flags & JsonrpcPeerConnection::FL_CLOSE_ALWAYS)) {
DBG("closing connection marked as FL_CLOSE_ALWAYS\n");
conn->close();
delete conn;
return;
}
// give back connection into server loop
JsonRPCServerLoop::returnConnection(conn);
// ev_io_init(&cli->ev_write,write_cb,cli->fd,EV_WRITE);
// ev_io_start(loop,&cli->ev_write);
}
RpcServerThreadpool::RpcServerThreadpool() {
}
RpcServerThreadpool::~RpcServerThreadpool() {
}
/** round-robin dispatch to one thread */
void RpcServerThreadpool::dispatch(AmEvent* ev) {
threads_mut.lock();
if (!threads.size()) {
ERROR("no threads started for Rpc servers\n");
delete ev;
threads_mut.unlock();
return;
}
(*t_it)->postEvent(ev);
t_it++;
if (t_it == threads.end())
t_it = threads.begin();
threads_mut.unlock();
}
void RpcServerThreadpool::addThreads(unsigned int cnt) {
DBG("adding %u RPC server threads\n", cnt);
threads_mut.lock();
for (unsigned int i=0;i<cnt;i++) {
RpcServerThread* thr = new RpcServerThread();
thr->start();
threads.push_back(thr);
}
t_it = threads.begin();
threads_mut.unlock();
}

@ -0,0 +1,76 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _RpcServerThread_h_
#define _RpcServerThread_h_
#include "AmEvent.h"
#include "AmEventQueue.h"
#include "AmThread.h"
#include "RpcPeer.h"
struct JsonServerEvent
: public AmEvent {
JsonrpcNetstringsConnection* conn;
JsonServerEvent(JsonrpcNetstringsConnection* c)
: conn(c), AmEvent(121) { }
~JsonServerEvent() { }
};
class RpcServerThread
: public AmThread, public AmEventQueue, public AmEventHandler
{
char rcvbuf[MAX_RPC_MSG_SIZE];
public:
RpcServerThread();
~RpcServerThread();
void run();
void on_stop();
void process(AmEvent* event);
};
class RpcServerThreadpool
{
vector<RpcServerThread*> threads;
vector<RpcServerThread*>::iterator t_it;
AmMutex threads_mut;
public:
RpcServerThreadpool();
~RpcServerThreadpool();
void dispatch(AmEvent* ev);
void addThreads(unsigned int cnt);
};
#endif

@ -0,0 +1,13 @@
# jsonrpc_port - json-rpc server port to listen on
#
# optional; default: 7080
#
# jsonrpc_port=7080
# server_threads - json-rpc server threads to start
#
# optional; default: 5
#
# server_threads=5

@ -0,0 +1,234 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "AmArg.h"
#include "AmUtils.h"
#include "log.h"
#include "jsonxx.h"
using namespace jsonxx;
#include <iostream>
#include <sstream>
#include <string>
const char *hex_chars = "0123456789abcdef";
string arg2json(const AmArg &a) {
// TODO: optimize to avoid lots of mallocs
// TODO: how to get a bool?
string s;
switch (a.getType()) {
case AmArg::Undef:
return "null";
case AmArg::Int:
return a.asInt()<0?"-"+int2str(abs(a.asInt())):int2str(abs(a.asInt()));
case AmArg::Bool:
return a.asBool()?"true":"false";
case AmArg::Double:
return double2str(a.asDouble());
case AmArg::CStr: {
// borrowed from jsoncpp
// Not sure how to handle unicode...
if (strpbrk(a.asCStr(), "\"\\\b\f\n\r\t") == NULL)
return std::string("\"") + a.asCStr() + "\"";
// We have to walk value and escape any special characters.
// Appending to std::string is not efficient, but this should be rare.
// (Note: forward slashes are *not* rare, but I am not escaping them.)
unsigned maxsize = strlen(a.asCStr())*2 + 3; // allescaped+quotes+NULL
std::string result;
result.reserve(maxsize); // to avoid lots of mallocs
result += "\"";
for (const char* c=a.asCStr(); *c != 0; ++c){
switch(*c){
case '\"':
result += "\\\"";
break;
case '\\':
result += "\\\\";
break;
case '\b':
result += "\\b";
break;
case '\f':
result += "\\f";
break;
case '\n':
result += "\\n";
break;
case '\r':
result += "\\r";
break;
case '\t':
result += "\\t";
break;
case '/':
// Even though \/ is considered a legal escape in JSON, a bare
// slash is also legal, so I see no reason to escape it.
// (I hope I am not misunderstanding something.)
default:{
if (*c < ' ')
result += "\\u00" + hex_chars[*c >> 4] + hex_chars[*c & 0xf];
else
result += *c;
} break;
}
}
result += "\"";
return result;
}
case AmArg::Array:
s = "[";
for (size_t i = 0; i < a.size(); i ++)
s += arg2json(a[i]) + ", ";
if (1 < s.size())
s.resize(s.size() - 2); // strip last ", "
s += "]";
return s;
case AmArg::Struct:
s = "{";
for (AmArg::ValueStruct::const_iterator it = a.asStruct()->begin();
it != a.asStruct()->end(); it ++) {
s += '"'+it->first + "\": ";
s += arg2json(it->second);
s += ", ";
}
if (1 < s.size())
s.resize(s.size() - 2); // strip last ", "
s += "}";
return s;
default: break;
}
return "{}";
}
// based on jsonxx
bool array_parse(std::istream& input, AmArg& res) {
if (!match("[", input)) {
return false;
}
do {
res.push(AmArg());
AmArg v;
if (!json2arg(input, res.get(res.size()-1))) {
res.pop_back();
break; // TODO: return false????
}
} while (match(",", input));
if (!match("]", input)) {
return false;
}
return true;
}
bool object_parse(std::istream& input, AmArg& res) {
if (!match("{", input)) {
return false;
}
do {
std::string key;
if (!parse_string(input, &key)) {
return false;
}
if (!match(":", input)) {
return false;
}
res[key] = AmArg(); // using the reference
if (!json2arg(input, res[key])) {
res.erase(key);
// TODO: return false? but: empty will return false in subtree as well
break;
}
} while (match(",", input));
if (!match("}", input)) {
return false;
}
return true;
}
bool json2arg(const std::string& input, AmArg& res) {
std::istringstream iss(input);
return json2arg(iss, res);
}
bool json2arg(const char* input, AmArg& res) {
std::istringstream iss(input);
return json2arg(iss, res);
}
bool json2arg(std::istream& input, AmArg& res) {
res.clear();
std::string string_value;
if (parse_string(input, &string_value)) {
res = string_value; // todo: unnecessary value copy here
return true;
}
if (parse_float(input, &res.v_double)) {
res.type = AmArg::Double;
return true;
}
if (parse_number(input, &res.v_int)) {
res.type = AmArg::Int;
return true;
}
if (parse_bool(input, &res.v_bool)) {
res.type = AmArg::Bool;
return true;
}
if (parse_null(input)) { // AmArg::Undef
return true;
}
if (array_parse(input, res)) {
res.type = AmArg::Array;
return true;
}
if (object_parse(input, res)) {
res.type = AmArg::Struct;
return true;
}
res.invalidate();
return false;
}

@ -0,0 +1,37 @@
/*
* $Id: ModMysql.cpp 1764 2010-04-01 14:33:30Z peter_lemenkov $
*
* Copyright (C) 2010 TelTech Systems Inc.
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _jsonArg_h_
#define _jsonArg_h_
#include <string>
#include <iostream>
string arg2json(const AmArg &a);
bool json2arg(std::istream& input, AmArg& res);
bool json2arg(const char* input, AmArg& res);
bool json2arg(const std::string& input, AmArg& res);
#endif

@ -0,0 +1,283 @@
// Author: Hong Jiang <hong@hjiang.net>
#include "jsonxx.h"
#include <cctype>
#include <iostream>
#include <sstream>
namespace jsonxx {
void eat_whitespaces(std::istream& input) {
char ch;
do {
input.get(ch);
} while(isspace(ch));
input.putback(ch);
}
// Try to consume characters from the input stream and match the
// pattern string. Leading whitespaces from the input are ignored if
// ignore_ws is true.
bool match(const std::string& pattern, std::istream& input,
bool ignore_ws) {
if (ignore_ws) {
eat_whitespaces(input);
}
std::string::const_iterator cur(pattern.begin());
char ch(0);
while(input && !input.eof() && cur != pattern.end()) {
input.get(ch);
if (ch != *cur) {
input.putback(ch);
return false;
} else {
cur++;
}
}
return cur == pattern.end();
}
bool parse_string(std::istream& input, std::string* value) {
if (!match("\"", input)) {
return false;
}
char ch;
while(!input.eof() && input.good()) {
input.get(ch);
if (ch == '"') {
break;
}
value->push_back(ch);
}
if (input && ch == '"') {
return true;
} else {
return false;
}
}
bool parse_bool(std::istream& input, bool* value) {
if (match("true", input)) {
*value = true;
return true;
}
if (match("false", input)) {
*value = false;
return true;
}
return false;
}
bool parse_null(std::istream& input) {
if (match("null", input)) {
return true;
}
return false;
}
bool parse_float(std::istream& input, double* value) {
eat_whitespaces(input);
char ch;
bool has_dot = false;
std::string value_str;
int sign = 1;
if (match("-", input)) {
sign = -1;
} else {
match("+", input);
}
while(input && !input.eof()) {
input.get(ch);
if (ch=='.')
has_dot = true;
if (!isdigit(ch) && (!(ch == '.'))) {
input.putback(ch);
break;
}
value_str.push_back(ch);
}
if (!has_dot) {
for (std::string::reverse_iterator r_it=
value_str.rbegin(); r_it != value_str.rend(); r_it++)
input.putback(*r_it);
return false;
}
if (value_str.size() > 0) {
std::istringstream(value_str) >> *value;
*value*=sign;
return true;
} else {
return false;
}
}
bool parse_number(std::istream& input, long* value) {
eat_whitespaces(input);
char ch;
std::string value_str;
int sign = 1;
if (match("-", input)) {
sign = -1;
} else {
match("+", input);
}
while(input && !input.eof()) {
input.get(ch);
if (!isdigit(ch)) {
input.putback(ch);
break;
}
value_str.push_back(ch);
}
if (value_str.size() > 0) {
std::istringstream(value_str) >> *value;
*value*=sign;
return true;
} else {
return false;
}
}
bool parse_number(std::istream& input, int* value) {
eat_whitespaces(input);
char ch;
std::string value_str;
int sign = 1;
if (match("-", input)) {
sign = -1;
} else {
match("+", input);
}
while(input && !input.eof()) {
input.get(ch);
if (!isdigit(ch)) {
input.putback(ch);
break;
}
value_str.push_back(ch);
}
if (value_str.size() > 0) {
std::istringstream(value_str) >> *value;
*value*=sign;
return true;
} else {
return false;
}
}
Object::Object() : value_map_() {}
Object::~Object() {
std::map<std::string, Value*>::iterator i;
for (i = value_map_.begin(); i != value_map_.end(); ++i) {
delete i->second;
}
}
bool Object::parse(std::istream& input) {
if (!match("{", input)) {
return false;
}
do {
std::string key;
if (!parse_string(input, &key)) {
return false;
}
if (!match(":", input)) {
return false;
}
Value* v = new Value();
if (!v->parse(input)) {
delete v;
break;
}
value_map_[key] = v;
} while (match(",", input));
if (!match("}", input)) {
return false;
}
return true;
}
Value::Value() : type_(INVALID_) {}
Value::~Value() {
if (type_ == STRING_) {
delete string_value_;
}
if (type_ == OBJECT_) {
delete object_value_;
}
if (type_ == ARRAY_) {
delete array_value_;
}
}
bool Value::parse(std::istream& input) {
std::string string_value;
if (parse_string(input, &string_value)) {
string_value_ = new std::string();
string_value_->swap(string_value);
type_ = STRING_;
return true;
}
if (parse_number(input, &integer_value_)) {
type_ = INTEGER_;
return true;
}
if (parse_bool(input, &bool_value_)) {
type_ = BOOL_;
return true;
}
if (parse_null(input)) {
type_ = NULL_;
return true;
}
array_value_ = new Array();
if (array_value_->parse(input)) {
type_ = ARRAY_;
return true;
}
delete array_value_;
object_value_ = new Object();
if (object_value_->parse(input)) {
type_ = OBJECT_;
return true;
}
delete object_value_;
return false;
}
Array::Array() : values_() {}
Array::~Array() {
for (unsigned int i = 0; i < values_.size(); ++i) {
delete values_[i];
}
}
bool Array::parse(std::istream& input) {
if (!match("[", input)) {
return false;
}
do {
Value* v = new Value();
if (!v->parse(input)) {
delete v;
break;
}
values_.push_back(v);
} while (match(",", input));
if (!match("]", input)) {
return false;
}
return true;
}
} // namespace jsonxx

@ -0,0 +1,185 @@
// Author: Hong Jiang <hong@hjiang.net>
#include <cassert>
#include <iostream>
#include <map>
#include <vector>
namespace jsonxx {
bool match(const std::string& pattern, std::istream& input,
bool ignore_ws = true);
bool parse_string(std::istream& input, std::string* value);
bool parse_bool(std::istream& input, bool* value);
bool parse_null(std::istream& input);
bool parse_float(std::istream& input, double* value);
bool parse_number(std::istream& input, long* value);
bool parse_number(std::istream& input, int* value);
// TODO: *::parse() should be static functions.
class Value;
// A JSON Object
class Object {
public:
Object();
~Object();
bool parse(std::istream& input);
template <typename T>
bool has(const std::string& key);
// Always call has<>() first. If the key doesn't exist, consider
// the behavior undefined.
template <typename T>
T& get(const std::string& key);
private:
Object(const Object&);
Object& operator=(const Object&);
std::map<std::string, Value*> value_map_;
};
class Value;
class Array {
public:
Array();
~Array();
bool parse(std::istream& input);
unsigned int size() { return values_.size(); }
template <typename T>
bool has(unsigned int i);
template <typename T>
T& get(unsigned int i);
private:
Array(const Array&);
Array& operator=(const Array&);
std::vector<Value*> values_;
};
// A value could be a number, an array, a string, an object, a
// boolean, or null
class Value {
public:
class Null {};
Value();
~Value();
bool parse(std::istream& input);
template<typename T>
bool is();
template<typename T>
T& get();
private:
Value(const Value&);
Value& operator=(const Value&);
enum {
INTEGER_,
STRING_,
BOOL_,
NULL_,
ARRAY_,
OBJECT_,
INVALID_
} type_;
union {
long integer_value_;
std::string* string_value_;
bool bool_value_;
Array* array_value_;
Object* object_value_;
};
};
template <typename T>
bool Array::has(unsigned int i) {
if (i >= size()) {
return false;
} else {
return values_[i]->is<T>();
}
}
template <typename T>
T& Array::get(unsigned int i) {
assert(i < size());
return values_[i]->get<T>();
}
template <typename T>
bool Object::has(const std::string& key) {
return value_map_.count(key) > 0 && value_map_[key]->is<T>();
}
template <typename T>
T& Object::get(const std::string& key) {
assert(has<T>(key));
return value_map_[key]->get<T>();
}
template<>
inline bool Value::is<Value::Null>() {
return type_ == NULL_;
}
template<>
inline bool Value::is<bool>() {
return type_ == BOOL_;
}
template<>
inline bool Value::is<std::string>() {
return type_ == STRING_;
}
template<>
inline bool Value::is<long>() {
return type_ == INTEGER_;
}
template<>
inline bool Value::is<Array>() {
return type_ == ARRAY_;
}
template<>
inline bool Value::is<Object>() {
return type_ == OBJECT_;
}
template<>
inline bool& Value::get<bool>() {
assert(is<bool>());
return bool_value_;
}
template<>
inline std::string& Value::get<std::string>() {
assert(is<std::string>());
return *string_value_;
}
template<>
inline long& Value::get<long>() {
assert(is<long>());
return integer_value_;
}
template<>
inline Array& Value::get<Array>() {
assert(is<Array>());
return *array_value_;
}
template<>
inline Object& Value::get<Object>() {
assert(is<Object>());
return *object_value_;
}
} // namespace jsonxx
Loading…
Cancel
Save