mirror of https://github.com/sipwise/sems.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
222 lines
5.5 KiB
222 lines
5.5 KiB
#include "DRedisConnection.h"
|
|
#include "jsonArg.h"
|
|
#include "AmUtils.h"
|
|
|
|
#include <errno.h>
|
|
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
|
|
DRedisConfig::DRedisConfig(const string& host, unsigned int port,
|
|
bool unix_socket, bool full_logging,
|
|
bool use_transactions, int connect_timeout)
|
|
: host(host), port(port),
|
|
unix_socket(unix_socket),
|
|
full_logging(full_logging),
|
|
use_transactions(use_transactions)
|
|
{
|
|
tv_timeout.tv_sec = connect_timeout / 1000;
|
|
tv_timeout.tv_usec = (connect_timeout - 1000 * tv_timeout.tv_sec) * 1000;
|
|
}
|
|
|
|
DRedisConnection::DRedisConnection(const DRedisConfig& cfg)
|
|
: redis_context(NULL), cfg(cfg)
|
|
{
|
|
}
|
|
|
|
DRedisConnection::~DRedisConnection()
|
|
{
|
|
disconnect();
|
|
}
|
|
|
|
void DRedisConnection::disconnect()
|
|
{
|
|
if(redis_context) {
|
|
DBG("disconnecting from Redis...");
|
|
|
|
int socketFd = redis_context->fd;
|
|
struct sockaddr_in addr;
|
|
socklen_t len = sizeof(addr);
|
|
|
|
if (getsockname(socketFd, (struct sockaddr*)&addr, &len) == 0 && addr.sin_family == AF_INET) {
|
|
DBG("Freeing an ephemeral port used by this redis connection: '%u'\n", (unsigned int)ntohs(addr.sin_port));
|
|
}
|
|
|
|
redisFree(redis_context);
|
|
redis_context = NULL;
|
|
}
|
|
}
|
|
|
|
bool DRedisConnection::connect()
|
|
{
|
|
if(redis_context)
|
|
return true;
|
|
|
|
if(!cfg.unix_socket) {
|
|
DBG("connecting to REDIS at %s:%u\n", cfg.host.c_str(), cfg.port);
|
|
redis_context = redisConnectWithTimeout((char*)cfg.host.c_str(),
|
|
cfg.port, cfg.tv_timeout);
|
|
}
|
|
else {
|
|
DBG("connecting to REDIS at %s\n", cfg.host.c_str());
|
|
redis_context = redisConnectUnixWithTimeout(cfg.host.c_str(),
|
|
cfg.tv_timeout);
|
|
}
|
|
|
|
if (redis_context != NULL) {
|
|
|
|
/* handle connection error */
|
|
if (redis_context->err) {
|
|
ERROR("REDIS Connection error: %s\n", redis_context->errstr);
|
|
disconnect();
|
|
return false;
|
|
}
|
|
|
|
int socketFd = redis_context->fd;
|
|
struct sockaddr_in addr;
|
|
socklen_t len = sizeof(addr);
|
|
|
|
if (getsockname(socketFd, (struct sockaddr*)&addr, &len) == 0 && addr.sin_family == AF_INET) {
|
|
DBG("Allocated an ephemeral port for this redis connection: '%u'\n", (unsigned int)ntohs(addr.sin_port));
|
|
}
|
|
} else {
|
|
ERROR("REDIS Connection error: %s\n", redis_context->errstr);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
int DRedisConnection::handle_redis_reply(redisReply *reply, const char* _cmd) {
|
|
if (!reply) {
|
|
switch (redis_context->err) {
|
|
case REDIS_ERR_IO:
|
|
ERROR("I/O error: %s (%s)\n", redis_context->errstr,_cmd);
|
|
disconnect();
|
|
return DB_E_CONNECTION;
|
|
|
|
case REDIS_ERR_EOF: // silently reconnect
|
|
case REDIS_ERR_OTHER:
|
|
disconnect();
|
|
return DB_E_CONNECTION;
|
|
|
|
case REDIS_ERR_PROTOCOL:
|
|
ERROR("REDIS Protocol error detected\n");
|
|
disconnect();
|
|
return DB_E_CONNECTION;
|
|
|
|
default:
|
|
disconnect();
|
|
return DB_E_CONNECTION;
|
|
}
|
|
}
|
|
|
|
switch (reply->type) {
|
|
case REDIS_REPLY_ERROR:
|
|
ERROR("REDIS %s ERROR: %s\n", _cmd, reply->str);
|
|
return DB_E_WRITE;
|
|
|
|
case REDIS_REPLY_STATUS:
|
|
case REDIS_REPLY_STRING:
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS %s: str: '%.*s'\n", _cmd, (int) reply->len, reply->str);
|
|
}
|
|
break;
|
|
|
|
case REDIS_REPLY_INTEGER:
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS %s: int: %lld\n", _cmd, reply->integer);
|
|
} break;
|
|
|
|
case REDIS_REPLY_ARRAY: {
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS %s: array START\n", _cmd);
|
|
};
|
|
for (size_t i=0;i<reply->elements;i++) {
|
|
switch(reply->element[i]->type) {
|
|
case REDIS_REPLY_ERROR: ERROR("REDIS %s ERROR: %.*s\n", _cmd, (int) reply->element[i]->len, reply->element[i]->str);
|
|
return DB_E_WRITE;
|
|
|
|
case REDIS_REPLY_INTEGER:
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS %s: %lld\n", _cmd, reply->element[i]->integer);
|
|
} break;
|
|
|
|
case REDIS_REPLY_NIL:
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS %s: nil\n", _cmd);
|
|
} break;
|
|
|
|
case REDIS_REPLY_ARRAY:
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS : %zd elements\n", reply->elements);
|
|
} break;
|
|
|
|
case REDIS_REPLY_STATUS:
|
|
case REDIS_REPLY_STRING:
|
|
if (cfg.full_logging) {
|
|
if (reply->element[i]->len >= 0) {
|
|
DBG("REDIS %s: %.*s\n", _cmd, (int) reply->element[i]->len, reply->element[i]->str);
|
|
}
|
|
}
|
|
break;
|
|
default:
|
|
ERROR("unknown REDIS reply %d to %s!",reply->element[i]->type, _cmd); break;
|
|
}
|
|
}
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS %s: array END\n", _cmd);
|
|
};
|
|
}; break;
|
|
|
|
default: ERROR("unknown REDIS reply %d to %s!", reply->type, _cmd); break;
|
|
}
|
|
|
|
if (cfg.full_logging) {
|
|
DBG("REDIS cmd %s executed successfully\n", _cmd);
|
|
}
|
|
return DB_E_OK;
|
|
}
|
|
|
|
#define RETURN_READ_ERROR \
|
|
freeReplyObject(reply); \
|
|
return DB_E_READ;
|
|
|
|
int DRedisConnection::exec_cmd(const char* cmd, redisReply*& reply) {
|
|
|
|
if(!redis_context) {
|
|
ERROR("REDIS cmd '%s': not connected",cmd);
|
|
return DB_E_CONNECTION;
|
|
}
|
|
reply = NULL;
|
|
|
|
reply = (redisReply *)redisCommand(redis_context, cmd);
|
|
int ret = handle_redis_reply(reply, cmd);
|
|
if (ret != DB_E_OK)
|
|
return ret;
|
|
|
|
DBG("successfully executed redis cmd\n");
|
|
return DB_E_OK;
|
|
}
|
|
|
|
int DRedisConnection::append_cmd(const char* cmd) {
|
|
if(!redis_context) {
|
|
ERROR("REDIS append cmd '%s': not connected",cmd);
|
|
return DB_E_CONNECTION;
|
|
}
|
|
return redisAppendCommand(redis_context, cmd) == REDIS_OK ?
|
|
DB_E_OK : DB_E_CONNECTION;
|
|
}
|
|
|
|
int DRedisConnection::get_reply(redisReply*& reply) {
|
|
if(!redis_context) {
|
|
ERROR("REDIS get_reply: not connected");
|
|
return DB_E_CONNECTION;
|
|
}
|
|
|
|
redisGetReply(redis_context, (void**)&reply);
|
|
int ret = handle_redis_reply(reply, "<pipelined>");
|
|
return ret;
|
|
}
|
|
|