diff --git a/debian/patches/sipwise/fix_db_redis_memory_issues.patch b/debian/patches/sipwise/fix_db_redis_memory_issues.patch index 590bbcb66..a5a05f883 100644 --- a/debian/patches/sipwise/fix_db_redis_memory_issues.patch +++ b/debian/patches/sipwise/fix_db_redis_memory_issues.patch @@ -12,6 +12,21 @@ *reply = NULL; ret = redisGetReply(con->con, reply); if (con->con->err == REDIS_ERR_EOF) { +@@ -350,6 +355,7 @@ + redisFree(con->con); + con->con = NULL; + } ++ return -1; + } + // take commands from oldest to newest and re-do again, + // but don't queue them once again in retry-mode +@@ -396,4 +402,4 @@ + LM_DBG("consuming queued command\n"); + db_redis_key_free(&query); + } +-} +\ No newline at end of file ++} --- a/src/modules/db_redis/redis_dbase.c +++ b/src/modules/db_redis/redis_dbase.c @@ -378,8 +378,6 @@ @@ -60,7 +75,59 @@ goto err; } -@@ -1062,7 +1068,7 @@ +@@ -665,6 +671,14 @@ + LM_ERR("Failed to add match pattern to scan query\n"); + goto err; + } ++ if (db_redis_key_add_string(&query_v, "COUNT", 5) != 0) { ++ LM_ERR("Failed to add count command to scan query\n"); ++ goto err; ++ } ++ if (db_redis_key_add_string(&query_v, "1000", 5) != 0) { ++ LM_ERR("Failed to add count value to scan query\n"); ++ goto err; ++ } + pkg_free(match); match = NULL; + + reply = db_redis_command_argv(con, query_v); +@@ -691,6 +705,7 @@ + table_name->len, table_name->s); + goto err; + } ++ LM_DBG("cursor is %lu\n", cursor); + + if (reply->element[1]->type != REDIS_REPLY_ARRAY) { + LM_ERR("Invalid content type for scan on table '%.*s', expected array\n", +@@ -717,8 +732,8 @@ + j, table_name->len, table_name->s); + goto err; + } +- if (db_redis_key_add_string(query_keys, key->str, strlen(key->str)) != 0) { +- LM_ERR("Failed to add redis key\n"); ++ if (db_redis_key_prepend_string(query_keys, key->str, strlen(key->str)) != 0) { ++ LM_ERR("Failed to prepend redis key\n"); + goto err; + } + } +@@ -740,6 +755,8 @@ + if (reply) { + db_redis_free_reply(&reply); + } ++ ++ LM_DBG("got %lu entries by scan\n", i); + return 0; + + err: +@@ -1047,7 +1064,7 @@ + redis_key_t *query_v = NULL; + int num_rows = 0; + redis_key_t *key; +- int j; ++ int i, j, max; + + *_r = db_redis_new_result(); + if (!*_r) { +@@ -1062,7 +1079,7 @@ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0; RES_COL_N(*_r) = _nc; @@ -69,7 +136,154 @@ LM_DBG("performing full table scan\n"); if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n, keys, keys_count, -@@ -1354,7 +1360,9 @@ +@@ -1072,6 +1089,16 @@ + } + } + ++ // we allocate best case scenario (all rows match) ++ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = *keys_count; ++ if (db_allocate_rows(*_r) != 0) { ++ LM_ERR("Failed to allocate memory for rows\n"); ++ return -1; ++ } ++ RES_COL_N(*_r) = _nc; ++ // reset and increment in convert_row ++ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0; ++ + for (key = *keys; key; key = key->next) { + redis_key_t *tmp = NULL; + str *keyname = &(key->key); +@@ -1134,58 +1161,58 @@ + + db_redis_key_free(&query_v); + query_v = NULL; +- } + +- // we allocate best case scenario (all rows match) +- RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = num_rows; +- if (db_allocate_rows(*_r) != 0) { +- LM_ERR("Failed to allocate memory for rows\n"); +- return -1; +- } +- RES_COL_N(*_r) = _nc; +- // reset and increment in convert_row +- RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0; +- +- for (key = *keys; key; key = key->next) { +- // get reply for EXISTS query +- if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) { +- LM_ERR("Failed to get reply for query: %s\n", +- con->con->errstr); +- goto error; +- } +- db_redis_check_reply(con, reply, error); +- if (reply->integer == 0) { +- LM_DBG("key does not exist, returning no row for query\n"); +- db_redis_free_reply(&reply); +- // also free next reply, as this is a null row for the HMGET +- db_redis_get_reply(con, (void**)&reply); +- db_redis_check_reply(con, reply, error); +- db_redis_free_reply(&reply); +- continue; +- } +- db_redis_free_reply(&reply); ++ max = 0; ++ if (*keys_count == num_rows) ++ max = (*keys_count) % 1000; ++ else if (num_rows % 1000 == 0) ++ max = 1000; ++ ++ if (max) { ++ LM_DBG("fetching next %d results\n", max); ++ for (i = 0; i < max; ++i) { ++ // get reply for EXISTS query ++ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) { ++ LM_ERR("Failed to get reply for query: %s\n", ++ con->con->errstr); ++ goto error; ++ } ++ db_redis_check_reply(con, reply, error); ++ if (reply->integer == 0) { ++ LM_DBG("key does not exist, returning no row for query\n"); ++ db_redis_free_reply(&reply); ++ // also free next reply, as this is a null row for the HMGET ++ db_redis_get_reply(con, (void**)&reply); ++ db_redis_check_reply(con, reply, error); ++ db_redis_free_reply(&reply); ++ continue; ++ } ++ db_redis_free_reply(&reply); + +- // get reply for actual HMGET query +- if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) { +- LM_ERR("Failed to get reply for query: %s\n", +- con->con->errstr); +- goto error; +- } +- db_redis_check_reply(con, reply, error); +- if (reply->type != REDIS_REPLY_ARRAY) { +- LM_ERR("Unexpected reply, expected array\n"); +- goto error; +- } +- LM_DBG("dumping full query reply for row\n"); +- db_redis_dump_reply(reply); ++ // get reply for actual HMGET query ++ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) { ++ LM_ERR("Failed to get reply for query: %s\n", ++ con->con->errstr); ++ goto error; ++ } ++ db_redis_check_reply(con, reply, error); ++ if (reply->type != REDIS_REPLY_ARRAY) { ++ LM_ERR("Unexpected reply, expected array\n"); ++ goto error; ++ } ++ LM_DBG("dumping full query reply for row\n"); ++ db_redis_dump_reply(reply); + +- if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) { +- LM_ERR("Failed to convert redis reply for row\n"); +- goto error; ++ if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) { ++ LM_ERR("Failed to convert redis reply for row\n"); ++ goto error; ++ } ++ db_redis_free_reply(&reply); ++ } + } +- db_redis_free_reply(&reply); + } + ++ LM_DBG("done performing query\n"); + return 0; + + error: +@@ -1193,7 +1220,7 @@ + db_redis_key_free(&query_v); + if(reply) + db_redis_free_reply(&reply); +- if(_r && *_r) { ++ if(*_r) { + db_redis_free_result((db1_con_t*)_h, *_r); *_r = NULL; + } + return -1; +@@ -1238,13 +1265,13 @@ + goto error; + } + +- LM_DBG("+++ delete all keys\n"); ++ LM_DBG("delete all keys\n"); + for (k = keys; k; k = k->next) { + redis_key_t *all_type_key; + str *key = &k->key; + redis_key_t *tmp = NULL; + int row_match; +- LM_DBG("+++ delete key '%.*s'\n", key->len, key->s); ++ LM_DBG("delete key '%.*s'\n", key->len, key->s); + + if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) { + LM_ERR("Failed to add exists command to pre-delete query\n"); +@@ -1354,7 +1381,9 @@ goto error; } pkg_free(db_keys); @@ -79,7 +293,12 @@ db_redis_free_reply(&reply); if (db_redis_key_add_string(&query_v, "DEL", 3) != 0) { -@@ -1395,6 +1403,7 @@ +@@ -1391,10 +1420,11 @@ + } + + //db_redis_key_free(&type_keys); +- LM_DBG("+++ done with loop '%.*s'\n", k->key.len, k->key.s); ++ LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s); } db_redis_key_free(&type_keys); db_redis_key_free(&all_type_keys); @@ -87,7 +306,7 @@ return 0; -@@ -1426,7 +1435,7 @@ +@@ -1426,7 +1456,7 @@ int j; size_t col; @@ -96,7 +315,7 @@ LM_DBG("performing full table scan\n"); if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n, keys, keys_count, -@@ -1664,6 +1673,11 @@ +@@ -1664,6 +1694,11 @@ // TODO: optimize mapping-based manual post-check (remove check for keys already // in type query key) @@ -108,7 +327,7 @@ con = REDIS_CON(_h); if (con && con->con == NULL) { if (db_redis_connect(con) != 0) { -@@ -1683,7 +1697,7 @@ +@@ -1683,7 +1718,7 @@ CON_TABLE(_h)->len, CON_TABLE(_h)->s); } @@ -117,7 +336,7 @@ // check if we have a version query, and return version directly from // schema instead of loading it from redis -@@ -1731,6 +1745,7 @@ +@@ -1731,6 +1766,7 @@ } else { LM_DBG("no columns given to build query keys, falling back to full table scan\n"); keys_count = 0; @@ -182,7 +401,16 @@ pkg_free(e); return NULL; } -@@ -539,6 +550,10 @@ +@@ -494,6 +505,8 @@ + p = start = redis_keys.s; + state = DBREDIS_KEYS_TABLE_ST; + do { ++ type = NULL; ++ key = NULL; + switch(state) { + case DBREDIS_KEYS_TABLE_ST: + while(p != end && *p != '=') +@@ -539,6 +552,10 @@ if (!table->types) { table->types = type_target = type; } else { @@ -193,7 +421,7 @@ type_target->next = type; type_target = type_target->next; } -@@ -571,6 +586,10 @@ +@@ -571,6 +588,10 @@ if (*key_target == NULL) { *key_target = key_location = key; } else { @@ -204,12 +432,87 @@ key_location->next = key; key_location = key_location->next; } -@@ -608,7 +627,7 @@ +@@ -586,6 +607,10 @@ + return 0; + + err: ++ if (type) ++ pkg_free(type); ++ if (key) ++ pkg_free(key); + db_redis_free_tables(con); + return -1; + } +@@ -608,7 +633,8 @@ char full_path[_POSIX_PATH_MAX + 1]; int path_len; struct stat fstat; - char c; + unsigned char c; ++ int cc; enum { DBREDIS_SCHEMA_COLUMN_ST, +@@ -632,6 +658,10 @@ + } + + dir_name = (char*)pkg_malloc((redis_schema_path.len + 1) * sizeof(char)); ++ if (!dir_name) { ++ LM_ERR("Failed to allocate memory for schema directory name\n"); ++ goto err; ++ } + strncpy(dir_name, redis_schema_path.s, redis_schema_path.len); + dir_name[redis_schema_path.len] = '\0'; + srcdir = opendir(dir_name); +@@ -699,14 +729,15 @@ + goto err; + } + +- c = fgetc(fin); ++ cc = fgetc(fin); ++ c = (unsigned char)cc; + + if (c == '\r') + continue; + //LM_DBG("parsing char %c, buf is '%s' at pos %lu\n", c, buf, bufpos); + switch(state) { + case DBREDIS_SCHEMA_COLUMN_ST: +- if (c == EOF) { ++ if (cc == EOF) { + LM_ERR("Unexpected end of file in schema column name of file %s\n", full_path); + goto err; + } +@@ -732,7 +763,7 @@ + LM_DBG("found column name '%.*s'\n", column_name.len, column_name.s); + break; + case DBREDIS_SCHEMA_TYPE_ST: +- if (c == EOF) { ++ if (cc == EOF) { + LM_ERR("Unexpected end of file in schema column type of file %s\n", full_path); + goto err; + } +@@ -772,7 +803,7 @@ + bufptr = buf; + break; + case DBREDIS_SCHEMA_VERSION_ST: +- if (c != '\n' && c != EOF) { ++ if (c != '\n' && cc != EOF) { + *bufptr = c; + bufptr++; + continue; +@@ -785,7 +816,7 @@ + goto fileend; + break; + } +- } while (c != EOF); ++ } while (cc != EOF); + + fileend: + fclose(fin); +@@ -838,4 +869,4 @@ + pkg_free(redis_keys.s); + } + return -1; +-} +\ No newline at end of file ++}