mirror of https://github.com/sipwise/kamailio.git
* now we are using standard json-rpc witn ngcp-lnpd Change-Id: Ide6edf30c884a7693431cfdb6f953b661aaeb398changes/11/6711/1
parent
2e505d2083
commit
3835a76394
@ -1,571 +0,0 @@
|
||||
From 2316cc28b325917ed35ef609cd74678696871ac1 Mon Sep 17 00:00:00 2001
|
||||
From: Victor Seva <linuxmaniac@torreviejawireless.org>
|
||||
Date: Tue, 4 Aug 2015 10:43:42 +0200
|
||||
Subject: [PATCH] janssonrpc-c: fakerpc
|
||||
|
||||
---
|
||||
modules/janssonrpc-c/janssonrpc.h | 1 +
|
||||
modules/janssonrpc-c/janssonrpc_io.c | 268 ++++++++++++++++++++++++------
|
||||
modules/janssonrpc-c/janssonrpc_io.h | 9 +
|
||||
modules/janssonrpc-c/janssonrpc_mod.c | 2 +
|
||||
modules/janssonrpc-c/janssonrpc_request.c | 97 ++++++-----
|
||||
modules/janssonrpc-c/janssonrpc_request.h | 2 +
|
||||
modules/janssonrpc-c/netstring.c | 20 +++
|
||||
modules/janssonrpc-c/netstring.h | 1 +
|
||||
8 files changed, 305 insertions(+), 95 deletions(-)
|
||||
|
||||
diff --git a/modules/janssonrpc-c/janssonrpc.h b/modules/janssonrpc-c/janssonrpc.h
|
||||
index cfdf5e3..61c473f 100644
|
||||
--- a/modules/janssonrpc-c/janssonrpc.h
|
||||
+++ b/modules/janssonrpc-c/janssonrpc.h
|
||||
@@ -89,6 +89,7 @@ typedef struct retry_range {
|
||||
int cmd_pipe;
|
||||
extern const str null_str;
|
||||
str result_pv_str;
|
||||
+extern int fakerpc;
|
||||
retry_range_t* global_retry_ranges;
|
||||
|
||||
static inline str pkg_strdup(str src)
|
||||
diff --git a/modules/janssonrpc-c/janssonrpc_io.c b/modules/janssonrpc-c/janssonrpc_io.c
|
||||
index ffcc247..8a08ec6 100644
|
||||
--- a/modules/janssonrpc-c/janssonrpc_io.c
|
||||
+++ b/modules/janssonrpc-c/janssonrpc_io.c
|
||||
@@ -353,7 +353,14 @@ int jsonrpc_send(str conn, jsonrpc_request_t* req, bool notify_only)
|
||||
|
||||
char* ns;
|
||||
size_t bytes;
|
||||
- bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
|
||||
+ if(fakerpc) {
|
||||
+ bytes = strlen(json)+1;
|
||||
+ ns = pkg_malloc(sizeof(char)*bytes);
|
||||
+ strncpy(ns, json, bytes);
|
||||
+ ns[bytes-1] = '\n';
|
||||
+ } else {
|
||||
+ bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
|
||||
+ }
|
||||
|
||||
bool sent = false;
|
||||
jsonrpc_server_group_t* c_grp = NULL;
|
||||
@@ -417,6 +424,8 @@ int jsonrpc_send(str conn, jsonrpc_request_t* req, bool notify_only)
|
||||
if(schedule_retry(req)<0) {
|
||||
fail_request(JRPC_ERR_RETRY, req, "Failed to schedule retry");
|
||||
}
|
||||
+ } else {
|
||||
+ DEBUG("SENT DATA[%s]\n", ns);
|
||||
}
|
||||
|
||||
free_server_list(tried_servers);
|
||||
@@ -670,28 +679,179 @@ end:
|
||||
return retval;
|
||||
}
|
||||
|
||||
-void handle_netstring(jsonrpc_server_t* server)
|
||||
+int find_fakerpc_type(json_t* response)
|
||||
{
|
||||
- unsigned int old_count = server->req_count;
|
||||
- server->req_count--;
|
||||
- if (server->hwm > 0
|
||||
- && old_count >= server->hwm
|
||||
- && server->req_count < server->hwm) {
|
||||
- INFO("%.*s:%d in connection group %.*s is back to normal\n",
|
||||
- STR(server->addr), server->port, STR(server->conn));
|
||||
+ if(!response) return -1;
|
||||
+ const char* s;
|
||||
+ size_t size_type;
|
||||
+ json_t *t = json_object_get(response, "messageType");
|
||||
+
|
||||
+ if(t) {
|
||||
+ s = json_string_value(t);
|
||||
+ } else {
|
||||
+ t = json_object_get(response, JANSSONRPC_FAKERPC_CON);
|
||||
+ if(t) {
|
||||
+ LM_DBG(JANSSONRPC_FAKERPC_CON);
|
||||
+ return 5;
|
||||
+ } else {
|
||||
+ LM_DBG("Response received without 'messageType'.\n");
|
||||
+ return -1;
|
||||
+ }
|
||||
}
|
||||
+ size_type = strlen(s);
|
||||
+ switch(size_type) {
|
||||
+ case JANSSONRPC_KEEPALIVE_RES_SIZE:
|
||||
+ if(strncmp(s, JANSSONRPC_KEEPALIVE_RES, size_type)==0) return 1;
|
||||
+ break;
|
||||
+ case JANSSONRPC_KEEPALIVE_REQ_SIZE:
|
||||
+ if(strncmp(s, JANSSONRPC_KEEPALIVE_REQ, size_type)==0) return 2;
|
||||
+ break;
|
||||
+ case JANSSONRPC_FAKERPC_RES_SIZE:
|
||||
+ if(strncmp(s, JANSSONRPC_FAKERPC_RES, size_type)==0) return 3;
|
||||
+ break;
|
||||
+ case JANSSONRPC_FAKERPC_REQ_SIZE:
|
||||
+ if(strncmp(s, JANSSONRPC_FAKERPC_REQ, size_type)==0) return 4;
|
||||
+ break;
|
||||
+ }
|
||||
+ return 0;
|
||||
+}
|
||||
|
||||
- json_error_t error;
|
||||
+int handle_response_string(json_t* response)
|
||||
+{
|
||||
+ int retval = 0;
|
||||
+ jsonrpc_request_t* req = NULL;
|
||||
+ char* freeme = NULL;
|
||||
+
|
||||
+
|
||||
+ /* check if json object */
|
||||
+ if(!json_is_object(response)){
|
||||
+ WARN("jsonrpc response is not an object\n");
|
||||
+ return -1;
|
||||
+ }
|
||||
+
|
||||
+ /* check for an id */
|
||||
+ json_t* _id = json_object_get(response, "id");
|
||||
+ if(!_id) {
|
||||
+ WARN("jsonrpc response does not have an id.\n");
|
||||
+ retval = -1;
|
||||
+ goto end;
|
||||
+ }
|
||||
+ int id = -1;
|
||||
+ if(json_is_integer(_id)) {
|
||||
+ id = json_integer_value(_id);
|
||||
+ } else if(json_is_string(_id)) {
|
||||
+ const char *s = json_string_value(_id);
|
||||
+ id = atoi(s);
|
||||
+ }
|
||||
+ if (!(req = pop_request(id))) {
|
||||
+ /* don't fail the server for an unrecognized id */
|
||||
+ LM_DBG("no id:%d found in the pool of requests\n", id);
|
||||
+ retval = 0;
|
||||
+ goto end;
|
||||
+ }
|
||||
+
|
||||
+ json_t* error = json_object_get(response, "errorCode");
|
||||
+
|
||||
+ pv_value_t val;
|
||||
+
|
||||
+ if(jsontoval(&val, &freeme, response)<0) {
|
||||
+ fail_request(
|
||||
+ JRPC_ERR_TO_VAL,
|
||||
+ req,
|
||||
+ "Failed to convert response json to pv\n");
|
||||
+ retval = -1;
|
||||
+ goto end;
|
||||
+ }
|
||||
+
|
||||
+ char* error_s = NULL;
|
||||
+
|
||||
+ if(send_to_script(&val, req->cmd)>=0) {
|
||||
+ goto free_and_end;
|
||||
+ }
|
||||
+
|
||||
+ if(error) {
|
||||
+ // get code from error
|
||||
+ json_t* _code = json_object_get(error, "code");
|
||||
+ if(_code) {
|
||||
+ int code = json_integer_value(_code);
|
||||
+
|
||||
+ // check if code is in global_retry_ranges
|
||||
+ retry_range_t* tmpr;
|
||||
+ for(tmpr = global_retry_ranges;
|
||||
+ tmpr != NULL;
|
||||
+ tmpr = tmpr->next) {
|
||||
+ if((tmpr->start < tmpr->end
|
||||
+ && tmpr->start <= code && code <= tmpr->end)
|
||||
+ || (tmpr->end < tmpr->start
|
||||
+ && tmpr->end <= code && code <= tmpr->start)
|
||||
+ || (tmpr->start == tmpr->end && tmpr->start == code)) {
|
||||
+ if(schedule_retry(req)==0) {
|
||||
+ goto end;
|
||||
+ }
|
||||
+ break;
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ }
|
||||
+ error_s = json_dumps(error, JSON_COMPACT);
|
||||
+ if(error_s) {
|
||||
+ WARN("Request received an error: \n%s\n", error_s);
|
||||
+ free(error_s);
|
||||
+ } else {
|
||||
+ fail_request(
|
||||
+ JRPC_ERR_BAD_RESP,
|
||||
+ req,
|
||||
+ "Could not convert 'error' response to string");
|
||||
+ retval = -1;
|
||||
+ goto end;
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+
|
||||
+free_and_end:
|
||||
+ free_req_cmd(req->cmd);
|
||||
+ free_request(req);
|
||||
+
|
||||
+end:
|
||||
+ if(freeme) free(freeme);
|
||||
+ return retval;
|
||||
+}
|
||||
|
||||
+void handle_netstring(jsonrpc_server_t* server)
|
||||
+{
|
||||
+ json_error_t error;
|
||||
json_t* res = json_loads(server->buffer->string, 0, &error);
|
||||
|
||||
+ int type = find_fakerpc_type(res);
|
||||
+ if(!fakerpc || type == 3) {
|
||||
+ unsigned int old_count = server->req_count;
|
||||
+ server->req_count--;
|
||||
+ if (server->hwm > 0
|
||||
+ && old_count >= server->hwm
|
||||
+ && server->req_count < server->hwm) {
|
||||
+ INFO("%.*s:%d in connection group %.*s is back to normal\n",
|
||||
+ STR(server->addr), server->port, STR(server->conn));
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
if (res) {
|
||||
- if(handle_response(res)<0){
|
||||
+ if(fakerpc) {
|
||||
+ if(type==3) {
|
||||
+ if(handle_response_string(res)<0) {
|
||||
+ ERR("Cannot handle jsonrpc response: %s\n",
|
||||
+ server->buffer->string);
|
||||
+ }
|
||||
+ } else {
|
||||
+ LM_DBG("msg %s dismissed[%d]\n",
|
||||
+ ZSW(server->buffer->string), type);
|
||||
+ }
|
||||
+ } else {
|
||||
+ if (handle_response(res)<0)
|
||||
ERR("Cannot handle jsonrpc response: %s\n", server->buffer->string);
|
||||
}
|
||||
json_decref(res);
|
||||
} else {
|
||||
- ERR("Failed to parse json: %s\n", server->buffer->string);
|
||||
+ ERR("Failed to parse json: %s\n", ZSW(server->buffer->string));
|
||||
ERR("PARSE ERROR: %s at %d,%d\n",
|
||||
error.text, error.line, error.column);
|
||||
}
|
||||
@@ -701,49 +861,51 @@ void bev_read_cb(struct bufferevent* bev, void* arg)
|
||||
{
|
||||
jsonrpc_server_t* server = (jsonrpc_server_t*)arg;
|
||||
int retval = 0;
|
||||
- while (retval == 0) {
|
||||
- int retval = netstring_read_evbuffer(bev, &server->buffer);
|
||||
-
|
||||
- if (retval == NETSTRING_INCOMPLETE) {
|
||||
- return;
|
||||
- } else if (retval < 0) {
|
||||
- char* msg = "";
|
||||
- switch(retval) {
|
||||
- case NETSTRING_ERROR_TOO_LONG:
|
||||
- msg = "too long";
|
||||
- break;
|
||||
- case NETSTRING_ERROR_NO_COLON:
|
||||
- msg = "no colon after length field";
|
||||
- break;
|
||||
- case NETSTRING_ERROR_TOO_SHORT:
|
||||
- msg = "too short";
|
||||
- break;
|
||||
- case NETSTRING_ERROR_NO_COMMA:
|
||||
- msg = "missing comma";
|
||||
- break;
|
||||
- case NETSTRING_ERROR_LEADING_ZERO:
|
||||
- msg = "length field has a leading zero";
|
||||
- break;
|
||||
- case NETSTRING_ERROR_NO_LENGTH:
|
||||
- msg = "missing length field";
|
||||
- break;
|
||||
- case NETSTRING_INCOMPLETE:
|
||||
- msg = "incomplete";
|
||||
- break;
|
||||
- default:
|
||||
- ERR("bad netstring: unknown error (%d)\n", retval);
|
||||
- goto reconnect;
|
||||
- }
|
||||
- ERR("bad netstring: %s\n", msg);
|
||||
-reconnect:
|
||||
- force_reconnect(server);
|
||||
- return;
|
||||
- }
|
||||
|
||||
- handle_netstring(server);
|
||||
- free_netstring(server->buffer);
|
||||
- server->buffer = NULL;
|
||||
+ if(fakerpc) retval = string_read_evbuffer(bev, &server->buffer);
|
||||
+ else retval = netstring_read_evbuffer(bev, &server->buffer);
|
||||
+
|
||||
+ if (retval == NETSTRING_INCOMPLETE) {
|
||||
+ LM_DBG("NETSTRING_INCOMPLETE\n");
|
||||
+ return;
|
||||
+ } else if (retval < 0) {
|
||||
+ char* msg = "";
|
||||
+ switch(retval) {
|
||||
+ case NETSTRING_ERROR_TOO_LONG:
|
||||
+ msg = "too long";
|
||||
+ break;
|
||||
+ case NETSTRING_ERROR_NO_COLON:
|
||||
+ msg = "no colon after length field";
|
||||
+ break;
|
||||
+ case NETSTRING_ERROR_TOO_SHORT:
|
||||
+ msg = "too short";
|
||||
+ break;
|
||||
+ case NETSTRING_ERROR_NO_COMMA:
|
||||
+ msg = "missing comma";
|
||||
+ break;
|
||||
+ case NETSTRING_ERROR_LEADING_ZERO:
|
||||
+ msg = "length field has a leading zero";
|
||||
+ break;
|
||||
+ case NETSTRING_ERROR_NO_LENGTH:
|
||||
+ msg = "missing length field";
|
||||
+ break;
|
||||
+ case NETSTRING_INCOMPLETE:
|
||||
+ msg = "incomplete";
|
||||
+ break;
|
||||
+ default:
|
||||
+ ERR("bad netstring: unknown error (%d)\n", retval);
|
||||
+ goto reconnect;
|
||||
+ }
|
||||
+ ERR("bad netstring: %s\n", msg);
|
||||
+reconnect:
|
||||
+ force_reconnect(server);
|
||||
+ return;
|
||||
}
|
||||
+
|
||||
+ handle_netstring(server);
|
||||
+ free_netstring(server->buffer);
|
||||
+ server->buffer = NULL;
|
||||
+
|
||||
}
|
||||
|
||||
int set_non_blocking(int fd)
|
||||
diff --git a/modules/janssonrpc-c/janssonrpc_io.h b/modules/janssonrpc-c/janssonrpc_io.h
|
||||
index 5abb75e..652c0be 100644
|
||||
--- a/modules/janssonrpc-c/janssonrpc_io.h
|
||||
+++ b/modules/janssonrpc-c/janssonrpc_io.h
|
||||
@@ -86,5 +86,14 @@ typedef enum
|
||||
, JRPC_ERR_REQ_BUILD = -1
|
||||
} jsonrpc_error;
|
||||
|
||||
+#define JANSSONRPC_KEEPALIVE_RES "heartbeatResponse"
|
||||
+#define JANSSONRPC_KEEPALIVE_RES_SIZE 17
|
||||
+#define JANSSONRPC_KEEPALIVE_REQ "heartbeatRequest"
|
||||
+#define JANSSONRPC_KEEPALIVE_REQ_SIZE 16
|
||||
+#define JANSSONRPC_FAKERPC_RES "npResponse"
|
||||
+#define JANSSONRPC_FAKERPC_RES_SIZE 10
|
||||
+#define JANSSONRPC_FAKERPC_REQ "npRequest"
|
||||
+#define JANSSONRPC_FAKERPC_REQ_SIZE 9
|
||||
+#define JANSSONRPC_FAKERPC_CON "connectionPermission"
|
||||
|
||||
#endif /* _JSONRPC_IO_H_ */
|
||||
diff --git a/modules/janssonrpc-c/janssonrpc_mod.c b/modules/janssonrpc-c/janssonrpc_mod.c
|
||||
index 7db22cc..2e9c27c 100644
|
||||
--- a/modules/janssonrpc-c/janssonrpc_mod.c
|
||||
+++ b/modules/janssonrpc-c/janssonrpc_mod.c
|
||||
@@ -57,6 +57,7 @@ static int fixup_notify_free(void** param, int param_no);
|
||||
int fixup_pvar_shm(void** param, int param_no);
|
||||
|
||||
int pipe_fds[2] = {-1,-1};
|
||||
+int fakerpc = 0;
|
||||
|
||||
struct tm_binds tmb;
|
||||
|
||||
@@ -90,6 +91,7 @@ static param_export_t mod_params[]={
|
||||
{"retry_codes", STR_PARAM|USE_FUNC_PARAM, (void*)parse_retry_codes_param},
|
||||
{"min_srv_ttl", INT_PARAM|USE_FUNC_PARAM, (void*)parse_min_ttl_param},
|
||||
{"result_pv", STR_PARAM, &result_pv_str.s},
|
||||
+ {"fakerpc", INT_PARAM, &fakerpc},
|
||||
{ 0,0,0 }
|
||||
};
|
||||
|
||||
diff --git a/modules/janssonrpc-c/janssonrpc_request.c b/modules/janssonrpc-c/janssonrpc_request.c
|
||||
index 39c7f31..904a58a 100644
|
||||
--- a/modules/janssonrpc-c/janssonrpc_request.c
|
||||
+++ b/modules/janssonrpc-c/janssonrpc_request.c
|
||||
@@ -88,6 +88,16 @@ void free_request(jsonrpc_request_t* req)
|
||||
pkg_free(req);
|
||||
}
|
||||
|
||||
+int get_next_id(void)
|
||||
+{
|
||||
+ if (next_id>JSONRPC_MAX_ID) {
|
||||
+ next_id = 1;
|
||||
+ } else {
|
||||
+ next_id++;
|
||||
+ }
|
||||
+ return next_id;
|
||||
+}
|
||||
+
|
||||
jsonrpc_request_t* create_request(jsonrpc_req_cmd_t* cmd)
|
||||
{
|
||||
if (cmd == NULL) {
|
||||
@@ -117,22 +127,35 @@ jsonrpc_request_t* create_request(jsonrpc_req_cmd_t* cmd)
|
||||
req->ntries = 0;
|
||||
req->next = NULL;
|
||||
|
||||
- req->payload = json_object();
|
||||
- if(!(req->payload)) {
|
||||
- ERR("Failed to create request payload\n");
|
||||
- goto fail;
|
||||
+ json_t* params = NULL;
|
||||
+ json_error_t error;
|
||||
+ if(cmd->params.len > 0) {
|
||||
+ params = json_loads(cmd->params.s, 0, &error);
|
||||
+ if(!params) {
|
||||
+ ERR("Failed to parse json: %.*s\n", STR(cmd->params));
|
||||
+ ERR("PARSE ERROR: %s at %d,%d\n",
|
||||
+ error.text, error.line, error.column);
|
||||
+ goto fail;
|
||||
+ }
|
||||
}
|
||||
|
||||
- if(req->type == RPC_REQUEST) {
|
||||
- if (next_id>JSONRPC_MAX_ID) {
|
||||
- next_id = 1;
|
||||
- } else {
|
||||
- next_id++;
|
||||
+ if(fakerpc) {
|
||||
+ req->payload = params;
|
||||
+ } else {
|
||||
+ req->payload = json_object();
|
||||
+ if(!(req->payload)) {
|
||||
+ ERR("Failed to create request payload\n");
|
||||
+ goto fail;
|
||||
}
|
||||
- req->id = next_id;
|
||||
+ }
|
||||
+
|
||||
+ if(req->type == RPC_REQUEST) {
|
||||
+ req->id = get_next_id();
|
||||
req->timeout = cmd->timeout;
|
||||
|
||||
- json_t* id_js = json_integer(next_id);
|
||||
+ json_t* id_js; int id;
|
||||
+ if(fakerpc) id_js = json_string(int2str(req->id, &id));
|
||||
+ else id_js = json_integer(req->id);
|
||||
if(id_js) {
|
||||
json_object_set(req->payload, "id", id_js);
|
||||
json_decref(id_js);
|
||||
@@ -155,44 +178,34 @@ jsonrpc_request_t* create_request(jsonrpc_req_cmd_t* cmd)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
- json_t* version_js = json_string(JSONRPC_VERSION);
|
||||
- if(version_js) {
|
||||
- json_object_set(req->payload, "jsonrpc", version_js);
|
||||
- json_decref(version_js);
|
||||
- } else {
|
||||
- ERR("Failed to create request version\n");
|
||||
- goto fail;
|
||||
- }
|
||||
+ if(!fakerpc) {
|
||||
+ json_t* version_js = json_string(JSONRPC_VERSION);
|
||||
+ if(version_js) {
|
||||
+ json_object_set(req->payload, "jsonrpc", version_js);
|
||||
+ json_decref(version_js);
|
||||
+ } else {
|
||||
+ ERR("Failed to create request version\n");
|
||||
+ goto fail;
|
||||
+ }
|
||||
|
||||
- json_t* method_js = json_string(cmd->method.s);
|
||||
- if(method_js) {
|
||||
- json_object_set(req->payload, "method", method_js);
|
||||
- json_decref(method_js);
|
||||
- } else {
|
||||
- ERR("Failed to create request method\n");
|
||||
- goto fail;
|
||||
- }
|
||||
+ json_t* method_js = json_string(cmd->method.s);
|
||||
+ if(method_js) {
|
||||
+ json_object_set(req->payload, "method", method_js);
|
||||
+ json_decref(method_js);
|
||||
+ } else {
|
||||
+ ERR("Failed to create request method\n");
|
||||
+ goto fail;
|
||||
+ }
|
||||
|
||||
- json_t* params = NULL;
|
||||
- json_error_t error;
|
||||
- if(cmd->params.len > 0) {
|
||||
- params = json_loads(cmd->params.s, 0, &error);
|
||||
- if(!params) {
|
||||
- ERR("Failed to parse json: %.*s\n", STR(cmd->params));
|
||||
- ERR("PARSE ERROR: %s at %d,%d\n",
|
||||
- error.text, error.line, error.column);
|
||||
+ json_object_set(req->payload, "params", params);
|
||||
+ if(!(req->payload)) {
|
||||
+ ERR("Failed to add request payload params\n");
|
||||
goto fail;
|
||||
}
|
||||
- }
|
||||
|
||||
- json_object_set(req->payload, "params", params);
|
||||
- if(!(req->payload)) {
|
||||
- ERR("Failed to add request payload params\n");
|
||||
- goto fail;
|
||||
+ if(params) json_decref(params);
|
||||
}
|
||||
|
||||
- if(params) json_decref(params);
|
||||
-
|
||||
req->cmd = cmd;
|
||||
return req;
|
||||
fail:
|
||||
diff --git a/modules/janssonrpc-c/janssonrpc_request.h b/modules/janssonrpc-c/janssonrpc_request.h
|
||||
index e38c08e..09dac6b 100644
|
||||
--- a/modules/janssonrpc-c/janssonrpc_request.h
|
||||
+++ b/modules/janssonrpc-c/janssonrpc_request.h
|
||||
@@ -62,4 +62,6 @@ int schedule_retry(jsonrpc_request_t* req);
|
||||
int jsonrpc_send(str conn, jsonrpc_request_t* req, bool notify_only);
|
||||
void fail_request(int code, jsonrpc_request_t* req, char* error_str);
|
||||
|
||||
+int get_next_id(void);
|
||||
+
|
||||
#endif /* _JSONRPC_H_ */
|
||||
diff --git a/modules/janssonrpc-c/netstring.c b/modules/janssonrpc-c/netstring.c
|
||||
index a559224..4949e21 100644
|
||||
--- a/modules/janssonrpc-c/netstring.c
|
||||
+++ b/modules/janssonrpc-c/netstring.c
|
||||
@@ -136,6 +136,26 @@ int netstring_read_evbuffer(struct bufferevent *bev, netstring_t **netstring)
|
||||
return 0;
|
||||
}
|
||||
|
||||
+int string_read_evbuffer(struct bufferevent *bev, netstring_t **netstring)
|
||||
+{
|
||||
+ int n;
|
||||
+ if(*netstring == NULL) {
|
||||
+ /* initialize the netstring struct */
|
||||
+ *netstring = pkg_malloc(sizeof(netstring_t));
|
||||
+ CHECK_MALLOC(*netstring);
|
||||
+ memset(*netstring, 0, sizeof(netstring_t));
|
||||
+ }
|
||||
+ struct evbuffer *input = bufferevent_get_input(bev);
|
||||
+ (*netstring)->length = evbuffer_get_length(input);
|
||||
+ (*netstring)->buffer = pkg_malloc((*netstring)->length);
|
||||
+ n = evbuffer_remove(input, (*netstring)->buffer,(*netstring)->length);
|
||||
+ assert(n==(*netstring)->length);
|
||||
+ if(n>0) (*netstring)->buffer[n-1] = '\0';
|
||||
+ (*netstring)->string = (*netstring)->buffer;
|
||||
+ LM_DBG("read:[%d][%s]\n", n, ZSW((*netstring)->string));
|
||||
+ return 0;
|
||||
+}
|
||||
+
|
||||
int netstring_read_fd(int fd, netstring_t **netstring)
|
||||
{
|
||||
int bytes, offset;
|
||||
diff --git a/modules/janssonrpc-c/netstring.h b/modules/janssonrpc-c/netstring.h
|
||||
index edba4cc..dd24311 100644
|
||||
--- a/modules/janssonrpc-c/netstring.h
|
||||
+++ b/modules/janssonrpc-c/netstring.h
|
||||
@@ -34,6 +34,7 @@ typedef struct {
|
||||
|
||||
void free_netstring(netstring_t* netstring);
|
||||
|
||||
+int string_read_evbuffer(struct bufferevent *bev, netstring_t **netstring);
|
||||
int netstring_read_evbuffer(struct bufferevent *bev, netstring_t **netstring);
|
||||
|
||||
int netstring_read_fd(int fd, netstring_t **netstring);
|
||||
--
|
||||
2.1.4
|
||||
|
@ -1,174 +0,0 @@
|
||||
From ff578962fa44668d9e8e27f8178f19e77a002eb1 Mon Sep 17 00:00:00 2001
|
||||
From: Victor Seva <linuxmaniac@torreviejawireless.org>
|
||||
Date: Wed, 5 Aug 2015 10:32:22 +0200
|
||||
Subject: [PATCH] janssonrpc-c: keepalive
|
||||
|
||||
---
|
||||
modules/janssonrpc-c/janssonrpc_io.c | 58 +++++++++++++++++++++++++++++++++++-
|
||||
1 file changed, 57 insertions(+), 1 deletion(-)
|
||||
|
||||
--- a/modules/janssonrpc-c/janssonrpc_io.c
|
||||
+++ b/modules/janssonrpc-c/janssonrpc_io.c
|
||||
@@ -53,15 +53,18 @@
|
||||
struct tm_binds tmb;
|
||||
|
||||
void cmd_pipe_cb(int fd, short event, void *arg);
|
||||
+void cmd_pipe_keepalive_cb(int fd, short event, void *arg);
|
||||
void io_shutdown(int sig);
|
||||
|
||||
int jsonrpc_io_child_process(int cmd_pipe)
|
||||
{
|
||||
global_ev_base = event_base_new();
|
||||
global_evdns_base = evdns_base_new(global_ev_base, 1);
|
||||
+ struct timeval t = {keepalive_interval,0};
|
||||
|
||||
set_non_blocking(cmd_pipe);
|
||||
- struct event* pipe_ev = event_new(global_ev_base, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, NULL);
|
||||
+ struct event* pipe_ev = event_new(global_ev_base, cmd_pipe,
|
||||
+ EV_READ | EV_PERSIST, cmd_pipe_cb, NULL);
|
||||
if(!pipe_ev) {
|
||||
ERR("Failed to create pipe event\n");
|
||||
return -1;
|
||||
@@ -71,7 +74,19 @@
|
||||
ERR("Failed to start pipe event\n");
|
||||
return -1;
|
||||
}
|
||||
+ if(fakerpc && keepalive_interval>0) {
|
||||
+ struct event* pipe_ev_keepalive = event_new(global_ev_base, cmd_pipe,
|
||||
+ EV_TIMEOUT | EV_PERSIST, cmd_pipe_keepalive_cb, NULL);
|
||||
+ if(!pipe_ev_keepalive) {
|
||||
+ ERR("Failed to create pipe keepalive event\n");
|
||||
+ return -1;
|
||||
+ }
|
||||
|
||||
+ if(event_add(pipe_ev_keepalive, &t)<0) {
|
||||
+ ERR("Failed to start pipe keepalive event\n");
|
||||
+ return -1;
|
||||
+ }
|
||||
+ }
|
||||
connect_servers(global_server_group);
|
||||
|
||||
#if 0
|
||||
@@ -540,6 +555,49 @@
|
||||
free_pipe_cmd(cmd);
|
||||
}
|
||||
|
||||
+json_t* keepalive_build_req(void)
|
||||
+{
|
||||
+ int id;
|
||||
+ json_t *return_obj = json_object();
|
||||
+ json_t* t = json_string(JANSSONRPC_KEEPALIVE_REQ);
|
||||
+ json_object_set(return_obj, "messageType", t); json_decref(t);
|
||||
+ t = json_string(int2str(get_next_id(), &id));
|
||||
+ json_object_set(return_obj, "id", t); json_decref(t);
|
||||
+ return return_obj;
|
||||
+}
|
||||
+
|
||||
+void cmd_pipe_keepalive_cb(int fd, short event, void *arg)
|
||||
+{
|
||||
+ if(keepalive_debug) LM_DBG("keepalive event\n");
|
||||
+ char *ns, *json;
|
||||
+ size_t bytes;
|
||||
+ json_t *req;
|
||||
+
|
||||
+ INIT_SERVER_LOOP
|
||||
+ FOREACH_SERVER_IN(global_server_group)
|
||||
+ server = wgroup->server;
|
||||
+ if(server->status == JSONRPC_SERVER_CONNECTED)
|
||||
+ {
|
||||
+ req = keepalive_build_req();
|
||||
+ json = (char*)json_dumps(req, JSON_COMPACT);
|
||||
+ json_decref(req);
|
||||
+ bytes = strlen(json)+1;
|
||||
+ ns = pkg_malloc(sizeof(char)*bytes);
|
||||
+ strncpy(ns, json, bytes);
|
||||
+ ns[bytes-1] = '\n';
|
||||
+ if(bufferevent_write(server->bev, ns, bytes) == 0) {
|
||||
+ pkg_free(ns);
|
||||
+ if(keepalive_debug) {
|
||||
+ LM_DBG("keepalive sent to server %.*s:%d for conn %.*s.\n",
|
||||
+ STR(server->addr), server->port, STR(server->conn));
|
||||
+ }
|
||||
+ } else {
|
||||
+ force_reconnect(server);
|
||||
+ }
|
||||
+ }
|
||||
+ ENDFOR
|
||||
+}
|
||||
+
|
||||
int handle_response(json_t* response)
|
||||
{
|
||||
int retval = 0;
|
||||
@@ -839,14 +897,23 @@
|
||||
|
||||
if (res) {
|
||||
if(fakerpc) {
|
||||
- if(type==3) {
|
||||
- if(handle_response_string(res)<0) {
|
||||
- ERR("Cannot handle jsonrpc response: %s\n",
|
||||
- server->buffer->string);
|
||||
- }
|
||||
- } else {
|
||||
- LM_DBG("msg %s dismissed[%d]\n",
|
||||
- ZSW(server->buffer->string), type);
|
||||
+ switch(type) {
|
||||
+ case 3:
|
||||
+ if(handle_response_string(res)<0) {
|
||||
+ ERR("Cannot handle jsonrpc response: %s\n",
|
||||
+ server->buffer->string);
|
||||
+ }
|
||||
+ break;
|
||||
+ case 1:
|
||||
+ if(keepalive_debug) {
|
||||
+ LM_DBG("msg %s dismissed[%d]\n",
|
||||
+ ZSW(server->buffer->string), type);
|
||||
+ }
|
||||
+ break;
|
||||
+ default:
|
||||
+ LM_DBG("msg %s dismissed[%d]\n",
|
||||
+ ZSW(server->buffer->string), type);
|
||||
+ break;
|
||||
}
|
||||
} else {
|
||||
if (handle_response(res)<0)
|
||||
--- a/modules/janssonrpc-c/janssonrpc_mod.c
|
||||
+++ b/modules/janssonrpc-c/janssonrpc_mod.c
|
||||
@@ -58,6 +58,8 @@
|
||||
|
||||
int pipe_fds[2] = {-1,-1};
|
||||
int fakerpc = 0;
|
||||
+int keepalive_debug = 0;
|
||||
+int keepalive_interval = 3;
|
||||
|
||||
struct tm_binds tmb;
|
||||
|
||||
@@ -92,6 +94,8 @@
|
||||
{"min_srv_ttl", INT_PARAM|USE_FUNC_PARAM, (void*)parse_min_ttl_param},
|
||||
{"result_pv", STR_PARAM, &result_pv_str.s},
|
||||
{"fakerpc", INT_PARAM, &fakerpc},
|
||||
+ {"keepalive_debug", INT_PARAM, &keepalive_debug},
|
||||
+ {"keepalive_interval", INT_PARAM, &keepalive_interval},
|
||||
{ 0,0,0 }
|
||||
};
|
||||
|
||||
--- a/modules/janssonrpc-c/netstring.c
|
||||
+++ b/modules/janssonrpc-c/netstring.c
|
||||
@@ -152,7 +152,6 @@
|
||||
assert(n==(*netstring)->length);
|
||||
if(n>0) (*netstring)->buffer[n-1] = '\0';
|
||||
(*netstring)->string = (*netstring)->buffer;
|
||||
- LM_DBG("read:[%d][%s]\n", n, ZSW((*netstring)->string));
|
||||
return 0;
|
||||
}
|
||||
|
||||
--- a/modules/janssonrpc-c/janssonrpc.h
|
||||
+++ b/modules/janssonrpc-c/janssonrpc.h
|
||||
@@ -91,6 +91,8 @@
|
||||
str result_pv_str;
|
||||
extern int fakerpc;
|
||||
retry_range_t* global_retry_ranges;
|
||||
+extern int keepalive_debug;
|
||||
+extern int keepalive_interval;
|
||||
|
||||
static inline str pkg_strdup(str src)
|
||||
{
|
Loading…
Reference in new issue