jsonrpc extensions and fixes

o sendMessage DI function
  to send message into open connection

O execServerFunction DI function
  execute server function as if received through json-rpc

o getServerPort DI function
  get the server port the json-rpc server is listening on

o notification events of connection disconnect

o various fixes in protocol handling
sayer/1.4-spce2.6
Stefan Sayer 16 years ago
parent ccdbae75ed
commit c3629d7c3f

@ -26,6 +26,7 @@
*/
#include "JsonRPC.h"
#include "JsonRPCServer.h"
JsonRPCServerModule* JsonRPCServerModule::_instance = NULL;
@ -90,8 +91,26 @@ void JsonRPCServerModule::invoke(const string& method,
}
execRpc(args, ret);
// sendRequestList(args, ret);
} else if (method == "sendMessage"){
args.assertArrayFmt("sisss"); // conn_id, type, method, id, reply_sink, [params]
if (args.size() > 5) {
if (!isArgArray(args.get(5)) && !isArgStruct(args.get(5))) {
ERROR("internal error: params to JSON-RPC must be struct or array\n");
throw AmArg::TypeMismatchException();
}
}
sendMessage(args, ret);
} else if (method == "execServerFunction"){
args.assertArrayFmt("ss"); // method, id, params
JsonRpcServer::execRpc(args.get(0).asCStr(), args.get(1).asCStr(), args.get(2), ret);
// JsonRpcServer::execRpc(args, ret);
} else if (method == "getServerPort"){
ret.push(port);
} else if(method == "_list"){
ret.push(AmArg("execRpc"));
ret.push(AmArg("sendMessage"));
ret.push(AmArg("getServerPort"));
ret.push(AmArg("execServerFunction"));
// ret.push(AmArg("newConnection"));
// ret.push(AmArg("sendRequest"));
// ret.push(AmArg("sendRequestList"));
@ -115,3 +134,16 @@ void JsonRPCServerModule::execRpc(const AmArg& args, AmArg& ret) {
args.get(5).asInt(), args.get(6).asCStr(),
params, ret);
}
void JsonRPCServerModule::sendMessage(const AmArg& args, AmArg& ret) {
AmArg none_params;
AmArg& params = none_params;
if (args.size()>5)
params = args.get(5);
JsonRPCServerLoop::sendMessage(args.get(0).asCStr(), // conn_id,
args.get(1).asInt(), // type, (0 == reply)
args.get(2).asCStr(), // method,
args.get(3).asCStr(), // id
args.get(4).asCStr(), // reply_sink
params, ret);
}

@ -45,6 +45,7 @@ class JsonRPCServerModule
// DI methods
void execRpc(const AmArg& args, AmArg& ret);
void sendMessage(const AmArg& args, AmArg& ret);
public:
JsonRPCServerModule(const string& mod_name);

@ -34,8 +34,14 @@
#include <string>
using std::string;
#define JSONRPC_MSG_REQUEST 0
#define JSONRPC_MSG_RESPONSE 1
#define JSONRPC_MSG_ERROR 2
class JsonrpcNetstringsConnection;
struct JsonRpcEvent
: public AmEvent {
string connection_id;
JsonRpcEvent()
: AmEvent(122) { }
virtual ~JsonRpcEvent() { }
@ -95,16 +101,73 @@ struct JsonRpcRequestEvent
struct JsonRpcConnectionEvent
: public JsonRpcEvent {
// todo: add connection id
enum {
DISCONNECT = 0
};
int what;
string connection_id;
JsonRpcConnectionEvent(int what)
: what(what) { }
JsonRpcConnectionEvent(int what, const string& connection_id)
: what(what), connection_id(connection_id) { }
~JsonRpcConnectionEvent() { }
};
// events used internally:
struct JsonServerEvent
: public AmEvent {
enum EventType {
StartReadLoop = 0,
SendMessage
};
JsonrpcNetstringsConnection* conn;
string connection_id;
JsonServerEvent(JsonrpcNetstringsConnection* c,
EventType ev_type)
: conn(c), AmEvent(ev_type) { }
JsonServerEvent(const string& connection_id,
EventType ev_type)
: connection_id(connection_id), AmEvent(ev_type),
conn(NULL) { }
~JsonServerEvent() { }
};
struct JsonServerSendMessageEvent
: public JsonServerEvent {
bool is_reply;
string method;
string id;
AmArg params;
string reply_link;
bool is_error;
JsonServerSendMessageEvent(const string& connection_id,
bool is_reply,
const string& method,
const string& id,
AmArg& params,
const string& reply_link = "")
: JsonServerEvent(connection_id, SendMessage),
is_reply(is_reply), reply_link(reply_link),
method(method), id(id), params(params) { }
JsonServerSendMessageEvent(const JsonServerSendMessageEvent& e,
JsonrpcNetstringsConnection* conn)
: JsonServerEvent(conn, SendMessage),
is_reply(e.is_reply),reply_link(e.reply_link),
method(e.method), id(e.id), params(e.params),
is_error(e.is_error) {
connection_id = e.connection_id;
}
};
#endif // _JsonRPCEvents_h_

@ -37,7 +37,8 @@
#include "AmSession.h"
#include "AmUtils.h"
int JsonRpcServer::createRequest(const string& evq_link, const string& method, AmArg& params, JsonrpcNetstringsConnection* peer,
int JsonRpcServer::createRequest(const string& evq_link, const string& method,
AmArg& params, JsonrpcNetstringsConnection* peer,
bool is_notification) {
AmArg rpc_params;
rpc_params["jsonrpc"] = "2.0";
@ -47,7 +48,11 @@ int JsonRpcServer::createRequest(const string& evq_link, const string& method, A
peer->req_id++;
string req_id = int2str(peer->req_id);
rpc_params["id"] = req_id;
peer->replyReceivers[req_id] = evq_link;
if (!evq_link.empty())
peer->replyReceivers[req_id] = evq_link;
DBG("registering reply sink '%s' for id %s\n",
evq_link.c_str(), req_id.c_str());
}
string rpc_params_json = arg2json(rpc_params);
@ -65,6 +70,31 @@ int JsonRpcServer::createRequest(const string& evq_link, const string& method, A
return 0;
}
int JsonRpcServer::createReply(JsonrpcNetstringsConnection* peer,
const string& id, AmArg& result, bool is_error) {
AmArg rpc_res;
rpc_res["id"] = id;
rpc_res["jsonrpc"] = "2.0";
if (is_error)
rpc_res["error"] = result;
else
rpc_res["result"] = result;
string res_s = arg2json(rpc_res);
if (res_s.length() > MAX_RPC_MSG_SIZE) {
ERROR("internal error: reply exceeded MAX_RPC_MSG_SIZE (%d)\n",
MAX_RPC_MSG_SIZE);
return -3;
}
DBG("created RPC reply: >>%.*s<<\n", res_s.length(), res_s.c_str());
memcpy(peer->msgbuf, res_s.c_str(), res_s.length());
peer->msg_size = res_s.length();
return 0;
}
int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
JsonrpcPeerConnection* peer) {
DBG("parsing message ...\n");
@ -95,7 +125,7 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
std::map<std::string, std::string>::iterator rep_recv_q =
peer->replyReceivers.find(id);
if (rep_recv_q == peer->replyReceivers.end()) {
DBG("received repy for unknown request");
DBG("received reply for unknown request");
*msg_size = 0;
if (peer->flags && JsonrpcPeerConnection::FL_CLOSE_WRONG_REPLY) {
@ -114,6 +144,7 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
}
resp_ev = new JsonRpcResponseEvent(true, id, rpc_params["error"]);
}
resp_ev->connection_id = peer->id;
bool posted = AmEventDispatcher::instance()->
post(rep_recv_q->second, resp_ev);
@ -135,8 +166,17 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
}
string id;
if (rpc_params.hasMember("id") && isArgCStr(rpc_params["id"])) {
id = rpc_params["id"].asCStr();
if (rpc_params.hasMember("id")) {
if (isArgCStr(rpc_params["id"]))
id = rpc_params["id"].asCStr();
else if (isArgInt(rpc_params["id"]))
id = int2str(rpc_params["id"].asInt());
else if (isArgBool(rpc_params["id"]))
id = rpc_params["id"].asBool() ? "True":"False";
else {
ERROR("incorrect type for jsonrpc id <%s>\n",
AmArg::print(rpc_params["id"]).c_str());
}
} else {
DBG("received notification\n");
}
@ -158,18 +198,19 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
if (rpc_params.hasMember("params")) {
params = rpc_params["params"];
}
JsonRpcRequestEvent* notification_ev =
JsonRpcRequestEvent* request_ev =
new JsonRpcRequestEvent(rpc_params["method"].asCStr(),
id, params);
request_ev->connection_id = peer->id;
bool posted = AmEventDispatcher::instance()->
post(dst_evqueue, notification_ev);
post(dst_evqueue, request_ev);
if (!posted) {
DBG("%s receiver event queue '%s' does not exist (any more)\n",
id.empty() ? "notification":"request",
dst_evqueue.c_str());
delete notification_ev;
delete request_ev;
if (id.empty() && (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_NOTIF_RECV)) {
INFO("closing connection on missing notification receiver queue\n");
@ -201,6 +242,7 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
MAX_RPC_MSG_SIZE);
return -3;
}
DBG("RPC result: >>%.*s<<\n", res_s.length(), res_s.c_str());
memcpy(msgbuf, res_s.c_str(), res_s.length());
*msg_size = res_s.length();
@ -210,14 +252,22 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size,
/** rpc_params must contain "method" member as string */
void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) {
try {
AmArg none_params;
AmArg& params = none_params;
if (rpc_params.hasMember("params")) {
params = rpc_params["params"];
}
string method = rpc_params["method"].asCStr();
string id;
if (rpc_params.hasMember("id") && isArgCStr(rpc_params["id"]))
id = rpc_params["id"].asCStr();
execRpc(method, id, params, rpc_res);
}
void JsonRpcServer::execRpc(const string& method, const string& id, const AmArg& params, AmArg& rpc_res) {
try {
size_t dot_pos = method.find('.');
if (dot_pos == string::npos || dot_pos == method.length()) {
throw JsonRpcError(-32601, "Method not found",
@ -229,7 +279,7 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) {
try {
if (factory == "core") {
runCoreMethod(fact_meth, params, rpc_res["result"]);
rpc_res["id"] = rpc_params["id"];
rpc_res["id"] = id;
rpc_res["error"] = AmArg(); // Undef/null
rpc_res["jsonrpc"] = "2.0";
return;
@ -265,6 +315,9 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) {
INFO("type mismatch in RPC DI call\n");
throw JsonRpcError(-32602, "Invalid params",
"parameters type mismatch in function call");
} catch (const JsonRpcError& e) {
INFO("JsonRpcError \n");
throw;
} catch (...) {
ERROR("unexpected Exception in RPC DI call\n");
throw JsonRpcError(-32000, "Server error",
@ -272,7 +325,7 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) {
}
// todo: notification!
rpc_res["id"] = rpc_params["id"];
rpc_res["id"] = id;
rpc_res["jsonrpc"] = "2.0";
} catch (const JsonRpcError& e) {
INFO("got JsonRpcError core %d message '%s'\n",
@ -282,7 +335,7 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) {
rpc_res["error"]["message"] = e.message;
rpc_res["error"]["data"] = e.data;
// todo: notification!
rpc_res["id"] = rpc_params["id"];
rpc_res["id"] = id;
rpc_res["jsonrpc"] = "2.0";
return;
}

@ -46,7 +46,9 @@ struct JsonRpcError {
};
class JsonRpcServer {
public:
static void execRpc(const AmArg& rpc_params, AmArg& rpc_res);
static void execRpc(const string& method, const string& id, const AmArg& params, AmArg& rpc_res);
static void runCoreMethod(const string& method, const AmArg& params, AmArg& res);
public:
static int processMessage(char* msgbuf, unsigned int* msg_size,
@ -54,6 +56,9 @@ class JsonRpcServer {
static int createRequest(const string& evq_link, const string& method, AmArg& params,
JsonrpcNetstringsConnection* peer, bool is_notification = false);
static int createReply(JsonrpcNetstringsConnection* peer, const string& id,
AmArg& result, bool is_error);
};
#endif // _JsonRPCServer_h_

@ -39,9 +39,32 @@
using std::string;
#include "AmUtils.h"
#include "AmEventDispatcher.h"
#include "JsonRPCEvents.h"
JsonrpcNetstringsConnection::JsonrpcNetstringsConnection()
: fd(0), msg_size(0), rcvd_size(0), in_msg(false), msg_recv(true)
void JsonrpcPeerConnection::notifyDisconnect() {
// let event receivers know about broken connection
DBG("notifying event receivers about broken connection\n");
if (!notificationReceiver.empty())
AmEventDispatcher::instance()->
post(notificationReceiver,
new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT, id));
if (!requestReceiver.empty())
AmEventDispatcher::instance()->
post(requestReceiver,
new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT, id));
for (std::map<std::string, std::string>::iterator it=
replyReceivers.begin(); it != replyReceivers.end(); it++) {
AmEventDispatcher::instance()->
post(it->second,
new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT, id));
}
}
JsonrpcNetstringsConnection::JsonrpcNetstringsConnection(const std::string& id)
: JsonrpcPeerConnection(id),
fd(0), msg_size(0), rcvd_size(0), in_msg(false), msg_recv(true)
{
}
@ -159,7 +182,7 @@ int JsonrpcNetstringsConnection::netstringsRead() {
return REMOVE;
}
// DBG("received '%c'\n", msgbuf[rcvd_size]);
DBG("received '%c'\n", msgbuf[rcvd_size]);
if (msgbuf[rcvd_size] == ':') {
msgbuf[rcvd_size] = '\0';
if (str2i(std::string(msgbuf, rcvd_size), msg_size)) {

@ -30,6 +30,7 @@
#include <ev.h>
#include <stdlib.h>
#include "log.h"
#define MAX_RPC_MSG_SIZE 20*1024*1024 // 20k
#define MAX_NS_LEN_SIZE 10
@ -39,6 +40,8 @@
#include <string>
struct JsonrpcPeerConnection {
std::string id;
// event queue keys that should receive the reply
// to requests sent on that connection
std::map<std::string, std::string> replyReceivers;
@ -67,7 +70,16 @@ struct JsonrpcPeerConnection {
int req_id;
virtual ~JsonrpcPeerConnection() { }
JsonrpcPeerConnection(const std::string& id)
: id(id) {
DBG("created connection '%s'\n", id.c_str());
}
virtual ~JsonrpcPeerConnection() {
DBG("destroying connection '%s'\n", id.c_str());
}
void notifyDisconnect();
};
struct JsonrpcNetstringsConnection
@ -84,7 +96,7 @@ struct JsonrpcNetstringsConnection
bool in_msg;
bool msg_recv;
JsonrpcNetstringsConnection();
JsonrpcNetstringsConnection(const std::string& id);
~JsonrpcNetstringsConnection();
int connect(const std::string& host, int port, std::string& res_str);

@ -30,6 +30,7 @@
#include "JsonRPC.h"
#include "JsonRPCEvents.h"
#include "AmSession.h"
#include "AmEventDispatcher.h"
#include "log.h"
@ -55,6 +56,13 @@ ev_io ev_accept;
ev_async JsonRPCServerLoop::async_w;
struct ev_loop* JsonRPCServerLoop::loop = 0;
JsonRPCServerLoop* JsonRPCServerLoop::_instance = NULL;
RpcServerThreadpool JsonRPCServerLoop::threadpool;
std::map<string, JsonrpcPeerConnection*> JsonRPCServerLoop::connections;
AmMutex JsonRPCServerLoop::connections_mut;
vector<JsonServerEvent*> JsonRPCServerLoop::pending_events;
AmMutex JsonRPCServerLoop::pending_events_mut;
JsonRPCServerLoop* JsonRPCServerLoop::instance() {
if (_instance == NULL) {
@ -63,8 +71,7 @@ JsonRPCServerLoop* JsonRPCServerLoop::instance() {
return _instance;
}
int
setnonblock(int fd)
int setnonblock(int fd)
{
int flags;
@ -95,40 +102,37 @@ setnonblock(int fd)
// }
static void read_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
struct JsonrpcNetstringsConnection *cli= ((struct JsonrpcNetstringsConnection*) (((char*)w) -
offsetof(JsonrpcNetstringsConnection,ev_read)));
struct JsonrpcNetstringsConnection *peer=
((struct JsonrpcNetstringsConnection*)
(((char*)w) - offsetof(JsonrpcNetstringsConnection,ev_read)));
DBG("read_cb in connection %p\n", peer);
// int r=0;
// char rbuff[1024];
if (revents & EV_READ){
int res = cli->netstringsRead();
// read message - here in main server thread (more efficient for small messages)
int res = peer->netstringsRead();
switch (res) {
case JsonrpcNetstringsConnection::CONTINUE:
ev_io_start(loop,&cli->ev_read); return;
ev_io_start(loop,&peer->ev_read); return;
case JsonrpcNetstringsConnection::REMOVE: {
ev_io_stop(EV_A_ w);
// let event receivers know about broken connection
// todo: add connection id
if (!cli->notificationReceiver.empty())
AmEventDispatcher::instance()->post(cli->notificationReceiver,
new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT));
if (!cli->requestReceiver.empty())
AmEventDispatcher::instance()->post(cli->requestReceiver,
new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT));
for (std::map<std::string, std::string>::iterator it=
cli->replyReceivers.begin(); it != cli->replyReceivers.end(); it++) {
AmEventDispatcher::instance()->post(it->second,
new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT));
}
delete cli;
peer->notifyDisconnect();
JsonRPCServerLoop::instance()->removeConnection(peer->id);
delete peer;
} return;
case JsonrpcNetstringsConnection::DISPATCH: {
ev_io_stop(EV_A_ w);
JsonRPCServerLoop::dispatchServerEvent(new JsonServerEvent(cli));
JsonRPCServerLoop::
dispatchServerEvent(new JsonServerEvent(peer, JsonServerEvent::StartReadLoop));
} return;
}
// todo: put into reader thread
//r=read(cli->fd,&rbuff,1024);
return;
}
// put back to read loop
@ -140,13 +144,12 @@ static void read_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
}
void JsonRPCServerLoop::dispatchServerEvent(AmEvent* ev) {
instance()->threadpool.dispatch(ev);
threadpool.dispatch(ev);
}
static void accept_cb(struct ev_loop *loop, struct ev_io *w, int revents)
{
int client_fd;
struct JsonrpcNetstringsConnection *a_client;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
client_fd = accept(w->fd, (struct sockaddr *)&client_addr, &client_len);
@ -154,14 +157,19 @@ static void accept_cb(struct ev_loop *loop, struct ev_io *w, int revents)
return;
}
a_client = new JsonrpcNetstringsConnection();
a_client->fd=client_fd;
if (setnonblock(a_client->fd) < 0) {
string connection_id = JsonRPCServerLoop::newConnectionId();
JsonrpcNetstringsConnection* peer = new JsonrpcNetstringsConnection(connection_id);
peer->fd=client_fd;
if (setnonblock(peer->fd) < 0) {
delete peer;
ERROR("failed to set client socket to non-blocking");
return;
}
ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ);
ev_io_start(loop,&a_client->ev_read);
JsonRPCServerLoop::registerConnection(peer, connection_id);
ev_io_init(&peer->ev_read,read_cb,peer->fd,EV_READ);
ev_io_start(loop,&peer->ev_read);
}
static void async_cb (EV_P_ ev_async *w, int revents)
@ -174,28 +182,102 @@ void JsonRPCServerLoop::_processEvents() {
}
void JsonRPCServerLoop::process(AmEvent* ev) {
DBG("processing event in server loop\n");
DBG("server loop - processing event\n");
JsonServerEvent* server_event=dynamic_cast<JsonServerEvent*>(ev);
if (server_event==NULL) {
ERROR("wrong event type received\n");
return;
}
JsonrpcNetstringsConnection* a_client = server_event->conn;
a_client->resetRead();
ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ);
ev_io_start(loop,&a_client->ev_read);
switch (server_event->event_id) {
case JsonServerEvent::StartReadLoop: {
JsonrpcNetstringsConnection* a_client = server_event->conn;
DBG("checking for pending events to connection %p/%s\n",
a_client, a_client->id.c_str());
pending_events_mut.lock();
// (check whether event for that connection pending)
for (vector<JsonServerEvent*>::iterator it=
pending_events.begin(); it != pending_events.end(); it++) {
if ((*it)->connection_id == a_client->id) {
// stop read loop
ev_io_stop(loop,&a_client->ev_read);
JsonServerEvent* server_event = *it;
pending_events.erase(it);
pending_events_mut.unlock();
DBG("got pending event for connection '%s'\n", a_client->id.c_str());
server_event->conn = a_client;
dispatchServerEvent(server_event);
return;
}
}
pending_events_mut.unlock();
DBG("no pending events for connection %p/%s, starting read loop\n",
a_client, a_client->id.c_str());
a_client->resetRead();
ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ);
ev_io_start(loop,&a_client->ev_read);
}; break;
case JsonServerEvent::SendMessage: {
JsonServerSendMessageEvent* snd_msg_ev =
dynamic_cast<JsonServerSendMessageEvent*>(server_event);
if (snd_msg_ev == NULL) {
ERROR("invalid SendMessage type event received\n");
return;
}
JsonrpcPeerConnection* p_peer = getConnection(snd_msg_ev->connection_id);
if (p_peer == NULL) {
WARN("dropping message to inexistent/broken connection '%s' "
"(is_reply=%s, method=%s, id=%s, params='%s')",
snd_msg_ev->connection_id.c_str(), snd_msg_ev->is_reply?"true":"false",
snd_msg_ev->method.c_str(), snd_msg_ev->id.c_str(),
AmArg::print(snd_msg_ev->params).c_str());
return;
}
JsonrpcNetstringsConnection* peer = dynamic_cast<JsonrpcNetstringsConnection*>(p_peer);
if (NULL == peer) {
ERROR("invalid connection type\n"); // todo: other transports
return;
}
if (ev_is_active(&peer->ev_read)) {
// ok, peer is in read loop, we can dispatch it to thread for sending
ev_io_stop(EV_A_ &peer->ev_read);
JsonRPCServerLoop::
dispatchServerEvent(new JsonServerSendMessageEvent(*snd_msg_ev, peer));
} else {
// peer is being processed by thread, save event for later sending
pending_events_mut.lock();
// (need to copy event here: original event is deleted when event processed)
pending_events.push_back(new JsonServerSendMessageEvent(*snd_msg_ev));
size_t q_size = pending_events.size();
pending_events_mut.unlock();
DBG("queued event for connection %s (total %zd events pending)\n",
snd_msg_ev->connection_id.c_str(), q_size);
}
}; break;
// todo: process remove connection event
default:
ERROR("unknown server event type received\n");
return;
}
}
JsonRPCServerLoop::JsonRPCServerLoop()
: AmEventQueue(this)
{
loop = ev_default_loop (0);
// one thread is started here so that
// in app initialization code, there is already
// a server thread available to receive events
DBG("starting one server thread for startup requests...\n");
threadpool.addThreads(1);
}
@ -239,6 +321,7 @@ void JsonRPCServerLoop::run() {
ev_io_init(&ev_accept,accept_cb,listen_fd,EV_READ);
ev_io_start(loop,&ev_accept);
// async watcher to process our events in event loop
ev_async_init (&async_w, async_cb);
ev_async_start (EV_A_ &async_w);
@ -252,8 +335,29 @@ void JsonRPCServerLoop::on_stop() {
}
void JsonRPCServerLoop::returnConnection(JsonrpcNetstringsConnection* conn) {
pending_events_mut.lock();
// (check whether event for that connection pending)
DBG("checking %u pending events\n", pending_events.size());
for (vector<JsonServerEvent*>::iterator it=
pending_events.begin(); it != pending_events.end(); it++) {
DBG("%s vs %s\n", (*it)->connection_id.c_str(),conn->id.c_str());
if ((*it)->connection_id == conn->id) {
JsonServerEvent* server_event = *it;
pending_events.erase(it);
pending_events_mut.unlock();
DBG("got pending event for connection '%s'\n", conn->id.c_str());
server_event->conn = conn;
dispatchServerEvent(server_event);
return;
}
}
pending_events_mut.unlock();
DBG("returning connection %p\n", conn);
instance()->postEvent(new JsonServerEvent(conn));
instance()->postEvent(new JsonServerEvent(conn, JsonServerEvent::StartReadLoop));
ev_async_send(loop, &async_w);
}
@ -265,7 +369,8 @@ void JsonRPCServerLoop::execRpc(const string& evq_link,
int port, const string& method,
AmArg& params,
AmArg& ret) {
JsonrpcNetstringsConnection* peer = new JsonrpcNetstringsConnection();
string connection_id = newConnectionId();
JsonrpcNetstringsConnection* peer = new JsonrpcNetstringsConnection(connection_id);
peer->flags = flags;
peer->notificationReceiver = notificationReceiver;
peer->requestReceiver = requestReceiver;
@ -279,13 +384,101 @@ void JsonRPCServerLoop::execRpc(const string& evq_link,
return;
}
if (JsonRpcServer::createRequest(evq_link, method, params, peer)) {
// DBG("evq_link = '%s'\n", evq_link.c_str());
// if (JsonRpcServer::createRequest(evq_link, method, params, peer)) {
// ret.push(400);
// ret.push("Error creating request message");
// delete peer;
// return;
// }
registerConnection(peer, connection_id);
DBG("dispatching JsonServerSendMessageEvent\n");
JsonServerSendMessageEvent* send_message_event =
new JsonServerSendMessageEvent(connection_id, false, method, "1" /* id - not empty */,
params);
send_message_event->reply_link = evq_link;
JsonRPCServerLoop::dispatchServerEvent(send_message_event);
// JsonRPCServerLoop::
// dispatchServerEvent(new JsonServerEvent(peer, JsonServerEvent::SendMessage));
ret.push(200);
ret.push("OK");
ret.push(connection_id);
}
void JsonRPCServerLoop::sendMessage(const string& connection_id,
int msg_type,
const string& method,
const string& id,
const string& reply_sink,
AmArg& params,
AmArg& ret) {
// check for presence of connection
// (connection might still be removed until we really
// process the request to send message, but here we already
// catch most failures)
if (getConnection(connection_id)==NULL) {
ret.push(400);
ret.push("Error creating request message");
ret.push("unknown connection");
return;
}
JsonRPCServerLoop::dispatchServerEvent(new JsonServerEvent(peer));
JsonServerSendMessageEvent* ev =
new JsonServerSendMessageEvent(connection_id, msg_type != JSONRPC_MSG_REQUEST,
method, id, params, reply_sink);
ev->is_error = msg_type == JSONRPC_MSG_ERROR;
instance()->postEvent(ev);
// wake up event loop to process message
ev_async_send(loop, &async_w);
ret.push(200);
ret.push("OK");
ret.push("posted");
}
string JsonRPCServerLoop::newConnectionId() {
return AmSession::getNewId();
}
bool JsonRPCServerLoop::registerConnection(JsonrpcPeerConnection* peer, const string& id) {
bool res = false;
connections_mut.lock();
if (connections.find(id) != connections.end())
res = true;
connections[id] = peer;
connections_mut.unlock();
DBG("registered connection '%s'\n", id.c_str());
return res;
}
bool JsonRPCServerLoop::removeConnection(const string& id) {
bool res = false;
connections_mut.lock();
std::map<string, JsonrpcPeerConnection*>::iterator it =
connections.find(id);
if (it != connections.end()) {
res = true;
connections.erase(it);
}
connections_mut.unlock();
DBG("deregistered connection '%s'\n", id.c_str());
return res;
}
JsonrpcPeerConnection* JsonRPCServerLoop::getConnection(const string& id) {
JsonrpcPeerConnection* res = NULL;
connections_mut.lock();
std::map<string, JsonrpcPeerConnection*>::iterator it =
connections.find(id);
if (it != connections.end())
res = it->second;
connections_mut.unlock();
return res;
}

@ -35,18 +35,29 @@
#include "RpcPeer.h"
#include "RpcServerThread.h"
#include "JsonRPCEvents.h"
#include "AmArg.h"
#include <map>
class JsonRPCServerLoop
: public AmThread, public AmEventQueue, public AmEventHandler
{
RpcServerThreadpool threadpool;
static RpcServerThreadpool threadpool;
static ev_async async_w;
static struct ev_loop *loop;
static JsonRPCServerLoop* _instance;
static std::map<string, JsonrpcPeerConnection*> connections;
static AmMutex connections_mut;
static vector<JsonServerEvent*> pending_events; // todo: use map<set<> > if many pending events
static AmMutex pending_events_mut;
public:
JsonRPCServerLoop();
JsonRPCServerLoop();
~JsonRPCServerLoop();
static JsonRPCServerLoop* instance();
@ -63,9 +74,38 @@ class JsonRPCServerLoop
int port, const string& method,
AmArg& params,
AmArg& ret);
static void sendMessage(const string& connection_id,
int msg_type,
const string& method,
const string& id,
const string& reply_sink,
AmArg& params,
AmArg& ret);
void run();
void on_stop();
void process(AmEvent* ev);
static string newConnectionId();
/**
add connection with id
@return whether connection with this id existed before
*/
static bool registerConnection(JsonrpcPeerConnection* peer, const string& id);
/**
remove a connection with id
@return whether connection with this id existed
*/
static bool removeConnection(const string& id);
/**
get a connection with id
@return NULL if not found
*/
static JsonrpcPeerConnection* getConnection(const string& id);
};
#endif

@ -62,45 +62,97 @@ void RpcServerThread::process(AmEvent* event) {
ERROR("invalid event to process\n");
return;
}
JsonrpcNetstringsConnection* conn = server_event->conn;
JsonrpcNetstringsConnection* connection = server_event->conn;
// todo: check event type - for now handle all types equally
if (server_event->event_id == JsonServerEvent::SendMessage) {
JsonServerSendMessageEvent* snd_msg_ev =
dynamic_cast<JsonServerSendMessageEvent*>(server_event);
if (NULL == snd_msg_ev) {
ERROR("wrong event type received\n");
return;
}
if (NULL == connection) {
DBG("getting connection for id %s\n", snd_msg_ev->connection_id.c_str());
JsonrpcPeerConnection* js_connection = JsonRPCServerLoop::getConnection(snd_msg_ev->connection_id);
if ((NULL == js_connection) ||
(NULL ==
(connection = dynamic_cast<JsonrpcNetstringsConnection*>(js_connection)))) {
ERROR("getting connection for id %s - message will not be sent\n",
snd_msg_ev->connection_id.c_str());
return;
}
}
if (!snd_msg_ev->is_reply) {
if (JsonRpcServer::createRequest(snd_msg_ev->reply_link, snd_msg_ev->method,
snd_msg_ev->params, connection,
snd_msg_ev->id.empty())) {
ERROR("creating request\n");
// give back connection into server loop
JsonRPCServerLoop::returnConnection(connection);
return;
}
} else {
if (JsonRpcServer::createReply(connection, snd_msg_ev->id, snd_msg_ev->params,
snd_msg_ev->is_error)) {
// give back connection into server loop
JsonRPCServerLoop::returnConnection(connection);
return;
}
}
connection->msg_recv = false;
}
bool processed_message = false;
int res = 0;
if (conn->messagePending() && conn->messageIsRecv()) {
processed_message = true;
DBG("processing message >%.*s<\n", conn->msg_size, conn->msgbuf);
res = JsonRpcServer::processMessage(conn->msgbuf, &conn->msg_size,
conn);
if (connection->messagePending() && connection->messageIsRecv()) {
DBG("processing message >%.*s<\n", connection->msg_size, connection->msgbuf);
res = JsonRpcServer::processMessage(connection->msgbuf, &connection->msg_size,
connection);
if (res<0) {
INFO("error processing message - closing connection\n");
conn->close();
delete conn;
connection->close();
connection->notifyDisconnect();
JsonRPCServerLoop::removeConnection(connection->id);
delete connection;
return;
}
conn->msg_recv = false;
connection->msg_recv = false;
processed_message = true;
}
if (conn->messagePending() && !conn->messageIsRecv()) {
res = conn->netstringsBlockingWrite();
DBG("connection->messagePending() = %s\n", connection->messagePending()?"true":"false");
if (connection->messagePending() && !connection->messageIsRecv()) {
DBG("calling write\n");
res = connection->netstringsBlockingWrite();
if (res == JsonrpcNetstringsConnection::REMOVE) {
delete conn;
connection->notifyDisconnect();
JsonRPCServerLoop::removeConnection(connection->id);
delete connection;
return;
}
}
if (processed_message &&
(conn->flags & JsonrpcPeerConnection::FL_CLOSE_ALWAYS)) {
(connection->flags & JsonrpcPeerConnection::FL_CLOSE_ALWAYS)) {
DBG("closing connection marked as FL_CLOSE_ALWAYS\n");
conn->close();
delete conn;
connection->close();
connection->notifyDisconnect(); // ??
JsonRPCServerLoop::removeConnection(connection->id);
delete connection;
return;
}
// give back connection into server loop
JsonRPCServerLoop::returnConnection(conn);
JsonRPCServerLoop::returnConnection(connection);
// ev_io_init(&cli->ev_write,write_cb,cli->fd,EV_WRITE);
// ev_io_start(loop,&cli->ev_write);
@ -108,6 +160,11 @@ void RpcServerThread::process(AmEvent* event) {
RpcServerThreadpool::RpcServerThreadpool() {
// one thread is started here so that
// in app initialization code, there is already
// a server thread available to receive events
DBG("starting one server thread for startup requests...\n");
addThreads(1);
}
RpcServerThreadpool::~RpcServerThreadpool() {

@ -33,16 +33,6 @@
#include "AmThread.h"
#include "RpcPeer.h"
struct JsonServerEvent
: public AmEvent {
JsonrpcNetstringsConnection* conn;
JsonServerEvent(JsonrpcNetstringsConnection* c)
: conn(c), AmEvent(121) { }
~JsonServerEvent() { }
};
class RpcServerThread
: public AmThread, public AmEventQueue, public AmEventHandler
{

Loading…
Cancel
Save