From c3629d7c3fbdd7c9deadc759f95b00f476fb49cc Mon Sep 17 00:00:00 2001 From: Stefan Sayer Date: Sat, 19 Jun 2010 17:24:39 +0200 Subject: [PATCH] jsonrpc extensions and fixes o sendMessage DI function to send message into open connection O execServerFunction DI function execute server function as if received through json-rpc o getServerPort DI function get the server port the json-rpc server is listening on o notification events of connection disconnect o various fixes in protocol handling --- apps/jsonrpc/JsonRPC.cpp | 32 ++++ apps/jsonrpc/JsonRPC.h | 1 + apps/jsonrpc/JsonRPCEvents.h | 69 +++++++- apps/jsonrpc/JsonRPCServer.cpp | 79 +++++++-- apps/jsonrpc/JsonRPCServer.h | 5 + apps/jsonrpc/RpcPeer.cpp | 29 +++- apps/jsonrpc/RpcPeer.h | 16 +- apps/jsonrpc/RpcServerLoop.cpp | 285 ++++++++++++++++++++++++++----- apps/jsonrpc/RpcServerLoop.h | 44 ++++- apps/jsonrpc/RpcServerThread.cpp | 89 ++++++++-- apps/jsonrpc/RpcServerThread.h | 10 -- 11 files changed, 564 insertions(+), 95 deletions(-) diff --git a/apps/jsonrpc/JsonRPC.cpp b/apps/jsonrpc/JsonRPC.cpp index 40784f2c..123e3133 100644 --- a/apps/jsonrpc/JsonRPC.cpp +++ b/apps/jsonrpc/JsonRPC.cpp @@ -26,6 +26,7 @@ */ #include "JsonRPC.h" +#include "JsonRPCServer.h" JsonRPCServerModule* JsonRPCServerModule::_instance = NULL; @@ -90,8 +91,26 @@ void JsonRPCServerModule::invoke(const string& method, } execRpc(args, ret); // sendRequestList(args, ret); + } else if (method == "sendMessage"){ + args.assertArrayFmt("sisss"); // conn_id, type, method, id, reply_sink, [params] + if (args.size() > 5) { + if (!isArgArray(args.get(5)) && !isArgStruct(args.get(5))) { + ERROR("internal error: params to JSON-RPC must be struct or array\n"); + throw AmArg::TypeMismatchException(); + } + } + sendMessage(args, ret); + } else if (method == "execServerFunction"){ + args.assertArrayFmt("ss"); // method, id, params + JsonRpcServer::execRpc(args.get(0).asCStr(), args.get(1).asCStr(), args.get(2), ret); + // JsonRpcServer::execRpc(args, ret); + } else if (method == "getServerPort"){ + ret.push(port); } else if(method == "_list"){ ret.push(AmArg("execRpc")); + ret.push(AmArg("sendMessage")); + ret.push(AmArg("getServerPort")); + ret.push(AmArg("execServerFunction")); // ret.push(AmArg("newConnection")); // ret.push(AmArg("sendRequest")); // ret.push(AmArg("sendRequestList")); @@ -115,3 +134,16 @@ void JsonRPCServerModule::execRpc(const AmArg& args, AmArg& ret) { args.get(5).asInt(), args.get(6).asCStr(), params, ret); } + +void JsonRPCServerModule::sendMessage(const AmArg& args, AmArg& ret) { + AmArg none_params; + AmArg& params = none_params; + if (args.size()>5) + params = args.get(5); + JsonRPCServerLoop::sendMessage(args.get(0).asCStr(), // conn_id, + args.get(1).asInt(), // type, (0 == reply) + args.get(2).asCStr(), // method, + args.get(3).asCStr(), // id + args.get(4).asCStr(), // reply_sink + params, ret); +} diff --git a/apps/jsonrpc/JsonRPC.h b/apps/jsonrpc/JsonRPC.h index 5d28e458..c9d61881 100644 --- a/apps/jsonrpc/JsonRPC.h +++ b/apps/jsonrpc/JsonRPC.h @@ -45,6 +45,7 @@ class JsonRPCServerModule // DI methods void execRpc(const AmArg& args, AmArg& ret); + void sendMessage(const AmArg& args, AmArg& ret); public: JsonRPCServerModule(const string& mod_name); diff --git a/apps/jsonrpc/JsonRPCEvents.h b/apps/jsonrpc/JsonRPCEvents.h index 98da9a64..baac783b 100644 --- a/apps/jsonrpc/JsonRPCEvents.h +++ b/apps/jsonrpc/JsonRPCEvents.h @@ -34,8 +34,14 @@ #include using std::string; +#define JSONRPC_MSG_REQUEST 0 +#define JSONRPC_MSG_RESPONSE 1 +#define JSONRPC_MSG_ERROR 2 +class JsonrpcNetstringsConnection; + struct JsonRpcEvent : public AmEvent { + string connection_id; JsonRpcEvent() : AmEvent(122) { } virtual ~JsonRpcEvent() { } @@ -95,16 +101,73 @@ struct JsonRpcRequestEvent struct JsonRpcConnectionEvent : public JsonRpcEvent { - // todo: add connection id enum { DISCONNECT = 0 }; int what; + string connection_id; - JsonRpcConnectionEvent(int what) - : what(what) { } + JsonRpcConnectionEvent(int what, const string& connection_id) + : what(what), connection_id(connection_id) { } ~JsonRpcConnectionEvent() { } }; + +// events used internally: + +struct JsonServerEvent + : public AmEvent { + + enum EventType { + StartReadLoop = 0, + SendMessage + }; + + JsonrpcNetstringsConnection* conn; + string connection_id; + + JsonServerEvent(JsonrpcNetstringsConnection* c, + EventType ev_type) + : conn(c), AmEvent(ev_type) { } + + JsonServerEvent(const string& connection_id, + EventType ev_type) + : connection_id(connection_id), AmEvent(ev_type), + conn(NULL) { } + + ~JsonServerEvent() { } +}; + +struct JsonServerSendMessageEvent + : public JsonServerEvent { + + bool is_reply; + string method; + string id; + AmArg params; + string reply_link; + bool is_error; + + JsonServerSendMessageEvent(const string& connection_id, + bool is_reply, + const string& method, + const string& id, + AmArg& params, + const string& reply_link = "") + : JsonServerEvent(connection_id, SendMessage), + is_reply(is_reply), reply_link(reply_link), + method(method), id(id), params(params) { } + + JsonServerSendMessageEvent(const JsonServerSendMessageEvent& e, + JsonrpcNetstringsConnection* conn) + : JsonServerEvent(conn, SendMessage), + is_reply(e.is_reply),reply_link(e.reply_link), + method(e.method), id(e.id), params(e.params), + is_error(e.is_error) { + connection_id = e.connection_id; + } + +}; + #endif // _JsonRPCEvents_h_ diff --git a/apps/jsonrpc/JsonRPCServer.cpp b/apps/jsonrpc/JsonRPCServer.cpp index 06dfb5ee..715248f0 100644 --- a/apps/jsonrpc/JsonRPCServer.cpp +++ b/apps/jsonrpc/JsonRPCServer.cpp @@ -37,7 +37,8 @@ #include "AmSession.h" #include "AmUtils.h" -int JsonRpcServer::createRequest(const string& evq_link, const string& method, AmArg& params, JsonrpcNetstringsConnection* peer, +int JsonRpcServer::createRequest(const string& evq_link, const string& method, + AmArg& params, JsonrpcNetstringsConnection* peer, bool is_notification) { AmArg rpc_params; rpc_params["jsonrpc"] = "2.0"; @@ -47,7 +48,11 @@ int JsonRpcServer::createRequest(const string& evq_link, const string& method, A peer->req_id++; string req_id = int2str(peer->req_id); rpc_params["id"] = req_id; - peer->replyReceivers[req_id] = evq_link; + + if (!evq_link.empty()) + peer->replyReceivers[req_id] = evq_link; + DBG("registering reply sink '%s' for id %s\n", + evq_link.c_str(), req_id.c_str()); } string rpc_params_json = arg2json(rpc_params); @@ -65,6 +70,31 @@ int JsonRpcServer::createRequest(const string& evq_link, const string& method, A return 0; } +int JsonRpcServer::createReply(JsonrpcNetstringsConnection* peer, + const string& id, AmArg& result, bool is_error) { + + AmArg rpc_res; + rpc_res["id"] = id; + rpc_res["jsonrpc"] = "2.0"; + if (is_error) + rpc_res["error"] = result; + else + rpc_res["result"] = result; + + string res_s = arg2json(rpc_res); + if (res_s.length() > MAX_RPC_MSG_SIZE) { + ERROR("internal error: reply exceeded MAX_RPC_MSG_SIZE (%d)\n", + MAX_RPC_MSG_SIZE); + return -3; + } + + DBG("created RPC reply: >>%.*s<<\n", res_s.length(), res_s.c_str()); + memcpy(peer->msgbuf, res_s.c_str(), res_s.length()); + peer->msg_size = res_s.length(); + + return 0; +} + int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size, JsonrpcPeerConnection* peer) { DBG("parsing message ...\n"); @@ -95,7 +125,7 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size, std::map::iterator rep_recv_q = peer->replyReceivers.find(id); if (rep_recv_q == peer->replyReceivers.end()) { - DBG("received repy for unknown request"); + DBG("received reply for unknown request"); *msg_size = 0; if (peer->flags && JsonrpcPeerConnection::FL_CLOSE_WRONG_REPLY) { @@ -114,6 +144,7 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size, } resp_ev = new JsonRpcResponseEvent(true, id, rpc_params["error"]); } + resp_ev->connection_id = peer->id; bool posted = AmEventDispatcher::instance()-> post(rep_recv_q->second, resp_ev); @@ -135,8 +166,17 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size, } string id; - if (rpc_params.hasMember("id") && isArgCStr(rpc_params["id"])) { - id = rpc_params["id"].asCStr(); + if (rpc_params.hasMember("id")) { + if (isArgCStr(rpc_params["id"])) + id = rpc_params["id"].asCStr(); + else if (isArgInt(rpc_params["id"])) + id = int2str(rpc_params["id"].asInt()); + else if (isArgBool(rpc_params["id"])) + id = rpc_params["id"].asBool() ? "True":"False"; + else { + ERROR("incorrect type for jsonrpc id <%s>\n", + AmArg::print(rpc_params["id"]).c_str()); + } } else { DBG("received notification\n"); } @@ -158,18 +198,19 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size, if (rpc_params.hasMember("params")) { params = rpc_params["params"]; } - JsonRpcRequestEvent* notification_ev = + JsonRpcRequestEvent* request_ev = new JsonRpcRequestEvent(rpc_params["method"].asCStr(), id, params); + request_ev->connection_id = peer->id; bool posted = AmEventDispatcher::instance()-> - post(dst_evqueue, notification_ev); + post(dst_evqueue, request_ev); if (!posted) { DBG("%s receiver event queue '%s' does not exist (any more)\n", id.empty() ? "notification":"request", dst_evqueue.c_str()); - delete notification_ev; + delete request_ev; if (id.empty() && (peer->flags & JsonrpcPeerConnection::FL_CLOSE_NO_NOTIF_RECV)) { INFO("closing connection on missing notification receiver queue\n"); @@ -201,6 +242,7 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size, MAX_RPC_MSG_SIZE); return -3; } + DBG("RPC result: >>%.*s<<\n", res_s.length(), res_s.c_str()); memcpy(msgbuf, res_s.c_str(), res_s.length()); *msg_size = res_s.length(); @@ -210,14 +252,22 @@ int JsonRpcServer::processMessage(char* msgbuf, unsigned int* msg_size, /** rpc_params must contain "method" member as string */ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) { - try { AmArg none_params; AmArg& params = none_params; if (rpc_params.hasMember("params")) { params = rpc_params["params"]; } - string method = rpc_params["method"].asCStr(); + string id; + if (rpc_params.hasMember("id") && isArgCStr(rpc_params["id"])) + id = rpc_params["id"].asCStr(); + + execRpc(method, id, params, rpc_res); +} + +void JsonRpcServer::execRpc(const string& method, const string& id, const AmArg& params, AmArg& rpc_res) { + + try { size_t dot_pos = method.find('.'); if (dot_pos == string::npos || dot_pos == method.length()) { throw JsonRpcError(-32601, "Method not found", @@ -229,7 +279,7 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) { try { if (factory == "core") { runCoreMethod(fact_meth, params, rpc_res["result"]); - rpc_res["id"] = rpc_params["id"]; + rpc_res["id"] = id; rpc_res["error"] = AmArg(); // Undef/null rpc_res["jsonrpc"] = "2.0"; return; @@ -265,6 +315,9 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) { INFO("type mismatch in RPC DI call\n"); throw JsonRpcError(-32602, "Invalid params", "parameters type mismatch in function call"); + } catch (const JsonRpcError& e) { + INFO("JsonRpcError \n"); + throw; } catch (...) { ERROR("unexpected Exception in RPC DI call\n"); throw JsonRpcError(-32000, "Server error", @@ -272,7 +325,7 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) { } // todo: notification! - rpc_res["id"] = rpc_params["id"]; + rpc_res["id"] = id; rpc_res["jsonrpc"] = "2.0"; } catch (const JsonRpcError& e) { INFO("got JsonRpcError core %d message '%s'\n", @@ -282,7 +335,7 @@ void JsonRpcServer::execRpc(const AmArg& rpc_params, AmArg& rpc_res) { rpc_res["error"]["message"] = e.message; rpc_res["error"]["data"] = e.data; // todo: notification! - rpc_res["id"] = rpc_params["id"]; + rpc_res["id"] = id; rpc_res["jsonrpc"] = "2.0"; return; } diff --git a/apps/jsonrpc/JsonRPCServer.h b/apps/jsonrpc/JsonRPCServer.h index 48b834e2..039c08b6 100644 --- a/apps/jsonrpc/JsonRPCServer.h +++ b/apps/jsonrpc/JsonRPCServer.h @@ -46,7 +46,9 @@ struct JsonRpcError { }; class JsonRpcServer { + public: static void execRpc(const AmArg& rpc_params, AmArg& rpc_res); + static void execRpc(const string& method, const string& id, const AmArg& params, AmArg& rpc_res); static void runCoreMethod(const string& method, const AmArg& params, AmArg& res); public: static int processMessage(char* msgbuf, unsigned int* msg_size, @@ -54,6 +56,9 @@ class JsonRpcServer { static int createRequest(const string& evq_link, const string& method, AmArg& params, JsonrpcNetstringsConnection* peer, bool is_notification = false); + + static int createReply(JsonrpcNetstringsConnection* peer, const string& id, + AmArg& result, bool is_error); }; #endif // _JsonRPCServer_h_ diff --git a/apps/jsonrpc/RpcPeer.cpp b/apps/jsonrpc/RpcPeer.cpp index d086a456..f162eef7 100644 --- a/apps/jsonrpc/RpcPeer.cpp +++ b/apps/jsonrpc/RpcPeer.cpp @@ -39,9 +39,32 @@ using std::string; #include "AmUtils.h" +#include "AmEventDispatcher.h" +#include "JsonRPCEvents.h" -JsonrpcNetstringsConnection::JsonrpcNetstringsConnection() - : fd(0), msg_size(0), rcvd_size(0), in_msg(false), msg_recv(true) +void JsonrpcPeerConnection::notifyDisconnect() { + // let event receivers know about broken connection + DBG("notifying event receivers about broken connection\n"); + if (!notificationReceiver.empty()) + AmEventDispatcher::instance()-> + post(notificationReceiver, + new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT, id)); + if (!requestReceiver.empty()) + AmEventDispatcher::instance()-> + post(requestReceiver, + new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT, id)); + + for (std::map::iterator it= + replyReceivers.begin(); it != replyReceivers.end(); it++) { + AmEventDispatcher::instance()-> + post(it->second, + new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT, id)); + } +} + +JsonrpcNetstringsConnection::JsonrpcNetstringsConnection(const std::string& id) + : JsonrpcPeerConnection(id), + fd(0), msg_size(0), rcvd_size(0), in_msg(false), msg_recv(true) { } @@ -159,7 +182,7 @@ int JsonrpcNetstringsConnection::netstringsRead() { return REMOVE; } - // DBG("received '%c'\n", msgbuf[rcvd_size]); + DBG("received '%c'\n", msgbuf[rcvd_size]); if (msgbuf[rcvd_size] == ':') { msgbuf[rcvd_size] = '\0'; if (str2i(std::string(msgbuf, rcvd_size), msg_size)) { diff --git a/apps/jsonrpc/RpcPeer.h b/apps/jsonrpc/RpcPeer.h index 88942b96..360fba4a 100644 --- a/apps/jsonrpc/RpcPeer.h +++ b/apps/jsonrpc/RpcPeer.h @@ -30,6 +30,7 @@ #include #include +#include "log.h" #define MAX_RPC_MSG_SIZE 20*1024*1024 // 20k #define MAX_NS_LEN_SIZE 10 @@ -39,6 +40,8 @@ #include struct JsonrpcPeerConnection { + std::string id; + // event queue keys that should receive the reply // to requests sent on that connection std::map replyReceivers; @@ -67,7 +70,16 @@ struct JsonrpcPeerConnection { int req_id; - virtual ~JsonrpcPeerConnection() { } + JsonrpcPeerConnection(const std::string& id) + : id(id) { + DBG("created connection '%s'\n", id.c_str()); + } + + virtual ~JsonrpcPeerConnection() { + DBG("destroying connection '%s'\n", id.c_str()); + } + + void notifyDisconnect(); }; struct JsonrpcNetstringsConnection @@ -84,7 +96,7 @@ struct JsonrpcNetstringsConnection bool in_msg; bool msg_recv; - JsonrpcNetstringsConnection(); + JsonrpcNetstringsConnection(const std::string& id); ~JsonrpcNetstringsConnection(); int connect(const std::string& host, int port, std::string& res_str); diff --git a/apps/jsonrpc/RpcServerLoop.cpp b/apps/jsonrpc/RpcServerLoop.cpp index 683de6e3..5676eee3 100644 --- a/apps/jsonrpc/RpcServerLoop.cpp +++ b/apps/jsonrpc/RpcServerLoop.cpp @@ -30,6 +30,7 @@ #include "JsonRPC.h" #include "JsonRPCEvents.h" +#include "AmSession.h" #include "AmEventDispatcher.h" #include "log.h" @@ -55,6 +56,13 @@ ev_io ev_accept; ev_async JsonRPCServerLoop::async_w; struct ev_loop* JsonRPCServerLoop::loop = 0; JsonRPCServerLoop* JsonRPCServerLoop::_instance = NULL; +RpcServerThreadpool JsonRPCServerLoop::threadpool; + +std::map JsonRPCServerLoop::connections; +AmMutex JsonRPCServerLoop::connections_mut; + +vector JsonRPCServerLoop::pending_events; +AmMutex JsonRPCServerLoop::pending_events_mut; JsonRPCServerLoop* JsonRPCServerLoop::instance() { if (_instance == NULL) { @@ -63,8 +71,7 @@ JsonRPCServerLoop* JsonRPCServerLoop::instance() { return _instance; } -int -setnonblock(int fd) +int setnonblock(int fd) { int flags; @@ -95,40 +102,37 @@ setnonblock(int fd) // } static void read_cb(struct ev_loop *loop, struct ev_io *w, int revents) { - struct JsonrpcNetstringsConnection *cli= ((struct JsonrpcNetstringsConnection*) (((char*)w) - - offsetof(JsonrpcNetstringsConnection,ev_read))); + + struct JsonrpcNetstringsConnection *peer= + ((struct JsonrpcNetstringsConnection*) + (((char*)w) - offsetof(JsonrpcNetstringsConnection,ev_read))); + + DBG("read_cb in connection %p\n", peer); + // int r=0; // char rbuff[1024]; if (revents & EV_READ){ - int res = cli->netstringsRead(); + // read message - here in main server thread (more efficient for small messages) + int res = peer->netstringsRead(); switch (res) { case JsonrpcNetstringsConnection::CONTINUE: - ev_io_start(loop,&cli->ev_read); return; + ev_io_start(loop,&peer->ev_read); return; + case JsonrpcNetstringsConnection::REMOVE: { ev_io_stop(EV_A_ w); - // let event receivers know about broken connection - // todo: add connection id - if (!cli->notificationReceiver.empty()) - AmEventDispatcher::instance()->post(cli->notificationReceiver, - new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT)); - if (!cli->requestReceiver.empty()) - AmEventDispatcher::instance()->post(cli->requestReceiver, - new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT)); - for (std::map::iterator it= - cli->replyReceivers.begin(); it != cli->replyReceivers.end(); it++) { - AmEventDispatcher::instance()->post(it->second, - new JsonRpcConnectionEvent(JsonRpcConnectionEvent::DISCONNECT)); - } - delete cli; + peer->notifyDisconnect(); + + JsonRPCServerLoop::instance()->removeConnection(peer->id); + delete peer; + } return; case JsonrpcNetstringsConnection::DISPATCH: { ev_io_stop(EV_A_ w); - JsonRPCServerLoop::dispatchServerEvent(new JsonServerEvent(cli)); + JsonRPCServerLoop:: + dispatchServerEvent(new JsonServerEvent(peer, JsonServerEvent::StartReadLoop)); } return; } - // todo: put into reader thread - //r=read(cli->fd,&rbuff,1024); return; } // put back to read loop @@ -140,13 +144,12 @@ static void read_cb(struct ev_loop *loop, struct ev_io *w, int revents) { } void JsonRPCServerLoop::dispatchServerEvent(AmEvent* ev) { - instance()->threadpool.dispatch(ev); + threadpool.dispatch(ev); } static void accept_cb(struct ev_loop *loop, struct ev_io *w, int revents) { int client_fd; - struct JsonrpcNetstringsConnection *a_client; struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); client_fd = accept(w->fd, (struct sockaddr *)&client_addr, &client_len); @@ -154,14 +157,19 @@ static void accept_cb(struct ev_loop *loop, struct ev_io *w, int revents) return; } - a_client = new JsonrpcNetstringsConnection(); - a_client->fd=client_fd; - if (setnonblock(a_client->fd) < 0) { + string connection_id = JsonRPCServerLoop::newConnectionId(); + JsonrpcNetstringsConnection* peer = new JsonrpcNetstringsConnection(connection_id); + peer->fd=client_fd; + if (setnonblock(peer->fd) < 0) { + delete peer; ERROR("failed to set client socket to non-blocking"); return; } - ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ); - ev_io_start(loop,&a_client->ev_read); + + JsonRPCServerLoop::registerConnection(peer, connection_id); + + ev_io_init(&peer->ev_read,read_cb,peer->fd,EV_READ); + ev_io_start(loop,&peer->ev_read); } static void async_cb (EV_P_ ev_async *w, int revents) @@ -174,28 +182,102 @@ void JsonRPCServerLoop::_processEvents() { } void JsonRPCServerLoop::process(AmEvent* ev) { - DBG("processing event in server loop\n"); + DBG("server loop - processing event\n"); JsonServerEvent* server_event=dynamic_cast(ev); if (server_event==NULL) { ERROR("wrong event type received\n"); return; } - JsonrpcNetstringsConnection* a_client = server_event->conn; - a_client->resetRead(); - ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ); - ev_io_start(loop,&a_client->ev_read); + switch (server_event->event_id) { + case JsonServerEvent::StartReadLoop: { + JsonrpcNetstringsConnection* a_client = server_event->conn; + + DBG("checking for pending events to connection %p/%s\n", + a_client, a_client->id.c_str()); + + pending_events_mut.lock(); + // (check whether event for that connection pending) + for (vector::iterator it= + pending_events.begin(); it != pending_events.end(); it++) { + if ((*it)->connection_id == a_client->id) { + // stop read loop + ev_io_stop(loop,&a_client->ev_read); + + JsonServerEvent* server_event = *it; + pending_events.erase(it); + pending_events_mut.unlock(); + + DBG("got pending event for connection '%s'\n", a_client->id.c_str()); + + server_event->conn = a_client; + dispatchServerEvent(server_event); + + return; + } + } + pending_events_mut.unlock(); + DBG("no pending events for connection %p/%s, starting read loop\n", + a_client, a_client->id.c_str()); + + a_client->resetRead(); + ev_io_init(&a_client->ev_read,read_cb,a_client->fd,EV_READ); + ev_io_start(loop,&a_client->ev_read); + }; break; + + case JsonServerEvent::SendMessage: { + JsonServerSendMessageEvent* snd_msg_ev = + dynamic_cast(server_event); + if (snd_msg_ev == NULL) { + ERROR("invalid SendMessage type event received\n"); + return; + } + + JsonrpcPeerConnection* p_peer = getConnection(snd_msg_ev->connection_id); + if (p_peer == NULL) { + WARN("dropping message to inexistent/broken connection '%s' " + "(is_reply=%s, method=%s, id=%s, params='%s')", + snd_msg_ev->connection_id.c_str(), snd_msg_ev->is_reply?"true":"false", + snd_msg_ev->method.c_str(), snd_msg_ev->id.c_str(), + AmArg::print(snd_msg_ev->params).c_str()); + return; + } + JsonrpcNetstringsConnection* peer = dynamic_cast(p_peer); + if (NULL == peer) { + ERROR("invalid connection type\n"); // todo: other transports + return; + } + + if (ev_is_active(&peer->ev_read)) { + // ok, peer is in read loop, we can dispatch it to thread for sending + ev_io_stop(EV_A_ &peer->ev_read); + JsonRPCServerLoop:: + dispatchServerEvent(new JsonServerSendMessageEvent(*snd_msg_ev, peer)); + } else { + // peer is being processed by thread, save event for later sending + pending_events_mut.lock(); + // (need to copy event here: original event is deleted when event processed) + pending_events.push_back(new JsonServerSendMessageEvent(*snd_msg_ev)); + size_t q_size = pending_events.size(); + pending_events_mut.unlock(); + DBG("queued event for connection %s (total %zd events pending)\n", + snd_msg_ev->connection_id.c_str(), q_size); + } + + }; break; + // todo: process remove connection event + + default: + ERROR("unknown server event type received\n"); + return; + } + } JsonRPCServerLoop::JsonRPCServerLoop() : AmEventQueue(this) { loop = ev_default_loop (0); - // one thread is started here so that - // in app initialization code, there is already - // a server thread available to receive events - DBG("starting one server thread for startup requests...\n"); - threadpool.addThreads(1); } @@ -239,6 +321,7 @@ void JsonRPCServerLoop::run() { ev_io_init(&ev_accept,accept_cb,listen_fd,EV_READ); ev_io_start(loop,&ev_accept); + // async watcher to process our events in event loop ev_async_init (&async_w, async_cb); ev_async_start (EV_A_ &async_w); @@ -252,8 +335,29 @@ void JsonRPCServerLoop::on_stop() { } void JsonRPCServerLoop::returnConnection(JsonrpcNetstringsConnection* conn) { + pending_events_mut.lock(); + // (check whether event for that connection pending) + DBG("checking %u pending events\n", pending_events.size()); + for (vector::iterator it= + pending_events.begin(); it != pending_events.end(); it++) { + DBG("%s vs %s\n", (*it)->connection_id.c_str(),conn->id.c_str()); + if ((*it)->connection_id == conn->id) { + JsonServerEvent* server_event = *it; + pending_events.erase(it); + pending_events_mut.unlock(); + + DBG("got pending event for connection '%s'\n", conn->id.c_str()); + + server_event->conn = conn; + dispatchServerEvent(server_event); + return; + } + } + pending_events_mut.unlock(); + DBG("returning connection %p\n", conn); - instance()->postEvent(new JsonServerEvent(conn)); + instance()->postEvent(new JsonServerEvent(conn, JsonServerEvent::StartReadLoop)); + ev_async_send(loop, &async_w); } @@ -265,7 +369,8 @@ void JsonRPCServerLoop::execRpc(const string& evq_link, int port, const string& method, AmArg& params, AmArg& ret) { - JsonrpcNetstringsConnection* peer = new JsonrpcNetstringsConnection(); + string connection_id = newConnectionId(); + JsonrpcNetstringsConnection* peer = new JsonrpcNetstringsConnection(connection_id); peer->flags = flags; peer->notificationReceiver = notificationReceiver; peer->requestReceiver = requestReceiver; @@ -279,13 +384,101 @@ void JsonRPCServerLoop::execRpc(const string& evq_link, return; } - if (JsonRpcServer::createRequest(evq_link, method, params, peer)) { + // DBG("evq_link = '%s'\n", evq_link.c_str()); + // if (JsonRpcServer::createRequest(evq_link, method, params, peer)) { + // ret.push(400); + // ret.push("Error creating request message"); + // delete peer; + // return; + // } + + registerConnection(peer, connection_id); + + DBG("dispatching JsonServerSendMessageEvent\n"); + JsonServerSendMessageEvent* send_message_event = + new JsonServerSendMessageEvent(connection_id, false, method, "1" /* id - not empty */, + params); + + send_message_event->reply_link = evq_link; + + JsonRPCServerLoop::dispatchServerEvent(send_message_event); + + // JsonRPCServerLoop:: + // dispatchServerEvent(new JsonServerEvent(peer, JsonServerEvent::SendMessage)); + + ret.push(200); + ret.push("OK"); + ret.push(connection_id); +} + +void JsonRPCServerLoop::sendMessage(const string& connection_id, + int msg_type, + const string& method, + const string& id, + const string& reply_sink, + AmArg& params, + AmArg& ret) { + // check for presence of connection + // (connection might still be removed until we really + // process the request to send message, but here we already + // catch most failures) + if (getConnection(connection_id)==NULL) { ret.push(400); - ret.push("Error creating request message"); + ret.push("unknown connection"); + return; } - JsonRPCServerLoop::dispatchServerEvent(new JsonServerEvent(peer)); + JsonServerSendMessageEvent* ev = + new JsonServerSendMessageEvent(connection_id, msg_type != JSONRPC_MSG_REQUEST, + method, id, params, reply_sink); + ev->is_error = msg_type == JSONRPC_MSG_ERROR; + instance()->postEvent(ev); + + // wake up event loop to process message + ev_async_send(loop, &async_w); ret.push(200); - ret.push("OK"); + ret.push("posted"); +} + +string JsonRPCServerLoop::newConnectionId() { + return AmSession::getNewId(); +} + +bool JsonRPCServerLoop::registerConnection(JsonrpcPeerConnection* peer, const string& id) { + bool res = false; + connections_mut.lock(); + if (connections.find(id) != connections.end()) + res = true; + connections[id] = peer; + connections_mut.unlock(); + + DBG("registered connection '%s'\n", id.c_str()); + return res; } + +bool JsonRPCServerLoop::removeConnection(const string& id) { + bool res = false; + connections_mut.lock(); + std::map::iterator it = + connections.find(id); + if (it != connections.end()) { + res = true; + connections.erase(it); + } + connections_mut.unlock(); + DBG("deregistered connection '%s'\n", id.c_str()); + return res; +} + +JsonrpcPeerConnection* JsonRPCServerLoop::getConnection(const string& id) { + JsonrpcPeerConnection* res = NULL; + connections_mut.lock(); + std::map::iterator it = + connections.find(id); + if (it != connections.end()) + res = it->second; + connections_mut.unlock(); + return res; +} + diff --git a/apps/jsonrpc/RpcServerLoop.h b/apps/jsonrpc/RpcServerLoop.h index 7d43d6d6..e4382778 100644 --- a/apps/jsonrpc/RpcServerLoop.h +++ b/apps/jsonrpc/RpcServerLoop.h @@ -35,18 +35,29 @@ #include "RpcPeer.h" #include "RpcServerThread.h" +#include "JsonRPCEvents.h" + #include "AmArg.h" + +#include + class JsonRPCServerLoop : public AmThread, public AmEventQueue, public AmEventHandler { - RpcServerThreadpool threadpool; + static RpcServerThreadpool threadpool; static ev_async async_w; static struct ev_loop *loop; static JsonRPCServerLoop* _instance; + static std::map connections; + static AmMutex connections_mut; + + static vector pending_events; // todo: use map > if many pending events + static AmMutex pending_events_mut; + public: - JsonRPCServerLoop(); + JsonRPCServerLoop(); ~JsonRPCServerLoop(); static JsonRPCServerLoop* instance(); @@ -63,9 +74,38 @@ class JsonRPCServerLoop int port, const string& method, AmArg& params, AmArg& ret); + + static void sendMessage(const string& connection_id, + int msg_type, + const string& method, + const string& id, + const string& reply_sink, + AmArg& params, + AmArg& ret); void run(); void on_stop(); void process(AmEvent* ev); + + static string newConnectionId(); + + /** + add connection with id + @return whether connection with this id existed before + */ + static bool registerConnection(JsonrpcPeerConnection* peer, const string& id); + + /** + remove a connection with id + @return whether connection with this id existed + */ + static bool removeConnection(const string& id); + + /** + get a connection with id + @return NULL if not found + */ + static JsonrpcPeerConnection* getConnection(const string& id); + }; #endif diff --git a/apps/jsonrpc/RpcServerThread.cpp b/apps/jsonrpc/RpcServerThread.cpp index 3c448594..3225a3e3 100644 --- a/apps/jsonrpc/RpcServerThread.cpp +++ b/apps/jsonrpc/RpcServerThread.cpp @@ -62,45 +62,97 @@ void RpcServerThread::process(AmEvent* event) { ERROR("invalid event to process\n"); return; } - JsonrpcNetstringsConnection* conn = server_event->conn; + JsonrpcNetstringsConnection* connection = server_event->conn; + // todo: check event type - for now handle all types equally + + if (server_event->event_id == JsonServerEvent::SendMessage) { + JsonServerSendMessageEvent* snd_msg_ev = + dynamic_cast(server_event); + + if (NULL == snd_msg_ev) { + ERROR("wrong event type received\n"); + return; + } + + if (NULL == connection) { + DBG("getting connection for id %s\n", snd_msg_ev->connection_id.c_str()); + JsonrpcPeerConnection* js_connection = JsonRPCServerLoop::getConnection(snd_msg_ev->connection_id); + if ((NULL == js_connection) || + (NULL == + (connection = dynamic_cast(js_connection)))) { + ERROR("getting connection for id %s - message will not be sent\n", + snd_msg_ev->connection_id.c_str()); + return; + } + } + + if (!snd_msg_ev->is_reply) { + if (JsonRpcServer::createRequest(snd_msg_ev->reply_link, snd_msg_ev->method, + snd_msg_ev->params, connection, + snd_msg_ev->id.empty())) { + ERROR("creating request\n"); + // give back connection into server loop + JsonRPCServerLoop::returnConnection(connection); + return; + } + } else { + if (JsonRpcServer::createReply(connection, snd_msg_ev->id, snd_msg_ev->params, + snd_msg_ev->is_error)) { + // give back connection into server loop + JsonRPCServerLoop::returnConnection(connection); + return; + } + } + connection->msg_recv = false; + + } bool processed_message = false; int res = 0; - if (conn->messagePending() && conn->messageIsRecv()) { - processed_message = true; - DBG("processing message >%.*s<\n", conn->msg_size, conn->msgbuf); - res = JsonRpcServer::processMessage(conn->msgbuf, &conn->msg_size, - conn); + if (connection->messagePending() && connection->messageIsRecv()) { + DBG("processing message >%.*s<\n", connection->msg_size, connection->msgbuf); + res = JsonRpcServer::processMessage(connection->msgbuf, &connection->msg_size, + connection); if (res<0) { INFO("error processing message - closing connection\n"); - conn->close(); - delete conn; + connection->close(); + connection->notifyDisconnect(); + JsonRPCServerLoop::removeConnection(connection->id); + delete connection; return; } - conn->msg_recv = false; + connection->msg_recv = false; + processed_message = true; } - if (conn->messagePending() && !conn->messageIsRecv()) { - res = conn->netstringsBlockingWrite(); + DBG("connection->messagePending() = %s\n", connection->messagePending()?"true":"false"); + + if (connection->messagePending() && !connection->messageIsRecv()) { + DBG("calling write\n"); + res = connection->netstringsBlockingWrite(); if (res == JsonrpcNetstringsConnection::REMOVE) { - delete conn; + connection->notifyDisconnect(); + JsonRPCServerLoop::removeConnection(connection->id); + delete connection; return; } } if (processed_message && - (conn->flags & JsonrpcPeerConnection::FL_CLOSE_ALWAYS)) { + (connection->flags & JsonrpcPeerConnection::FL_CLOSE_ALWAYS)) { DBG("closing connection marked as FL_CLOSE_ALWAYS\n"); - conn->close(); - delete conn; + connection->close(); + connection->notifyDisconnect(); // ?? + JsonRPCServerLoop::removeConnection(connection->id); + delete connection; return; } // give back connection into server loop - JsonRPCServerLoop::returnConnection(conn); + JsonRPCServerLoop::returnConnection(connection); // ev_io_init(&cli->ev_write,write_cb,cli->fd,EV_WRITE); // ev_io_start(loop,&cli->ev_write); @@ -108,6 +160,11 @@ void RpcServerThread::process(AmEvent* event) { RpcServerThreadpool::RpcServerThreadpool() { + // one thread is started here so that + // in app initialization code, there is already + // a server thread available to receive events + DBG("starting one server thread for startup requests...\n"); + addThreads(1); } RpcServerThreadpool::~RpcServerThreadpool() { diff --git a/apps/jsonrpc/RpcServerThread.h b/apps/jsonrpc/RpcServerThread.h index 67bcfbb0..ab3ed206 100644 --- a/apps/jsonrpc/RpcServerThread.h +++ b/apps/jsonrpc/RpcServerThread.h @@ -33,16 +33,6 @@ #include "AmThread.h" #include "RpcPeer.h" -struct JsonServerEvent - : public AmEvent { - - JsonrpcNetstringsConnection* conn; - - JsonServerEvent(JsonrpcNetstringsConnection* c) - : conn(c), AmEvent(121) { } - ~JsonServerEvent() { } -}; - class RpcServerThread : public AmThread, public AmEventQueue, public AmEventHandler {