Issue 9799 - Multirow results for func_odbc

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@66734 65c4cc65-6c06-0410-ace0-fbb531ad65f3
1.6.0
Tilghman Lesher 19 years ago
parent 8ea9dcc221
commit e9251f42df

@ -67,4 +67,11 @@ writesql=UPDATE presence SET location='${SQL_ESC(${VAL1})}' WHERE id='${SQL_ESC(
;escapecommas=no ; Normally, commas within a field are escaped such that each
; field may be separated into individual variables with ARRAY.
; This option turns that behavior off [default=yes].
;mode=multirow ; Enable multirow fetching. Instead of returning results directly,
; mode=multirow queries will return a result-id, which can be passed
; multiple times to ODBC_FETCH, and that function will return each
; row, in order. You can add to this the following parameter:
;rowlimit=5 ; rowlimit will limit the number of rows retrieved and stored from
; the database. If not specified, all rows, up to available memory,
; will be retrieved and stored.

@ -57,6 +57,7 @@ static char *config = "func_odbc.conf";
enum {
OPT_ESCAPECOMMAS = (1 << 0),
OPT_MULTIROW = (1 << 1),
} odbc_option_flags;
struct acf_odbc_query {
@ -66,11 +67,47 @@ struct acf_odbc_query {
char sql_read[2048];
char sql_write[2048];
unsigned int flags;
int rowlimit;
struct ast_custom_function *acf;
};
static void odbc_datastore_free(void *data);
struct ast_datastore_info odbc_info = {
.type = "FUNC_ODBC",
.destroy = odbc_datastore_free,
};
/* For storing each result row */
struct odbc_datastore_row {
AST_LIST_ENTRY(odbc_datastore_row) list;
char data[0];
};
/* For storing each result set */
struct odbc_datastore {
AST_LIST_HEAD(, odbc_datastore_row);
char names[0];
};
AST_LIST_HEAD_STATIC(queries, acf_odbc_query);
static int resultcount = 0;
AST_MUTEX_DEFINE_STATIC(resultlock);
static void odbc_datastore_free(void *data)
{
struct odbc_datastore *result = data;
struct odbc_datastore_row *row;
AST_LIST_LOCK(result);
while ((row = AST_LIST_REMOVE_HEAD(result, list))) {
ast_free(row);
}
AST_LIST_UNLOCK(result);
AST_LIST_HEAD_DESTROY(result);
ast_free(result);
}
static SQLHSTMT generic_prepare(struct odbc_obj *obj, void *data)
{
int res;
@ -200,8 +237,8 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
{
struct odbc_obj *obj = NULL;
struct acf_odbc_query *query;
char sql[2048] = "", varname[15], colnames[2048] = "";
int res, x, buflen = 0, escapecommas, dsn;
char sql[2048] = "", varname[15], colnames[2048] = "", rowcount[12] = "-1";
int res, x, y, buflen = 0, escapecommas, rowlimit = 1, dsn;
AST_DECLARE_APP_ARGS(args,
AST_APP_ARG(field)[100];
);
@ -209,6 +246,8 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
SQLSMALLINT colcount=0;
SQLLEN indicator;
SQLSMALLINT collength;
struct odbc_datastore *resultset = NULL;
struct odbc_datastore_row *row = NULL;
AST_LIST_LOCK(&queries);
AST_LIST_TRAVERSE(&queries, query, list) {
@ -220,6 +259,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
if (!query) {
ast_log(LOG_ERROR, "No such function '%s'\n", cmd);
AST_LIST_UNLOCK(&queries);
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
return -1;
}
@ -237,9 +277,16 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
pbx_builtin_setvar_helper(chan, varname, NULL);
}
/* Save this flag, so we can release the lock */
/* Save these flags, so we can release the lock */
escapecommas = ast_test_flag(query, OPT_ESCAPECOMMAS);
if (ast_test_flag(query, OPT_MULTIROW)) {
resultset = ast_calloc(1, sizeof(*resultset));
AST_LIST_HEAD_INIT(resultset);
if (query->rowlimit)
rowlimit = query->rowlimit;
else
rowlimit = INT_MAX;
}
AST_LIST_UNLOCK(&queries);
for (dsn = 0; dsn < 5; dsn++) {
@ -256,6 +303,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
ast_log(LOG_ERROR, "Unable to execute query [%s]\n", sql);
if (obj)
ast_odbc_release_obj(obj);
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
return -1;
}
@ -264,30 +312,36 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", sql);
SQLFreeHandle (SQL_HANDLE_STMT, stmt);
ast_odbc_release_obj(obj);
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
return -1;
}
*buf = '\0';
res = SQLFetch(stmt);
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
int res1 = -1;
if (res == SQL_NO_DATA) {
if (option_verbose > 3) {
if (option_verbose > 3)
ast_verbose(VERBOSE_PREFIX_4 "Found no rows [%s]\n", sql);
}
res1 = 0;
} else if (option_verbose > 3) {
ast_copy_string(rowcount, "0", sizeof(rowcount));
} else {
ast_log(LOG_WARNING, "Error %d in FETCH [%s]\n", res, sql);
}
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
ast_odbc_release_obj(obj);
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
return res1;
}
for (y = 0; y < rowlimit; y++) {
*buf = '\0';
for (x = 0; x < colcount; x++) {
int i, namelen;
char coldata[256], colname[256];
int i;
char coldata[256];
if (y == 0) {
char colname[256];
int namelen;
res = SQLDescribeCol(stmt, x + 1, (unsigned char *)colname, sizeof(colname), &collength, NULL, NULL, NULL, NULL);
if (((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) || collength == 0) {
@ -314,6 +368,21 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
break;
}
if (resultset) {
void *tmp = ast_realloc(resultset, sizeof(*resultset) + strlen(colnames) + 1);
if (!tmp) {
ast_log(LOG_ERROR, "No space for a new resultset?\n");
ast_free(resultset);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
ast_odbc_release_obj(obj);
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
return -1;
}
resultset = tmp;
strcpy((char *)resultset + sizeof(*resultset), colnames);
}
}
buflen = strlen(buf);
res = SQLGetData(stmt, x + 1, SQL_CHAR, coldata, sizeof(coldata), &indicator);
if (indicator == SQL_NULL_DATA) {
@ -323,9 +392,8 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
ast_log(LOG_WARNING, "SQL Get Data error!\n[%s]\n\n", sql);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
ast_odbc_release_obj(obj);
return -1;
y = -1;
goto end_acf_read;
}
/* Copy data, encoding '\' and ',' for the argument parser */
@ -348,8 +416,48 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
/* Trim trailing comma */
buf[buflen - 1] = '\0';
pbx_builtin_setvar_helper(chan, "~ODBCFIELDS~", colnames);
if (resultset) {
row = ast_calloc(1, sizeof(*row) + buflen);
if (!row) {
ast_log(LOG_ERROR, "Unable to allocate space for more rows in this resultset.\n");
goto end_acf_read;
}
strcpy((char *)row + sizeof(*row), buf);
AST_LIST_INSERT_TAIL(resultset, row, list);
/* Get next row */
res = SQLFetch(stmt);
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
if (res != SQL_NO_DATA)
ast_log(LOG_WARNING, "Error %d in FETCH [%s]\n", res, sql);
y++;
break;
}
}
}
end_acf_read:
snprintf(rowcount, sizeof(rowcount), "%d", y);
pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
pbx_builtin_setvar_helper(chan, "~ODBCFIELDS~", colnames);
if (resultset) {
int uid;
struct ast_datastore *odbc_store;
ast_mutex_lock(&resultlock);
uid = ++resultcount;
ast_mutex_unlock(&resultlock);
snprintf(buf, len, "%d", uid);
odbc_store = ast_channel_datastore_alloc(&odbc_info, buf);
if (!odbc_store) {
ast_log(LOG_ERROR, "Rows retrieved, but unable to store it in the channel. Results fail.\n");
odbc_datastore_free(resultset);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
ast_odbc_release_obj(obj);
return -1;
}
odbc_store->data = resultset;
ast_channel_datastore_add(chan, odbc_store);
}
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
ast_odbc_release_obj(obj);
return 0;
@ -383,6 +491,60 @@ static struct ast_custom_function escape_function = {
.write = NULL,
};
static int acf_fetch(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len)
{
struct ast_datastore *store;
struct odbc_datastore *resultset;
struct odbc_datastore_row *row;
store = ast_channel_datastore_find(chan, &odbc_info, data);
if (!store) {
return -1;
}
resultset = store->data;
AST_LIST_LOCK(resultset);
row = AST_LIST_REMOVE_HEAD(resultset, list);
AST_LIST_UNLOCK(resultset);
if (!row) {
/* Cleanup datastore */
ast_channel_datastore_remove(chan, store);
ast_channel_datastore_free(store);
return -1;
}
pbx_builtin_setvar_helper(chan, "~ODBCFIELDS~", resultset->names);
ast_copy_string(buf, row->data, len);
ast_free(row);
return 0;
}
static struct ast_custom_function fetch_function = {
.name = "ODBC_FETCH",
.synopsis = "Fetch a row from a multirow query",
.syntax = "ODBC_FETCH(<result-id>)",
.desc =
"For queries which are marked as mode=multirow, the original query returns a\n"
"result-id from which results may be fetched. This function implements the\n"
"actual fetch of the results.\n",
.read = acf_fetch,
.write = NULL,
};
static char *app_odbcfinish = "ODBCFinish";
static char *syn_odbcfinish = "Clear the resultset of a successful multirow query";
static char *desc_odbcfinish =
"ODBCFinish(<result-id>)\n"
" Clears any remaining rows of the specified resultset\n";
static int exec_odbcfinish(struct ast_channel *chan, void *data)
{
struct ast_datastore *store = ast_channel_datastore_find(chan, &odbc_info, data);
if (!store) /* Already freed; no big deal. */
return 0;
ast_channel_datastore_remove(chan, store);
ast_channel_datastore_free(store);
return 0;
}
static int init_acf_query(struct ast_config *cfg, char *catg, struct acf_odbc_query **query)
{
const char *tmp;
@ -459,6 +621,13 @@ static int init_acf_query(struct ast_config *cfg, char *catg, struct acf_odbc_qu
ast_clear_flag((*query), OPT_ESCAPECOMMAS);
}
if ((tmp = ast_variable_retrieve(cfg, catg, "mode"))) {
if (strcasecmp(tmp, "multirow") == 0)
ast_set_flag((*query), OPT_MULTIROW);
if ((tmp = ast_variable_retrieve(cfg, catg, "rowlimit")))
sscanf(tmp, "%d", &((*query)->rowlimit));
}
(*query)->acf = ast_calloc(1, sizeof(struct ast_custom_function));
if (! (*query)->acf) {
free(*query);
@ -569,6 +738,8 @@ static int load_module(void)
struct ast_config *cfg;
char *catg;
res |= ast_custom_function_register(&fetch_function);
res |= ast_register_application(app_odbcfinish, exec_odbcfinish, syn_odbcfinish, desc_odbcfinish);
AST_LIST_LOCK(&queries);
cfg = ast_config_load(config);
@ -617,6 +788,8 @@ static int unload_module(void)
}
res |= ast_custom_function_unregister(&escape_function);
res |= ast_custom_function_unregister(&fetch_function);
res |= ast_unregister_application(app_odbcfinish);
/* Allow any threads waiting for this lock to pass (avoids a race) */
AST_LIST_UNLOCK(&queries);

Loading…
Cancel
Save