TT#34650 Fix table scan with lots of rows

When pre-loading e.g. the location table with >50k records,
we have to make sure to:
* scan and process DB_ROWs in batches of 1000 to balance the
  trade-off between the number of redis commands and memory
  consumption
* prepend to the list of keys instead of appending, in order to
  keep this operation O(1)

Change-Id: Ifc4a2318af4a786391bcae4001f658aea28d1099
changes/98/20298/9
Andreas Granig 8 years ago
parent 243697ce14
commit 04cb118b78

@ -12,6 +12,21 @@
*reply = NULL;
ret = redisGetReply(con->con, reply);
if (con->con->err == REDIS_ERR_EOF) {
@@ -350,6 +355,7 @@
redisFree(con->con);
con->con = NULL;
}
+ return -1;
}
// take commands from oldest to newest and re-do again,
// but don't queue them once again in retry-mode
@@ -396,4 +402,4 @@
LM_DBG("consuming queued command\n");
db_redis_key_free(&query);
}
-}
\ No newline at end of file
+}
--- a/src/modules/db_redis/redis_dbase.c
+++ b/src/modules/db_redis/redis_dbase.c
@@ -378,8 +378,6 @@
@ -60,7 +75,59 @@
goto err;
}
@@ -1062,7 +1068,7 @@
@@ -665,6 +671,14 @@
LM_ERR("Failed to add match pattern to scan query\n");
goto err;
}
+ if (db_redis_key_add_string(&query_v, "COUNT", 5) != 0) {
+ LM_ERR("Failed to add count command to scan query\n");
+ goto err;
+ }
+ if (db_redis_key_add_string(&query_v, "1000", 5) != 0) {
+ LM_ERR("Failed to add count value to scan query\n");
+ goto err;
+ }
pkg_free(match); match = NULL;
reply = db_redis_command_argv(con, query_v);
@@ -691,6 +705,7 @@
table_name->len, table_name->s);
goto err;
}
+ LM_DBG("cursor is %lu\n", cursor);
if (reply->element[1]->type != REDIS_REPLY_ARRAY) {
LM_ERR("Invalid content type for scan on table '%.*s', expected array\n",
@@ -717,8 +732,8 @@
j, table_name->len, table_name->s);
goto err;
}
- if (db_redis_key_add_string(query_keys, key->str, strlen(key->str)) != 0) {
- LM_ERR("Failed to add redis key\n");
+ if (db_redis_key_prepend_string(query_keys, key->str, strlen(key->str)) != 0) {
+ LM_ERR("Failed to prepend redis key\n");
goto err;
}
}
@@ -740,6 +755,8 @@
if (reply) {
db_redis_free_reply(&reply);
}
+
+ LM_DBG("got %lu entries by scan\n", i);
return 0;
err:
@@ -1047,7 +1064,7 @@
redis_key_t *query_v = NULL;
int num_rows = 0;
redis_key_t *key;
- int j;
+ int i, j, max;
*_r = db_redis_new_result();
if (!*_r) {
@@ -1062,7 +1079,7 @@
RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
RES_COL_N(*_r) = _nc;
@ -69,7 +136,154 @@
LM_DBG("performing full table scan\n");
if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
keys, keys_count,
@@ -1354,7 +1360,9 @@
@@ -1072,6 +1089,16 @@
}
}
+ // we allocate best case scenario (all rows match)
+ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = *keys_count;
+ if (db_allocate_rows(*_r) != 0) {
+ LM_ERR("Failed to allocate memory for rows\n");
+ return -1;
+ }
+ RES_COL_N(*_r) = _nc;
+ // reset and increment in convert_row
+ RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
+
for (key = *keys; key; key = key->next) {
redis_key_t *tmp = NULL;
str *keyname = &(key->key);
@@ -1134,58 +1161,58 @@
db_redis_key_free(&query_v);
query_v = NULL;
- }
- // we allocate best case scenario (all rows match)
- RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = num_rows;
- if (db_allocate_rows(*_r) != 0) {
- LM_ERR("Failed to allocate memory for rows\n");
- return -1;
- }
- RES_COL_N(*_r) = _nc;
- // reset and increment in convert_row
- RES_NUM_ROWS(*_r) = RES_ROW_N(*_r) = 0;
-
- for (key = *keys; key; key = key->next) {
- // get reply for EXISTS query
- if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
- LM_ERR("Failed to get reply for query: %s\n",
- con->con->errstr);
- goto error;
- }
- db_redis_check_reply(con, reply, error);
- if (reply->integer == 0) {
- LM_DBG("key does not exist, returning no row for query\n");
- db_redis_free_reply(&reply);
- // also free next reply, as this is a null row for the HMGET
- db_redis_get_reply(con, (void**)&reply);
- db_redis_check_reply(con, reply, error);
- db_redis_free_reply(&reply);
- continue;
- }
- db_redis_free_reply(&reply);
+ max = 0;
+ if (*keys_count == num_rows)
+ max = (*keys_count) % 1000;
+ else if (num_rows % 1000 == 0)
+ max = 1000;
+
+ if (max) {
+ LM_DBG("fetching next %d results\n", max);
+ for (i = 0; i < max; ++i) {
+ // get reply for EXISTS query
+ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
+ LM_ERR("Failed to get reply for query: %s\n",
+ con->con->errstr);
+ goto error;
+ }
+ db_redis_check_reply(con, reply, error);
+ if (reply->integer == 0) {
+ LM_DBG("key does not exist, returning no row for query\n");
+ db_redis_free_reply(&reply);
+ // also free next reply, as this is a null row for the HMGET
+ db_redis_get_reply(con, (void**)&reply);
+ db_redis_check_reply(con, reply, error);
+ db_redis_free_reply(&reply);
+ continue;
+ }
+ db_redis_free_reply(&reply);
- // get reply for actual HMGET query
- if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
- LM_ERR("Failed to get reply for query: %s\n",
- con->con->errstr);
- goto error;
- }
- db_redis_check_reply(con, reply, error);
- if (reply->type != REDIS_REPLY_ARRAY) {
- LM_ERR("Unexpected reply, expected array\n");
- goto error;
- }
- LM_DBG("dumping full query reply for row\n");
- db_redis_dump_reply(reply);
+ // get reply for actual HMGET query
+ if (db_redis_get_reply(con, (void**)&reply) != REDIS_OK) {
+ LM_ERR("Failed to get reply for query: %s\n",
+ con->con->errstr);
+ goto error;
+ }
+ db_redis_check_reply(con, reply, error);
+ if (reply->type != REDIS_REPLY_ARRAY) {
+ LM_ERR("Unexpected reply, expected array\n");
+ goto error;
+ }
+ LM_DBG("dumping full query reply for row\n");
+ db_redis_dump_reply(reply);
- if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) {
- LM_ERR("Failed to convert redis reply for row\n");
- goto error;
+ if (db_redis_convert_row(con, *_r, _k, _v, _op, reply, CON_TABLE(_h), _c, _nc, *manual_keys, *manual_keys_count)) {
+ LM_ERR("Failed to convert redis reply for row\n");
+ goto error;
+ }
+ db_redis_free_reply(&reply);
+ }
}
- db_redis_free_reply(&reply);
}
+ LM_DBG("done performing query\n");
return 0;
error:
@@ -1193,7 +1220,7 @@
db_redis_key_free(&query_v);
if(reply)
db_redis_free_reply(&reply);
- if(_r && *_r) {
+ if(*_r) {
db_redis_free_result((db1_con_t*)_h, *_r); *_r = NULL;
}
return -1;
@@ -1238,13 +1265,13 @@
goto error;
}
- LM_DBG("+++ delete all keys\n");
+ LM_DBG("delete all keys\n");
for (k = keys; k; k = k->next) {
redis_key_t *all_type_key;
str *key = &k->key;
redis_key_t *tmp = NULL;
int row_match;
- LM_DBG("+++ delete key '%.*s'\n", key->len, key->s);
+ LM_DBG("delete key '%.*s'\n", key->len, key->s);
if (db_redis_key_add_string(&query_v, "EXISTS", 6) != 0) {
LM_ERR("Failed to add exists command to pre-delete query\n");
@@ -1354,7 +1381,9 @@
goto error;
}
pkg_free(db_keys);
@ -79,7 +293,12 @@
db_redis_free_reply(&reply);
if (db_redis_key_add_string(&query_v, "DEL", 3) != 0) {
@@ -1395,6 +1403,7 @@
@@ -1391,10 +1420,11 @@
}
//db_redis_key_free(&type_keys);
- LM_DBG("+++ done with loop '%.*s'\n", k->key.len, k->key.s);
+ LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s);
}
db_redis_key_free(&type_keys);
db_redis_key_free(&all_type_keys);
@ -87,7 +306,7 @@
return 0;
@@ -1426,7 +1435,7 @@
@@ -1426,7 +1456,7 @@
int j;
size_t col;
@ -96,7 +315,7 @@
LM_DBG("performing full table scan\n");
if (db_redis_scan_query_keys(con, CON_TABLE(_h), _k, _n,
keys, keys_count,
@@ -1664,6 +1673,11 @@
@@ -1664,6 +1694,11 @@
// TODO: optimize mapping-based manual post-check (remove check for keys already
// in type query key)
@ -108,7 +327,7 @@
con = REDIS_CON(_h);
if (con && con->con == NULL) {
if (db_redis_connect(con) != 0) {
@@ -1683,7 +1697,7 @@
@@ -1683,7 +1718,7 @@
CON_TABLE(_h)->len, CON_TABLE(_h)->s);
}
@ -117,7 +336,7 @@
// check if we have a version query, and return version directly from
// schema instead of loading it from redis
@@ -1731,6 +1745,7 @@
@@ -1731,6 +1766,7 @@
} else {
LM_DBG("no columns given to build query keys, falling back to full table scan\n");
keys_count = 0;
@ -182,7 +401,16 @@
pkg_free(e);
return NULL;
}
@@ -539,6 +550,10 @@
@@ -494,6 +505,8 @@
p = start = redis_keys.s;
state = DBREDIS_KEYS_TABLE_ST;
do {
+ type = NULL;
+ key = NULL;
switch(state) {
case DBREDIS_KEYS_TABLE_ST:
while(p != end && *p != '=')
@@ -539,6 +552,10 @@
if (!table->types) {
table->types = type_target = type;
} else {
@ -193,7 +421,7 @@
type_target->next = type;
type_target = type_target->next;
}
@@ -571,6 +586,10 @@
@@ -571,6 +588,10 @@
if (*key_target == NULL) {
*key_target = key_location = key;
} else {
@ -204,12 +432,87 @@
key_location->next = key;
key_location = key_location->next;
}
@@ -608,7 +627,7 @@
@@ -586,6 +607,10 @@
return 0;
err:
+ if (type)
+ pkg_free(type);
+ if (key)
+ pkg_free(key);
db_redis_free_tables(con);
return -1;
}
@@ -608,7 +633,8 @@
char full_path[_POSIX_PATH_MAX + 1];
int path_len;
struct stat fstat;
- char c;
+ unsigned char c;
+ int cc;
enum {
DBREDIS_SCHEMA_COLUMN_ST,
@@ -632,6 +658,10 @@
}
dir_name = (char*)pkg_malloc((redis_schema_path.len + 1) * sizeof(char));
+ if (!dir_name) {
+ LM_ERR("Failed to allocate memory for schema directory name\n");
+ goto err;
+ }
strncpy(dir_name, redis_schema_path.s, redis_schema_path.len);
dir_name[redis_schema_path.len] = '\0';
srcdir = opendir(dir_name);
@@ -699,14 +729,15 @@
goto err;
}
- c = fgetc(fin);
+ cc = fgetc(fin);
+ c = (unsigned char)cc;
if (c == '\r')
continue;
//LM_DBG("parsing char %c, buf is '%s' at pos %lu\n", c, buf, bufpos);
switch(state) {
case DBREDIS_SCHEMA_COLUMN_ST:
- if (c == EOF) {
+ if (cc == EOF) {
LM_ERR("Unexpected end of file in schema column name of file %s\n", full_path);
goto err;
}
@@ -732,7 +763,7 @@
LM_DBG("found column name '%.*s'\n", column_name.len, column_name.s);
break;
case DBREDIS_SCHEMA_TYPE_ST:
- if (c == EOF) {
+ if (cc == EOF) {
LM_ERR("Unexpected end of file in schema column type of file %s\n", full_path);
goto err;
}
@@ -772,7 +803,7 @@
bufptr = buf;
break;
case DBREDIS_SCHEMA_VERSION_ST:
- if (c != '\n' && c != EOF) {
+ if (c != '\n' && cc != EOF) {
*bufptr = c;
bufptr++;
continue;
@@ -785,7 +816,7 @@
goto fileend;
break;
}
- } while (c != EOF);
+ } while (cc != EOF);
fileend:
fclose(fin);
@@ -838,4 +869,4 @@
pkg_free(redis_keys.s);
}
return -1;
-}
\ No newline at end of file
+}

Loading…
Cancel
Save