#include "DRedisConnection.h" #include "jsonArg.h" #include "AmUtils.h" #include #include #include DRedisConfig::DRedisConfig(const string& host, unsigned int port, bool unix_socket, bool full_logging, bool use_transactions, int connect_timeout) : host(host), port(port), unix_socket(unix_socket), full_logging(full_logging), use_transactions(use_transactions) { tv_timeout.tv_sec = connect_timeout / 1000; tv_timeout.tv_usec = (connect_timeout - 1000 * tv_timeout.tv_sec) * 1000; } DRedisConnection::DRedisConnection(const DRedisConfig& cfg) : redis_context(NULL), cfg(cfg) { } DRedisConnection::~DRedisConnection() { disconnect(); } void DRedisConnection::disconnect() { if(redis_context) { DBG("disconnecting from Redis..."); int socketFd = redis_context->fd; struct sockaddr_in addr; socklen_t len = sizeof(addr); if (getsockname(socketFd, (struct sockaddr*)&addr, &len) == 0 && addr.sin_family == AF_INET) { DBG("Freeing an ephemeral port used by this redis connection: '%u'\n", (unsigned int)ntohs(addr.sin_port)); } redisFree(redis_context); redis_context = NULL; } } bool DRedisConnection::connect() { if(redis_context) return true; if(!cfg.unix_socket) { DBG("connecting to REDIS at %s:%u\n", cfg.host.c_str(), cfg.port); redis_context = redisConnectWithTimeout((char*)cfg.host.c_str(), cfg.port, cfg.tv_timeout); } else { DBG("connecting to REDIS at %s\n", cfg.host.c_str()); redis_context = redisConnectUnixWithTimeout(cfg.host.c_str(), cfg.tv_timeout); } if (redis_context != NULL) { /* handle connection error */ if (redis_context->err) { ERROR("REDIS Connection error: %s\n", redis_context->errstr); disconnect(); return false; } int socketFd = redis_context->fd; struct sockaddr_in addr; socklen_t len = sizeof(addr); if (getsockname(socketFd, (struct sockaddr*)&addr, &len) == 0 && addr.sin_family == AF_INET) { DBG("Allocated an ephemeral port for this redis connection: '%u'\n", (unsigned int)ntohs(addr.sin_port)); } } else { ERROR("REDIS Connection error: %s\n", redis_context->errstr); return false; } return true; } int DRedisConnection::handle_redis_reply(redisReply *reply, const char* _cmd) { if (!reply) { switch (redis_context->err) { case REDIS_ERR_IO: ERROR("I/O error: %s (%s)\n", redis_context->errstr,_cmd); disconnect(); return DB_E_CONNECTION; case REDIS_ERR_EOF: // silently reconnect case REDIS_ERR_OTHER: disconnect(); return DB_E_CONNECTION; case REDIS_ERR_PROTOCOL: ERROR("REDIS Protocol error detected\n"); disconnect(); return DB_E_CONNECTION; default: disconnect(); return DB_E_CONNECTION; } } switch (reply->type) { case REDIS_REPLY_ERROR: ERROR("REDIS %s ERROR: %s\n", _cmd, reply->str); return DB_E_WRITE; case REDIS_REPLY_STATUS: case REDIS_REPLY_STRING: if (cfg.full_logging) { DBG("REDIS %s: str: '%.*s'\n", _cmd, (int) reply->len, reply->str); } break; case REDIS_REPLY_INTEGER: if (cfg.full_logging) { DBG("REDIS %s: int: %lld\n", _cmd, reply->integer); } break; case REDIS_REPLY_ARRAY: { if (cfg.full_logging) { DBG("REDIS %s: array START\n", _cmd); }; for (size_t i=0;ielements;i++) { switch(reply->element[i]->type) { case REDIS_REPLY_ERROR: ERROR("REDIS %s ERROR: %.*s\n", _cmd, (int) reply->element[i]->len, reply->element[i]->str); return DB_E_WRITE; case REDIS_REPLY_INTEGER: if (cfg.full_logging) { DBG("REDIS %s: %lld\n", _cmd, reply->element[i]->integer); } break; case REDIS_REPLY_NIL: if (cfg.full_logging) { DBG("REDIS %s: nil\n", _cmd); } break; case REDIS_REPLY_ARRAY: if (cfg.full_logging) { DBG("REDIS : %zd elements\n", reply->elements); } break; case REDIS_REPLY_STATUS: case REDIS_REPLY_STRING: if (cfg.full_logging) { if (reply->element[i]->len >= 0) { DBG("REDIS %s: %.*s\n", _cmd, (int) reply->element[i]->len, reply->element[i]->str); } } break; default: ERROR("unknown REDIS reply %d to %s!",reply->element[i]->type, _cmd); break; } } if (cfg.full_logging) { DBG("REDIS %s: array END\n", _cmd); }; }; break; default: ERROR("unknown REDIS reply %d to %s!", reply->type, _cmd); break; } if (cfg.full_logging) { DBG("REDIS cmd %s executed successfully\n", _cmd); } return DB_E_OK; } #define RETURN_READ_ERROR \ freeReplyObject(reply); \ return DB_E_READ; int DRedisConnection::exec_cmd(const char* cmd, redisReply*& reply) { if(!redis_context) { ERROR("REDIS cmd '%s': not connected",cmd); return DB_E_CONNECTION; } reply = NULL; reply = (redisReply *)redisCommand(redis_context, cmd); int ret = handle_redis_reply(reply, cmd); if (ret != DB_E_OK) return ret; DBG("successfully executed redis cmd\n"); return DB_E_OK; } int DRedisConnection::append_cmd(const char* cmd) { if(!redis_context) { ERROR("REDIS append cmd '%s': not connected",cmd); return DB_E_CONNECTION; } return redisAppendCommand(redis_context, cmd) == REDIS_OK ? DB_E_OK : DB_E_CONNECTION; } int DRedisConnection::get_reply(redisReply*& reply) { if(!redis_context) { ERROR("REDIS get_reply: not connected"); return DB_E_CONNECTION; } redisGetReply(redis_context, (void**)&reply); int ret = handle_redis_reply(reply, ""); return ret; }