/** * $Id$ * * Copyright (C) 2011 Flowroute LLC (flowroute.com) * * This file is part of Kamailio, a free SIP server. * * This file is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version * * * This file is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include #include #include #include "../../sr_module.h" #include "../../route.h" #include "../../route_struct.h" #include "../../lvalue.h" #include "../tm/tm_load.h" #include "jsonrpc_io.h" #include "jsonrpc.h" #include "netstring.h" #define CHECK_MALLOC_VOID(p) if(!p) {LM_ERR("Out of memory!"); return;} #define CHECK_MALLOC(p) if(!p) {LM_ERR("Out of memory!"); return -1;} struct jsonrpc_server { char *host; int port, socket, status; struct jsonrpc_server *next; struct event *ev; struct itimerspec *timer; }; struct jsonrpc_server_group { struct jsonrpc_server *next_server; int priority; struct jsonrpc_server_group *next_group; }; struct tm_binds tmb; struct jsonrpc_server_group *server_group; void socket_cb(int fd, short event, void *arg); void cmd_pipe_cb(int fd, short event, void *arg); int set_non_blocking(int fd); int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr); int connect_servers(struct jsonrpc_server_group *group); int connect_server(struct jsonrpc_server *server); int handle_server_failure(struct jsonrpc_server *server); int jsonrpc_io_child_process(int cmd_pipe, char* _servers) { if (parse_servers(_servers, &server_group) != 0) { LM_ERR("servers parameter could not be parsed\n"); return -1; } event_init(); struct event pipe_ev; set_non_blocking(cmd_pipe); event_set(&pipe_ev, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, &pipe_ev); event_add(&pipe_ev, NULL); if (!connect_servers(server_group)) { LM_ERR("failed to connect to any servers\n"); return -1; } event_dispatch(); return 0; } void timeout_cb(int fd, short event, void *arg) { LM_ERR("message timeout\n"); jsonrpc_request_t *req = (jsonrpc_request_t*)arg; json_object *error = json_object_new_string("timeout"); void_jsonrpc_request(req->id); close(req->timerfd); event_del(req->timer_ev); pkg_free(req->timer_ev); req->cbfunc(error, req->cbdata, 1); pkg_free(req); } int result_cb(json_object *result, char *data, int error) { struct jsonrpc_pipe_cmd *cmd = (struct jsonrpc_pipe_cmd*)data; pv_spec_t *dst = cmd->cb_pv; pv_value_t val; const char* res = json_object_get_string(result); val.rs.s = (char*)res; val.rs.len = strlen(res); val.flags = PV_VAL_STR; dst->setf(0, &dst->pvp, (int)EQ_T, &val); int n; if (error) { n = route_get(&main_rt, cmd->err_route); } else { n = route_get(&main_rt, cmd->cb_route); } struct action *a = main_rt.rlist[n]; tmb.t_continue(cmd->t_hash, cmd->t_label, a); free_pipe_cmd(cmd); return 0; } int (*res_cb)(json_object*, char*, int) = &result_cb; void cmd_pipe_cb(int fd, short event, void *arg) { struct jsonrpc_pipe_cmd *cmd; /* struct event *ev = (struct event*)arg; */ if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) { LM_ERR("failed to read from command pipe: %s\n", strerror(errno)); return; } json_object *params = json_tokener_parse(cmd->params); json_object *payload = NULL; jsonrpc_request_t *req = NULL; if (cmd->notify_only) { payload = build_jsonrpc_notification(cmd->method, params); } else { req = build_jsonrpc_request(cmd->method, params, (char*)cmd, res_cb); if (req) payload = req->payload; } if (!payload) { LM_ERR("Failed to build jsonrpc_request_t (method: %s, params: %s)\n", cmd->method, cmd->params); return; } char *json = (char*)json_object_get_string(payload); char *ns; size_t bytes; bytes = netstring_encode_new(&ns, json, (size_t)strlen(json)); struct jsonrpc_server_group *g; int sent = 0; for (g = server_group; g != NULL; g = g->next_group) { struct jsonrpc_server *s, *first = NULL; for (s = g->next_server; s != first; s = s->next) { if (first == NULL) first = s; if (s->status == JSONRPC_SERVER_CONNECTED) { if (send(s->socket, ns, bytes, 0) == bytes) { sent = 1; break; } else { handle_server_failure(s); } } g->next_server = s->next; } if (sent) { break; } else { LM_WARN("Failed to send on priority group %d... proceeding to next priority group.\n", g->priority); } } if (sent && req) { int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); if (timerfd == -1) { LM_ERR("Could not create timerfd."); return; } req->timerfd = timerfd; struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec)); CHECK_MALLOC_VOID(itime); itime->it_interval.tv_sec = 0; itime->it_interval.tv_nsec = 0; itime->it_value.tv_sec = JSONRPC_TIMEOUT/1000; itime->it_value.tv_nsec = (JSONRPC_TIMEOUT % 1000) * 1000000; if (timerfd_settime(timerfd, 0, itime, NULL) == -1) { LM_ERR("Could not set timer."); return; } pkg_free(itime); struct event *timer_ev = pkg_malloc(sizeof(struct event)); CHECK_MALLOC_VOID(timer_ev); event_set(timer_ev, timerfd, EV_READ, timeout_cb, req); if(event_add(timer_ev, NULL) == -1) { LM_ERR("event_add failed while setting request timer (%s).", strerror(errno)); return; } req->timer_ev = timer_ev; } else if (!sent) { LM_ERR("Request could not be sent... no more failover groups.\n"); if (req) { json_object *error = json_object_new_string("failure"); void_jsonrpc_request(req->id); req->cbfunc(error, req->cbdata, 1); } } pkg_free(ns); json_object_put(payload); } void socket_cb(int fd, short event, void *arg) { struct jsonrpc_server *server = (struct jsonrpc_server*)arg; if (event != EV_READ) { LM_ERR("unexpected socket event (%d)\n", event); handle_server_failure(server); return; } char *netstring; int retval = netstring_read_fd(fd, &netstring); if (retval != 0) { LM_ERR("bad netstring (%d)\n", retval); handle_server_failure(server); return; } struct json_object *res = json_tokener_parse(netstring); if (res) { handle_jsonrpc_response(res); json_object_put(res); } else { LM_ERR("netstring could not be parsed: (%s)\n", netstring); handle_server_failure(server); } pkg_free(netstring); } int set_non_blocking(int fd) { int flags; flags = fcntl(fd, F_GETFL); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } int parse_servers(char *_servers, struct jsonrpc_server_group **group_ptr) { char cpy[strlen(_servers)+1]; char *servers = strcpy(cpy, _servers); struct jsonrpc_server_group *group = NULL; /* parse servers string */ char *token = strtok(servers, ":"); while (token != NULL) { char *host, *port_s, *priority_s, *tail; int port, priority; host = token; /* validate domain */ if (!(isalpha(host[0]) || isdigit(host[0]))) { LM_ERR("invalid domain (1st char is '%c')\n", host[0]); return -1; } int i; for (i=1; ihost = h; server->port = port; server->status = JSONRPC_SERVER_DISCONNECTED; server->socket = 0; int group_cnt = 0; /* search for a server group with this server's priority */ struct jsonrpc_server_group *selected_group = NULL; for (selected_group=group; selected_group != NULL; selected_group=selected_group->next_group) { if (selected_group->priority == priority) break; } if (selected_group == NULL) { group_cnt++; LM_INFO("Creating group for priority %d\n", priority); /* this is the first server for this priority... link it to itself */ server->next = server; selected_group = pkg_malloc(sizeof(struct jsonrpc_server_group)); CHECK_MALLOC(selected_group); selected_group->priority = priority; selected_group->next_server = server; /* insert the group properly in the linked list */ struct jsonrpc_server_group *x, *pg; pg = NULL; if (group == NULL) { group = selected_group; group->next_group = NULL; } else { for (x = group; x != NULL; x = x->next_group) { if (priority > x->priority) { if (pg == NULL) { group = selected_group; } else { pg->next_group = selected_group; } selected_group->next_group = x; break; } else if (x->next_group == NULL) { x->next_group = selected_group; break; } else { pg = x; } } } } else { LM_ERR("Using existing group for priority %d\n", priority); server->next = selected_group->next_server->next; selected_group->next_server->next = server; } token = strtok(NULL, ":"); } *group_ptr = group; return 0; } int connect_server(struct jsonrpc_server *server) { struct sockaddr_in server_addr; struct hostent *hp; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(server->port); hp = gethostbyname(server->host); if (hp == NULL) { LM_ERR("gethostbyname(%s) failed with h_errno=%d.\n", server->host, h_errno); handle_server_failure(server); return -1; } memcpy(&(server_addr.sin_addr.s_addr), hp->h_addr, hp->h_length); int sockfd = socket(AF_INET,SOCK_STREAM,0); if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in))) { LM_WARN("Failed to connect to %s on port %d... %s\n", server->host, server->port, strerror(errno)); handle_server_failure(server); return -1; } if (set_non_blocking(sockfd) != 0) { LM_WARN("Failed to set socket (%s:%d) to non blocking.\n", server->host, server->port); handle_server_failure(server); return -1; } server->socket = sockfd; server->status = JSONRPC_SERVER_CONNECTED; struct event *socket_ev = pkg_malloc(sizeof(struct event)); CHECK_MALLOC(socket_ev); event_set(socket_ev, sockfd, EV_READ | EV_PERSIST, socket_cb, server); event_add(socket_ev, NULL); server->ev = socket_ev; return 0; } int connect_servers(struct jsonrpc_server_group *group) { int connected_servers = 0; for (;group != NULL; group = group->next_group) { struct jsonrpc_server *s, *first = NULL; LM_INFO("Connecting to servers for priority %d:\n", group->priority); for (s=group->next_server;s!=first;s=s->next) { if (connect_server(s) == 0) { connected_servers++; LM_INFO("Connected to host %s on port %d\n", s->host, s->port); } if (first == NULL) first = s; } } return connected_servers; } void reconnect_cb(int fd, short event, void *arg) { LM_INFO("Attempting to reconnect now."); struct jsonrpc_server *server = (struct jsonrpc_server*)arg; if (server->status == JSONRPC_SERVER_CONNECTED) { LM_WARN("Trying to connect an already connected server."); return; } if (server->ev != NULL) { event_del(server->ev); pkg_free(server->ev); server->ev = NULL; } close(fd); pkg_free(server->timer); connect_server(server); } int handle_server_failure(struct jsonrpc_server *server) { LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL); if (server->socket) close(server->socket); server->socket = 0; if (server->ev != NULL) { event_del(server->ev); pkg_free(server->ev); server->ev = NULL; } server->status = JSONRPC_SERVER_FAILURE; int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); if (timerfd == -1) { LM_ERR("Could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server."); return -1; } struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec)); CHECK_MALLOC(itime); itime->it_interval.tv_sec = 0; itime->it_interval.tv_nsec = 0; itime->it_value.tv_sec = JSONRPC_RECONNECT_INTERVAL; itime->it_value.tv_nsec = 0; if (timerfd_settime(timerfd, 0, itime, NULL) == -1) { LM_ERR("Could not set timer to reschedule connection. No further attempts will be made to reconnect this server."); return -1; } LM_INFO("timerfd value is %d\n", timerfd); struct event *timer_ev = pkg_malloc(sizeof(struct event)); CHECK_MALLOC(timer_ev); event_set(timer_ev, timerfd, EV_READ, reconnect_cb, server); if(event_add(timer_ev, NULL) == -1) { LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.", strerror(errno)); return -1; } server->ev = timer_ev; server->timer = itime; return 0; } void free_pipe_cmd(struct jsonrpc_pipe_cmd *cmd) { if (cmd->method) shm_free(cmd->method); if (cmd->params) shm_free(cmd->params); if (cmd->cb_route) shm_free(cmd->cb_route); if (cmd->err_route) shm_free(cmd->err_route); if (cmd->cb_pv) shm_free(cmd->cb_pv); shm_free(cmd); }