#include #include #ifdef MARIADB #include #include #else #include #include #endif #include #include "medmysql.h" #include "config.h" #include "records.h" #define _TEST_SIMULATE_SQL_ERRORS 0 #define MED_CACHE_DURATION 30 #define MED_SQL_BUF_LEN(fixed_string_len, escaped_string_len) \ (fixed_string_len + 7 + 2 + (escaped_string_len) * 2 + 1) // _latin1'STRING'\0 #define MED_CALLID_QUERY "select a.callid from acc a" \ " where a.method = 'INVITE' " \ " group by a.callid limit 0,200000" #define MED_FETCH_QUERY "(select distinct sip_code, sip_reason, method, callid, time, time_hires, " \ "src_leg, dst_leg, branch_id, id " \ "from acc where method = 'INVITE' and callid = '%s' order by time_hires asc) " \ "union all " \ "(select distinct sip_code, sip_reason, method, callid, time, time_hires, " \ "src_leg, dst_leg, branch_id, id " \ "from acc where method = 'BYE' and callid in ('%s', '%s"PBXSUFFIX"') " \ "order by length(callid) asc, time_hires asc) " \ "union all " \ "(select distinct sip_code, sip_reason, method, callid, time, time_hires, " \ "src_leg, dst_leg, branch_id, id " \ "from acc where method = 'BYE' and callid in ('%s', '%s"XFERSUFFIX"') " \ "order by length(callid) asc, time_hires asc) " \ "union all " \ "(select distinct sip_code, sip_reason, method, callid, time, time_hires, " \ "src_leg, dst_leg, branch_id, id " \ "from acc where method = 'BYE' and callid in ('%s', '%s"PBXSUFFIX""XFERSUFFIX"') " \ "order by length(callid) asc, time_hires asc)" #define MED_LOAD_PEER_QUERY "select h.ip, h.host, g.peering_contract_id, h.id " \ "from provisioning.voip_peer_hosts h, provisioning.voip_peer_groups g " \ "where g.id = h.group_id" #define MED_LOAD_CDR_TAG_IDS_QUERY "select id, type from accounting.cdr_tag" typedef struct _medmysql_handler { const char *name; MYSQL *m; int is_transaction; GQueue transaction_statements; } medmysql_handler; typedef struct _statement_str { char *str; unsigned long len; } statement_str; typedef struct _cdr_tag_record { unsigned long long cdr_id; char *sql_record; } cdr_tag_record; typedef struct { const char *begin; const char *end; unsigned long cdr_id; } single_cdr; typedef struct _medmysql_batch_definition { const char *sql_init_string, *sql_finish_string; unsigned int min_string_tail_room; // defaults to 1024 if not set int (*single_flush_func)(struct medmysql_str *); int (*full_flush_func)(struct medmysql_batches *); medmysql_handler **handler_ptr; } medmysql_batch_definition; static medmysql_handler *cdr_handler; static medmysql_handler *int_cdr_handler; static medmysql_handler *med_handler; static medmysql_handler *prov_handler; static medmysql_handler *stats_handler; static unsigned long medmysql_cdr_auto_increment_value; static unsigned long medmysql_tag_provider_customer; static unsigned long medmysql_tag_provider_reseller; static unsigned long medmysql_tag_provider_carrier; static unsigned long medmysql_tag_direction_source; static unsigned long medmysql_tag_direction_destination; static int medmysql_flush_cdr(struct medmysql_batches *); static int medmysql_flush_int_cdr(struct medmysql_batches *); static int medmysql_flush_all_med(struct medmysql_batches *); static int medmysql_flush_med_str(struct medmysql_str *); static int medmysql_flush_medlist(struct medmysql_str *); static int medmysql_flush_call_stat_info(void); static void medmysql_handler_close(medmysql_handler **h); static int medmysql_handler_transaction(medmysql_handler *h); static const medmysql_batch_definition medmysql_trash_def = { .sql_init_string = "insert into acc_trash (method, from_tag, to_tag, callid, sip_code, " \ "sip_reason, time, time_hires, src_leg, dst_leg, dst_user, dst_ouser, " \ "dst_domain, src_user, src_domain, branch_id) select method, from_tag, to_tag, " \ "callid, sip_code, sip_reason, time, time_hires, src_leg, dst_leg, " \ "dst_user, dst_ouser, dst_domain, src_user, src_domain, branch_id from acc " \ "where callid in (", .sql_finish_string = ")", .single_flush_func = medmysql_flush_medlist, .handler_ptr = &med_handler, }; static const medmysql_batch_definition medmysql_backup_def = { .sql_init_string = "insert into acc_backup (method, from_tag, to_tag, callid, sip_code, " \ "sip_reason, time, time_hires, src_leg, dst_leg, dst_user, dst_ouser, " \ "dst_domain, src_user, src_domain, branch_id) select method, from_tag, to_tag, " \ "callid, sip_code, sip_reason, time, time_hires, src_leg, dst_leg, " \ "dst_user, dst_ouser, dst_domain, src_user, src_domain, branch_id from acc " \ "where callid in (", .sql_finish_string = ")", .single_flush_func = medmysql_flush_medlist, .handler_ptr = &med_handler, }; static const medmysql_batch_definition medmysql_delete_def = { .sql_init_string = "delete from acc where callid in (", .sql_finish_string = ")", .full_flush_func = medmysql_flush_all_med, .handler_ptr = &med_handler, }; static const medmysql_batch_definition medmysql_cdr_def = { .sql_init_string = "insert into cdr (id, update_time, " \ "source_user_id, source_provider_id, source_external_subscriber_id, "\ "source_external_contract_id, source_account_id, source_user, source_domain, " \ "source_cli, source_clir, source_ip, "\ "destination_user_id, destination_provider_id, destination_external_subscriber_id, "\ "destination_external_contract_id, destination_account_id, destination_user, destination_domain, " \ "destination_user_in, destination_domain_in, destination_user_dialed, " \ "peer_auth_user, peer_auth_realm, call_type, call_status, call_code, init_time, start_time, "\ "duration, call_id, " \ "source_carrier_cost, source_reseller_cost, source_customer_cost, " \ "destination_carrier_cost, destination_reseller_cost, destination_customer_cost, " \ "split, " \ "source_gpp0, source_gpp1, source_gpp2, source_gpp3, source_gpp4, " \ "source_gpp5, source_gpp6, source_gpp7, source_gpp8, source_gpp9, " \ "destination_gpp0, destination_gpp1, destination_gpp2, destination_gpp3, destination_gpp4, " \ "destination_gpp5, destination_gpp6, destination_gpp7, destination_gpp8, destination_gpp9, " \ "source_lnp_prefix, destination_lnp_prefix, " \ "source_user_out, destination_user_out, " \ "source_lnp_type, destination_lnp_type" \ ") values ", .full_flush_func = medmysql_flush_cdr, .min_string_tail_room = 6000, .handler_ptr = &cdr_handler, }; static const medmysql_batch_definition medmysql_int_cdr_def = { .sql_init_string = "insert into int_cdr (id, update_time, " \ "source_user_id, source_provider_id, source_external_subscriber_id, "\ "source_external_contract_id, source_account_id, source_user, source_domain, " \ "source_cli, source_clir, source_ip, "\ "destination_user_id, destination_provider_id, destination_external_subscriber_id, "\ "destination_external_contract_id, destination_account_id, destination_user, destination_domain, " \ "destination_user_in, destination_domain_in, destination_user_dialed, " \ "peer_auth_user, peer_auth_realm, call_type, call_status, call_code, init_time, start_time, "\ "duration, call_id, " \ "source_carrier_cost, source_reseller_cost, source_customer_cost, " \ "destination_carrier_cost, destination_reseller_cost, destination_customer_cost, " \ "split, " \ "source_gpp0, source_gpp1, source_gpp2, source_gpp3, source_gpp4, " \ "source_gpp5, source_gpp6, source_gpp7, source_gpp8, source_gpp9, " \ "destination_gpp0, destination_gpp1, destination_gpp2, destination_gpp3, destination_gpp4, " \ "destination_gpp5, destination_gpp6, destination_gpp7, destination_gpp8, destination_gpp9, " \ "source_lnp_prefix, destination_lnp_prefix, " \ "source_user_out, destination_user_out, " \ "source_lnp_type, destination_lnp_type, acc_ref" \ ") values ", .sql_finish_string = " on duplicate key update " "update_time = values(update_time), source_user_id = values(source_user_id), " \ "source_provider_id = values(source_provider_id), source_external_subscriber_id " \ "= values(source_external_subscriber_id), source_external_contract_id = " \ "values(source_external_contract_id), source_account_id = " \ "values(source_account_id), source_user = values(source_user), source_domain = " \ "values(source_domain), source_cli = values(source_cli), source_clir = " \ "values(source_clir), source_ip = values(source_ip), destination_user_id = " \ "values(destination_user_id), destination_provider_id = " \ "values(destination_provider_id), destination_external_subscriber_id = " \ "values(destination_external_subscriber_id), destination_external_contract_id = " \ "values(destination_external_contract_id), destination_account_id = " \ "values(destination_account_id), destination_user = values(destination_user), " \ "destination_domain = values(destination_domain), destination_user_in = " \ "values(destination_user_in), destination_domain_in = " \ "values(destination_domain_in), destination_user_dialed = " \ "values(destination_user_dialed), peer_auth_user = values(peer_auth_user), " \ "peer_auth_realm = values(peer_auth_realm), call_type = values(call_type), " \ "call_status = values(call_status), call_code = values(call_code), init_time = " \ "values(init_time), start_time = values(start_time), duration = " \ "values(duration), call_id = values(call_id), source_carrier_cost = " \ "values(source_carrier_cost), source_reseller_cost = " \ "values(source_reseller_cost), source_customer_cost = " \ "values(source_customer_cost), destination_carrier_cost = " \ "values(destination_carrier_cost), destination_reseller_cost = " \ "values(destination_reseller_cost), destination_customer_cost = " \ "values(destination_customer_cost), split = values(split), source_gpp0 = " \ "values(source_gpp0), source_gpp1 = values(source_gpp1), source_gpp2 = " \ "values(source_gpp2), source_gpp3 = values(source_gpp3), source_gpp4 = " \ "values(source_gpp4), source_gpp5 = values(source_gpp5), source_gpp6 = " \ "values(source_gpp6), source_gpp7 = values(source_gpp7), source_gpp8 = " \ "values(source_gpp8), source_gpp9 = values(source_gpp9), destination_gpp0 = " \ "values(destination_gpp0), destination_gpp1 = values(destination_gpp1), " \ "destination_gpp2 = values(destination_gpp2), destination_gpp3 = " \ "values(destination_gpp3), destination_gpp4 = values(destination_gpp4), " \ "destination_gpp5 = values(destination_gpp5), destination_gpp6 = " \ "values(destination_gpp6), destination_gpp7 = values(destination_gpp7), " \ "destination_gpp8 = values(destination_gpp8), destination_gpp9 = " \ "values(destination_gpp9), source_lnp_prefix = values(source_lnp_prefix), " \ "destination_lnp_prefix = values(destination_lnp_prefix), source_user_out = " \ "values(source_user_out), destination_user_out = values(destination_user_out), " \ "source_lnp_type = values(source_lnp_type), destination_lnp_type = " \ "values(destination_lnp_type), export_status = 'unexported'", .full_flush_func = medmysql_flush_int_cdr, .min_string_tail_room = 9000, .handler_ptr = &int_cdr_handler, }; static const medmysql_batch_definition medmysql_del_int_cdr_def = { .sql_init_string = "delete from int_cdr where call_id in (", .sql_finish_string = ")", .single_flush_func = medmysql_flush_med_str, .handler_ptr = &int_cdr_handler, }; static const medmysql_batch_definition medmysql_tag_def = { .sql_init_string = "insert into cdr_tag_data (cdr_id, provider_id, direction_id, tag_id, " \ "val, cdr_start_time) values ", .single_flush_func = medmysql_flush_med_str, .handler_ptr = &cdr_handler, }; static const medmysql_batch_definition medmysql_int_tag_def = { .sql_init_string = "insert into int_cdr_tag_data (cdr_id, provider_id, direction_id, tag_id, " \ "val, cdr_start_time) values ", .sql_finish_string = " on duplicate key update " "val = values(val)", .single_flush_func = medmysql_flush_med_str, .handler_ptr = &int_cdr_handler, }; static const medmysql_batch_definition medmysql_mos_def = { .sql_init_string = "insert into cdr_mos_data (" \ "cdr_id, mos_average, mos_average_packetloss, mos_average_jitter, " \ "mos_average_roundtrip, cdr_start_time" \ ") values ", .single_flush_func = medmysql_flush_med_str, .handler_ptr = &cdr_handler, }; static const medmysql_batch_definition medmysql_group_def = { .sql_init_string = "insert into cdr_group (" \ "cdr_id, call_id, cdr_start_time" \ ") values ", .single_flush_func = medmysql_flush_med_str, .handler_ptr = &cdr_handler, }; static const medmysql_batch_definition medmysql_int_group_def = { .sql_init_string = "insert ignore into int_cdr_group (" \ "cdr_id, call_id, cdr_start_time" \ ") values ", .single_flush_func = medmysql_flush_med_str, .handler_ptr = &int_cdr_handler, }; static void statement_free(void *stm_p) { statement_str *stm = stm_p; free(stm->str); free(stm); } static void __g_queue_clear_full(GQueue *q, GDestroyNotify free_func) { void *p; while ((p = g_queue_pop_head(q))) free_func(p); } static unsigned int medmysql_real_query_errno(MYSQL *m, const char *s, unsigned long len) { #if _TEST_SIMULATE_SQL_ERRORS if (rand() % 10 == 0) { L_INFO("Simulating SQL error - statement '%.*s'", (int) len, s); return CR_SERVER_LOST; } #endif int ret = mysql_real_query(m, s, len); if (!ret) return 0; return mysql_errno(m); } static int medmysql_query_wrapper(medmysql_handler *mysql, const char *stmt_str, unsigned long length) { int i; unsigned int err; for (i = 0; i < 10; i++) { err = medmysql_real_query_errno(mysql->m, stmt_str, length); if (!err) break; if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR || err == CR_CONNECTION_ERROR) { L_WARNING("Lost connection to SQL server during query, retrying..."); sleep(10); continue; } break; } return !!err; } static int medmysql_query_wrapper_tx(medmysql_handler *mysql, const char *stmt_str, unsigned long length) { int i; unsigned int err; if (!mysql->is_transaction) { L_CRITICAL("SQL mode is not in transaction"); return -1; } for (i = 0; i < 10; i++) { err = medmysql_real_query_errno(mysql->m, stmt_str, length); if (!err) break; if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR || err == CR_CONNECTION_ERROR || err == ER_LOCK_WAIT_TIMEOUT || err == ER_LOCK_DEADLOCK) { // rollback, cancel transaction, restart transaction, replay all statements, // and then try again L_WARNING("Got error %u from SQL server during transaction, retrying...", err); err = medmysql_real_query_errno(mysql->m, "rollback", 8); if (err) { L_CRITICAL("Got error %u from SQL during rollback", mysql_errno(mysql->m)); return -1; } mysql->is_transaction = 0; sleep(10); if (medmysql_handler_transaction(mysql)) return -1; // steal the statement queue and recursively replay them into a new empty queue GQueue replay = mysql->transaction_statements; g_queue_init(&mysql->transaction_statements); statement_str *stm; while ((stm = g_queue_pop_head(&replay))) { if (medmysql_query_wrapper_tx(mysql, stm->str, stm->len)) { __g_queue_clear_full(&mysql->transaction_statements, statement_free); statement_free(stm); return -1; } statement_free(stm); } continue; } break; } if (!err) { // append statement to queue for possible replaying statement_str *stm = malloc(sizeof(*stm)); if (!stm) { L_CRITICAL("Out of memory (malloc statement_str)"); return -1; } stm->str = malloc(length); if (!stm->str) { L_CRITICAL("Out of memory (malloc statement_str body)"); free(stm); return -1; } memcpy(stm->str, stmt_str, length); stm->len = length; g_queue_push_tail(&mysql->transaction_statements, stm); } return !!err; } static medmysql_handler *medmysql_handler_init(const char *name, const char *host, const char *user, const char *pass, const char *db, unsigned int port) { medmysql_handler *ret; bool recon = 1; ret = malloc(sizeof(*ret)); if (!ret) { L_CRITICAL("Out of memory (malloc in medmysql_handler_init)"); return NULL; } memset(ret, 0, sizeof(*ret)); ret->name = name; g_queue_init(&ret->transaction_statements); ret->m = mysql_init(NULL); if (!ret->m) { L_CRITICAL("Out of memory (mysql_init)"); goto err; } if(!mysql_real_connect(ret->m, host, user, pass, db, port, NULL, 0)) { L_CRITICAL("Error connecting to %s db: %s", name, mysql_error(ret->m)); goto err; } if(mysql_options(ret->m, MYSQL_OPT_RECONNECT, &recon) != 0) { L_CRITICAL("Error setting reconnect-option for %s db: %s", name, mysql_error(ret->m)); goto err; } if(mysql_autocommit(ret->m, 1) != 0) { L_CRITICAL("Error setting autocommit=1 for %s db: %s", name, mysql_error(ret->m)); goto err; } return ret; err: medmysql_handler_close(&ret); return NULL; } /**********************************************************************/ static unsigned long medmysql_get_num_col(medmysql_handler *handler, const char *query) { if (medmysql_query_wrapper(handler, query, strlen(query))) { L_CRITICAL("Error getting DB value (query '%s'): %s", query, mysql_error(handler->m)); return 0; } MYSQL_RES *res = mysql_store_result(handler->m); if (!res) { L_CRITICAL("No result set returned from SQL (query '%s'): %s", query, mysql_error(handler->m)); return 0; } MYSQL_ROW row = mysql_fetch_row(res); if (!row || !row[0]) { L_CRITICAL("No row returned from SQL (query '%s'): %s", query, mysql_error(handler->m)); mysql_free_result(res); return 0; } unsigned long num = strtoul(row[0], NULL, 10); if (!num) { L_CRITICAL("Returned number from DB (query '%s') is '%s'", query, row[0]); mysql_free_result(res); return 0; } mysql_free_result(res); return num; } /**********************************************************************/ int medmysql_init() { cdr_handler = medmysql_handler_init("CDR", config_cdr_host, config_cdr_user, config_cdr_pass, config_cdr_db, config_cdr_port); if (!cdr_handler) goto err; int_cdr_handler = medmysql_handler_init("INT-CDR", config_intermediate_cdr_host, config_cdr_user, config_cdr_pass, config_cdr_db, config_intermediate_cdr_port); if (!int_cdr_handler) goto err; med_handler = medmysql_handler_init("ACC", config_med_host, config_med_user, config_med_pass, config_med_db, config_med_port); if (!med_handler) goto err; prov_handler = medmysql_handler_init("provisioning", config_prov_host, config_prov_user, config_prov_pass, config_prov_db, config_prov_port); if (!prov_handler) goto err; stats_handler = medmysql_handler_init("STATS", config_stats_host, config_stats_user, config_stats_pass, config_stats_db, config_stats_port); if (!stats_handler) goto err; return 0; err: medmysql_cleanup(); return -1; } /**********************************************************************/ static void medmysql_handler_close(medmysql_handler **h) { if (!*h) return; if ((*h)->m) mysql_close((*h)->m); if ((*h)->transaction_statements.length) L_WARNING("Closing DB handle with still %u statements in queue", (*h)->transaction_statements.length); __g_queue_clear_full(&(*h)->transaction_statements, statement_free); free(*h); *h = NULL; } void medmysql_cleanup() { medmysql_handler_close(&cdr_handler); medmysql_handler_close(&int_cdr_handler); medmysql_handler_close(&med_handler); medmysql_handler_close(&prov_handler); medmysql_handler_close(&stats_handler); } static void medmysql_buf_escape(MYSQL *m, size_t *buflen, const char *str, const size_t str_len, char *orig_dest, size_t sql_buffer_size) { char *dest = orig_dest; // verify buffer space requirements: string itself * 2, quotes '', optional "_latin1", null if (MED_SQL_BUF_LEN(*buflen, str_len) > sql_buffer_size) abort(); if (!g_utf8_validate(str, str_len, NULL)) { // force the string as latin1 memcpy(dest, "_latin1'", 8); dest += 8; } else *dest++ = '\''; size_t esc_len = mysql_real_escape_string(m, dest, str, str_len); dest += esc_len; *dest++ = '\''; *dest = '\0'; *buflen += dest - orig_dest; } static inline void medmysql_buf_escape_c(MYSQL *m, size_t *buflen, const char *str, char *orig_dest, size_t sql_buffer_size) { medmysql_buf_escape(m, buflen, str, strlen(str), orig_dest, sql_buffer_size); } static inline void medmysql_buf_escape_gstring(MYSQL *m, const char *str, GString *s) { size_t str_len = strlen(str); size_t orig_size = s->len; size_t req_size = MED_SQL_BUF_LEN(orig_size, str_len); g_string_set_size(s, req_size); char *dst = s->str + orig_size; medmysql_buf_escape(m, &orig_size, str, str_len, dst, s->len); g_string_set_size(s, orig_size); } #define BUFPRINT(x...) g_string_append_printf(sql_buffer, x) #define BUFESCAPE(x) medmysql_buf_escape_gstring(med_handler->m, x, sql_buffer) /**********************************************************************/ int medmysql_insert_records(GQueue *records, const char *table) { int ret = 0, entries = 0; if (!records->length) return 0; GString *sql_buffer = g_string_sized_new(records->length * 512); BUFPRINT("INSERT INTO kamailio.acc_%s " \ "(sip_code,sip_reason,method,callid,time,time_hires,src_leg,dst_leg,branch_id) VALUES ", table); for (GList *l = records->head; l; l = l->next) { med_entry_t *e = l->data; // this is only used for inserting redis entries into mysql if (!e->redis) continue; BUFPRINT("("); BUFESCAPE(e->sip_code); BUFPRINT(","); BUFESCAPE(e->sip_reason); BUFPRINT(","); BUFESCAPE(e->sip_method); BUFPRINT(","); BUFESCAPE(e->callid); BUFPRINT(","); BUFESCAPE(e->timestamp); BUFPRINT(",'%f',", e->unix_timestamp); BUFESCAPE(e->src_leg); BUFPRINT(","); BUFESCAPE(e->dst_leg); BUFPRINT(","); BUFESCAPE(e->branch_id); BUFPRINT("),"); entries++; } if (!entries) goto out; g_string_set_size(sql_buffer, sql_buffer->len - 1); L_DEBUG("Issuing record insert query: %s\n", sql_buffer->str); ret = medmysql_query_wrapper(med_handler, sql_buffer->str, sql_buffer->len); if (ret != 0) { L_ERROR("Error executing query '%s': %s", sql_buffer->str, mysql_error(med_handler->m)); } out: g_string_free(sql_buffer, TRUE); return ret; } /**********************************************************************/ gboolean medmysql_fetch_callids(GQueue *output) { MYSQL_RES *res; MYSQL_ROW row; /* char query[1024] = ""; */ /* g_strlcpy(query, MED_CALLID_QUERY, sizeof(query)); */ /*L_DEBUG("q='%s'", query);*/ if(medmysql_query_wrapper(med_handler, MED_CALLID_QUERY, strlen(MED_CALLID_QUERY)) != 0) { L_CRITICAL("Error getting acc callids: %s", mysql_error(med_handler->m)); return FALSE; } res = mysql_store_result(med_handler->m); while((row = mysql_fetch_row(res)) != NULL) { if(row[0] == NULL) { g_queue_push_tail(output, g_strdup("0")); } else { g_queue_push_tail(output, g_strdup(row[0])); } /*L_DEBUG("callid[%"PRIu64"]='%s'", i, c->value);*/ if (check_shutdown()) { mysql_free_result(res); g_queue_clear_full(output, free); return FALSE; } } mysql_free_result(res); return TRUE; } /**********************************************************************/ int medmysql_fetch_records(char *callid, GQueue *entries, int warn_empty, records_filter_func filter, void *filter_data) { MYSQL_RES *res; MYSQL_ROW row; size_t callid_len = strlen(callid); int ret = 0; unsigned long long count = 0; char esc_callid[callid_len*2+1]; mysql_real_escape_string(med_handler->m, esc_callid, callid, callid_len); g_autoptr(char) query = g_strdup_printf(MED_FETCH_QUERY, esc_callid, esc_callid, esc_callid, esc_callid, esc_callid, esc_callid, esc_callid); assert(query != NULL); /*L_DEBUG("q='%s'", query);*/ if(medmysql_query_wrapper(med_handler, query, strlen(query)) != 0) { L_CRITICAL("Error getting acc records for callid '%s': %s", callid, mysql_error(med_handler->m)); return -1; } res = mysql_store_result(med_handler->m); count = mysql_num_rows(res); if(count == 0) { if (warn_empty) L_CRITICAL("No records found for callid '%s'!", callid); ret = -1; goto out; } ret = 0; while((row = mysql_fetch_row(res)) != NULL) { med_entry_t *e = g_slice_alloc0(sizeof(*e)); g_strlcpy(e->sip_code, row[0], sizeof(e->sip_code)); g_strlcpy(e->sip_reason, row[1], sizeof(e->sip_reason)); g_strlcpy(e->sip_method, row[2], sizeof(e->sip_method)); e->callid = g_strdup(row[3]); g_strlcpy(e->timestamp, row[4], sizeof(e->timestamp)); e->unix_timestamp = atof(row[5]); e->src_leg = g_strdup(row[6] ? : ""); e->dst_leg = g_strdup(row[7] ? : ""); g_strlcpy(e->branch_id, row[8] ? : "", sizeof(e->branch_id)); e->acc_ref = g_strdup(row[9]); e->valid = 1; cdr_parse_entry(e); if (!filter || filter(e, filter_data)) { g_queue_push_tail(entries, e); if (records_handle_refer(entries, e, callid)) ret = 1; } else med_entry_free(e); if (check_shutdown()) return -1; } out: mysql_free_result(res); return ret; } /**********************************************************************/ static int medmysql_batch_prepare(struct medmysql_str *str) { const medmysql_batch_definition *def = str->def; struct medmysql_batches *batches = str->batches; unsigned int tail_room = def->min_string_tail_room; if (!tail_room) tail_room = 1024; if (str->len > (PACKET_SIZE - tail_room)) { if (def->single_flush_func) { if (def->single_flush_func(str)) return -1; } if (def->full_flush_func) { if (def->full_flush_func(batches)) return -1; } } if (str->len == 0) str->len = sprintf(str->str, "%s", def->sql_init_string); return 0; } /**********************************************************************/ int medmysql_trash_entries(const char *callid, struct medmysql_batches *batches) { char esc_callid[strlen(callid)*2+1]; mysql_real_escape_string(med_handler->m, esc_callid, callid, strlen(callid)); if (medmysql_batch_prepare(&batches->acc_trash)) return -1; batches->acc_trash.len += sprintf(batches->acc_trash.str + batches->acc_trash.len, "'%s',", esc_callid); return medmysql_delete_entries(esc_callid, batches); } /**********************************************************************/ int medmysql_backup_entries(const char *callid, struct medmysql_batches *batches) { char esc_callid[strlen(callid)*2+1]; mysql_real_escape_string(med_handler->m, esc_callid, callid, strlen(callid)); if (medmysql_batch_prepare(&batches->acc_backup)) return -1; batches->acc_backup.len += sprintf(batches->acc_backup.str + batches->acc_backup.len, "'%s',", esc_callid); return medmysql_delete_entries(esc_callid, batches); } /**********************************************************************/ static int medmysql_flush_all_med(struct medmysql_batches *batches) { if (medmysql_flush_medlist(&batches->acc_backup)) return -1; if (medmysql_flush_medlist(&batches->acc_trash)) return -1; if (medmysql_flush_medlist(&batches->to_delete)) return -1; return 0; } int medmysql_delete_entries(const char *callid, struct medmysql_batches *batches) { if (medmysql_batch_prepare(&batches->to_delete)) return -1; batches->to_delete.len += sprintf(batches->to_delete.str + batches->to_delete.len, "'%s',", callid); return 0; } #define CDRPRINT(x) batch->cdrs.len += sprintf(batch->cdrs.str + batch->cdrs.len, x) #define CDRESCAPE(x) medmysql_buf_escape(med_handler->m, &batch->cdrs.len, x->str, x->len, batch->cdrs.str + batch->cdrs.len, PACKET_SIZE) #define CDRESCAPE_C(x) medmysql_buf_escape_c(med_handler->m, &batch->cdrs.len, x, batch->cdrs.str + batch->cdrs.len, PACKET_SIZE) /**********************************************************************/ static int medmysql_tag_record(GQueue *q, unsigned long cdr_id, unsigned long provider_id, unsigned long direction_id, const GString *value, double start_time, unsigned long tag_id) { char esc_value[value->len*2+1]; mysql_real_escape_string(med_handler->m, esc_value, value->str, value->len); cdr_tag_record *record = malloc(sizeof(*record)); record->cdr_id = cdr_id; if (asprintf(&record->sql_record, "%lu, %lu, %lu, '%s', %f", provider_id, direction_id, tag_id, esc_value, start_time) <= 0) { free(record); return -1; } g_queue_push_tail(q, record); return 0; } static int medmysql_mos_record(GQueue *q, unsigned long cdr_id, double avg_score, int avg_packetloss, int avg_jitter, int avg_rtt, double start_time) { cdr_tag_record *record = malloc(sizeof(*record)); record->cdr_id = cdr_id; if (asprintf(&record->sql_record, "%.1f, %.1f, %.1f, %.1f, %.3f", avg_score, (double) avg_packetloss, (double) avg_jitter, (double) avg_rtt, start_time) <= 0) { free(record); return -1; } g_queue_push_tail(q, record); return 0; } static int medmysql_group_record(MYSQL *m, GQueue *q, unsigned long cdr_id, const GString *group, double start_time) { char esc_group[group->len*2+1]; mysql_real_escape_string(m, esc_group, group->str, group->len); cdr_tag_record *record = malloc(sizeof(*record)); record->cdr_id = cdr_id; if (asprintf(&record->sql_record, "'%s', %.3f", esc_group, start_time) <= 0) { free(record); return -1; } g_queue_push_tail(q, record); return 0; } static int medmysql_tag_cdr(struct medmysql_cdr_batch *batch, unsigned long provider_id, unsigned long direction_id, const char *tag_name, const GString *tag_value, const cdr_entry_t *e) { gpointer tag_id; if (!tag_value->len) return 0; if ((tag_id = g_hash_table_lookup(med_cdr_tag_table, tag_name)) == NULL) { L_WARNING("Call-Id '%s' has no cdr tag type '%s', '%s'", e->call_id->str, tag_name, tag_value->str); return -1; } if (medmysql_tag_record(&batch->tags.q, batch->num_cdrs, provider_id, direction_id, tag_value, e->start_time, GPOINTER_TO_UINT(tag_id))) return -1; return 0; } int medmysql_insert_cdrs(cdr_entry_t *entries, uint64_t count, struct medmysql_batches *batches) { uint64_t i; int gpp; int ret = 1; // default to intermediate for(i = 0; i < count; ++i) { cdr_entry_t *e = &(entries[i]); struct medmysql_cdr_batch *batch = &batches->cdr_batch; if (e->intermediate) batch = &batches->int_cdr_batch; else ret = 0; // if at least one record is not intermediate, we delete if (medmysql_batch_prepare(&batch->cdrs)) return -1; char str_source_clir[4] = ""; char str_split[4] = ""; char str_init_time[32] = ""; char str_start_time[32] = ""; char str_duration[32] = ""; char str_source_carrier_cost[32] = ""; char str_source_reseller_cost[32] = ""; char str_source_customer_cost[32] = ""; char str_dest_carrier_cost[32] = ""; char str_dest_reseller_cost[32] = ""; char str_dest_customer_cost[32] = ""; char str_source_accid[32] = ""; char str_dest_accid[32] = ""; snprintf(str_source_clir, sizeof(str_source_clir), "%u", e->source_clir); snprintf(str_split, sizeof(str_split), "%u", e->split); snprintf(str_init_time, sizeof(str_init_time), "%f", e->init_time); snprintf(str_start_time, sizeof(str_start_time), "%f", e->start_time); snprintf(str_duration, sizeof(str_duration), "%f", e->duration); snprintf(str_source_carrier_cost, sizeof(str_source_carrier_cost), "%u", e->source_carrier_cost); snprintf(str_source_reseller_cost, sizeof(str_source_reseller_cost), "%u", e->source_reseller_cost); snprintf(str_source_customer_cost, sizeof(str_source_customer_cost), "%u", e->source_customer_cost); snprintf(str_dest_carrier_cost, sizeof(str_dest_carrier_cost), "%u", e->destination_carrier_cost); snprintf(str_dest_reseller_cost, sizeof(str_dest_reseller_cost), "%u", e->destination_reseller_cost); snprintf(str_dest_customer_cost, sizeof(str_dest_customer_cost), "%u", e->destination_customer_cost); snprintf(str_source_accid, sizeof(str_source_accid), "%llu", (long long unsigned int) e->source_account_id); snprintf(str_dest_accid, sizeof(str_dest_accid), "%llu", (long long unsigned int) e->destination_account_id); const char *begin_ptr = batch->cdrs.str + batch->cdrs.len; CDRPRINT("(NULL, now(), "); CDRESCAPE(e->source_user_id); CDRPRINT(","); CDRESCAPE(e->source_provider_id); CDRPRINT(","); CDRESCAPE(e->source_ext_subscriber_id); CDRPRINT(","); CDRESCAPE(e->source_ext_contract_id); CDRPRINT(","); CDRESCAPE_C(str_source_accid); CDRPRINT(","); CDRESCAPE(e->source_user); CDRPRINT(","); CDRESCAPE(e->source_domain); CDRPRINT(","); CDRESCAPE(e->source_cli); CDRPRINT(","); CDRESCAPE_C(str_source_clir); CDRPRINT(","); CDRESCAPE(e->source_ip); CDRPRINT(","); CDRESCAPE(e->destination_user_id); CDRPRINT(","); CDRESCAPE(e->destination_provider_id); CDRPRINT(","); CDRESCAPE(e->destination_ext_subscriber_id); CDRPRINT(","); CDRESCAPE(e->destination_ext_contract_id); CDRPRINT(","); CDRESCAPE_C(str_dest_accid); CDRPRINT(","); CDRESCAPE(e->destination_user); CDRPRINT(","); CDRESCAPE(e->destination_domain); CDRPRINT(","); CDRESCAPE(e->destination_user_in); CDRPRINT(","); CDRESCAPE(e->destination_domain_in); CDRPRINT(","); CDRESCAPE(e->destination_dialed); CDRPRINT(","); CDRESCAPE(e->peer_auth_user); CDRPRINT(","); CDRESCAPE(e->peer_auth_realm); CDRPRINT(","); CDRESCAPE(e->call_type); CDRPRINT(","); CDRESCAPE(e->call_status); CDRPRINT(","); CDRESCAPE(e->call_code); CDRPRINT(","); CDRESCAPE_C(str_init_time); CDRPRINT(","); CDRESCAPE_C(str_start_time); CDRPRINT(","); CDRESCAPE_C(str_duration); CDRPRINT(","); CDRESCAPE(e->call_id); CDRPRINT(","); CDRESCAPE_C(str_source_carrier_cost); CDRPRINT(","); CDRESCAPE_C(str_source_reseller_cost); CDRPRINT(","); CDRESCAPE_C(str_source_customer_cost); CDRPRINT(","); CDRESCAPE_C(str_dest_carrier_cost); CDRPRINT(","); CDRESCAPE_C(str_dest_reseller_cost); CDRPRINT(","); CDRESCAPE_C(str_dest_customer_cost); CDRPRINT(","); CDRESCAPE_C(str_split); for(gpp = 0; gpp < 10; ++gpp) { if(e->source_gpp[gpp]->len > 0) { CDRPRINT(","); CDRESCAPE(e->source_gpp[gpp]); } else { CDRPRINT(",NULL"); } } for(gpp = 0; gpp < 10; ++gpp) { if(e->destination_gpp[gpp]->len > 0) { CDRPRINT(","); CDRESCAPE(e->destination_gpp[gpp]); } else { CDRPRINT(",NULL"); } } CDRPRINT(","); CDRESCAPE(e->source_lnp_prefix); CDRPRINT(","); CDRESCAPE(e->destination_lnp_prefix); CDRPRINT(","); CDRESCAPE(e->source_user_out); CDRPRINT(","); CDRESCAPE(e->destination_user_out); if(e->source_lnp_type->len > 0) { CDRPRINT(","); CDRESCAPE(e->source_lnp_type); } else { CDRPRINT(",NULL"); } if(e->destination_lnp_type->len > 0) { CDRPRINT(","); CDRESCAPE(e->destination_lnp_type); } else { CDRPRINT(",NULL"); } if (batch == &batches->int_cdr_batch) { CDRPRINT(","); CDRESCAPE(e->acc_ref); } CDRPRINT("),"); const char *end_ptr = batch->cdrs.str + batch->cdrs.len; size_t cdr_len = end_ptr - begin_ptr; L_DEBUG("CDR entry to be written: %.*s", (int) cdr_len, begin_ptr); if (!cdr_verify_fields(e)) { L_WARNING("CDR field verification failed for record"); // backtrack if (config_cdr_error_file) if (!cdr_write_error_record(config_cdr_error_file, begin_ptr, cdr_len)) return -1; batch->cdrs.len -= cdr_len; continue; } single_cdr *single = g_slice_alloc(sizeof(*single)); single->begin = begin_ptr; single->end = end_ptr; single->cdr_id = batch->num_cdrs; g_queue_push_tail(&batch->cdrs.q, single); if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "header=P-Asserted-Identity", e->header_pai, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "header=Diversion", e->header_diversion, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "header=User-to-User", e->header_u2u, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "concurrent_calls_quota", e->source_concurrent_calls_quota, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "concurrent_calls_count", e->source_concurrent_calls_count, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "concurrent_calls_count_customer", e->source_concurrent_calls_count_customer, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "header=P-Preferred-Identity", e->header_ppi, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_destination, "furnished_charging_info", e->furnished_charging_info, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_destination, "concurrent_calls_quota", e->destination_concurrent_calls_quota, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_destination, "concurrent_calls_count", e->destination_concurrent_calls_count, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_destination, "concurrent_calls_count_customer", e->destination_concurrent_calls_count_customer, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_destination, "hg_ext_response", e->hg_ext_response, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_destination, "r_user", e->r_user, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_destination, "r_ua", e->r_ua, e)) return -1; if (medmysql_tag_cdr(batch, medmysql_tag_provider_customer, medmysql_tag_direction_source, "header=History-Info", e->source_last_hih, e)) return -1; if (e->mos.filled && batch->mos.def) { if (medmysql_mos_record(&batch->mos.q, batch->num_cdrs, e->mos.avg_score, e->mos.avg_packetloss, e->mos.avg_jitter, e->mos.avg_rtt, e->start_time)) return -1; } if (medmysql_group_record(med_handler->m, &batch->group.q, batch->num_cdrs, e->group, e->start_time)) return -1; batch->num_cdrs++; // no check for return codes here we should keep on nevertheless medmysql_update_call_stat_info(e->call_code->str, e->start_time); if (check_shutdown()) return -1; } /*L_DEBUG("q='%s'", query);*/ return ret; } /**********************************************************************/ int medmysql_delete_intermediate(cdr_entry_t *entries, uint64_t count, struct medmysql_batches *batches) { uint64_t i; for(i = 0; i < count; ++i) { cdr_entry_t *e = &(entries[i]); char esc_callid[e->call_id->len*2+1]; mysql_real_escape_string(int_cdr_handler->m, esc_callid, e->call_id->str, e->call_id->len); if (medmysql_batch_prepare(&batches->int_cdr_delete)) return -1; batches->int_cdr_delete.len += sprintf(batches->int_cdr_delete.str + batches->int_cdr_delete.len, "'%s',", esc_callid); } return 0; } /**********************************************************************/ int medmysql_update_call_stat_info(const char *call_code, const double start_time) { if (!med_call_stat_info_table) return -1; char period[STAT_PERIOD_SIZE]; time_t etime = (time_t)start_time; struct tm etm; char period_key[STAT_PERIOD_SIZE+4]; struct medmysql_call_stat_info_t * period_t; if (!localtime_r(&etime, &etm)) { L_CRITICAL("Cannot get localtime: %s", strerror(errno)); return -1; } switch (config_stats_period) { case MED_STATS_HOUR: strftime(period, STAT_PERIOD_SIZE, "%Y-%m-%d %H:00:00", &etm); break; case MED_STATS_DAY: strftime(period, STAT_PERIOD_SIZE, "%Y-%m-%d 00:00:00", &etm); break; case MED_STATS_MONTH: strftime(period, STAT_PERIOD_SIZE, "%Y-%m-01 00:00:00", &etm); break; default: L_CRITICAL("Undefinied or wrong config_stats_period %d", config_stats_period); return -1; } snprintf(period_key, sizeof(period_key), "%s-%s", period, call_code); if ((period_t = g_hash_table_lookup(med_call_stat_info_table, &period_key)) == NULL) { period_t = malloc(sizeof(struct medmysql_call_stat_info_t)); strcpy(period_t->period, period); g_strlcpy(period_t->call_code, call_code, sizeof(period_t->call_code)); period_t->amount = 1; g_hash_table_insert(med_call_stat_info_table, strdup(period_key), period_t); } else { period_t->amount += 1; } return 0; } /**********************************************************************/ int medmysql_load_maps(GHashTable *ip_table, GHashTable *host_table, GHashTable *id_table) { MYSQL_RES *res; MYSQL_ROW row; int ret = 0; /* char query[1024] = ""; */ /* snprintf(query, sizeof(query), MED_LOAD_PEER_QUERY); */ /* L_DEBUG("q='%s'", query); */ if(medmysql_query_wrapper(prov_handler, MED_LOAD_PEER_QUERY, strlen(MED_LOAD_PEER_QUERY)) != 0) { L_CRITICAL("Error loading peer hosts: %s", mysql_error(prov_handler->m)); return -1; } res = mysql_store_result(prov_handler->m); while((row = mysql_fetch_row(res)) != NULL) { if(row[0] == NULL || row[2] == NULL) { L_CRITICAL("Error loading peer hosts, a mandatory column is NULL"); ret = -1; goto out; } if(ip_table != NULL) { if(g_hash_table_lookup(ip_table, row[0]) != NULL) L_DEBUG("Skipping duplicate IP '%s'", row[0]); else g_hash_table_insert(ip_table, strdup(row[0]), strdup(row[2])); } if(host_table != NULL && row[1] != NULL) // host column is optional { if(g_hash_table_lookup(host_table, row[1]) != NULL) L_DEBUG("Skipping duplicate host '%s'", row[1]); else g_hash_table_insert(host_table, strdup(row[1]), strdup(row[2])); } if (id_table) g_hash_table_insert(id_table, strdup(row[3]), strdup(row[2])); } out: mysql_free_result(res); return ret; } /**********************************************************************/ static char *medmysql_lookup_cache_str(GHashTable *table, const char *key) { med_cache_entry_t *e = g_hash_table_lookup(table, key); if (!e) return NULL; if (time(NULL) - e->created > MED_CACHE_DURATION) { g_hash_table_remove(table, key); return NULL; } L_DEBUG("Returning cached map entry: '%s' -> '%s'", key, e->str_value); return e->str_value; } void medmysql_cache_cleanup(GHashTable *table) { GHashTableIter iter; g_hash_table_iter_init(&iter, table); gpointer value; time_t expiry = time(NULL) - MED_CACHE_DURATION; while (g_hash_table_iter_next(&iter, NULL, &value)) { med_cache_entry_t *e = value; if (e->created > expiry) continue; g_hash_table_iter_remove(&iter); } } static void medmysql_insert_cache_str(GHashTable *table, const char *key, char *val) { med_cache_entry_t *e = g_slice_alloc0(sizeof(*e)); e->created = time(NULL); e->str_value = val; // allocated per g_strdup g_hash_table_replace(table, g_strdup(key), e); } static char *medmysql_select_one(medmysql_handler *mysql, const char *query, size_t query_len) { int err = medmysql_query_wrapper(mysql, query, query_len); if (err != 0) { L_ERROR("Error executing query '%.*s': %s", (int) query_len, query, mysql_error(mysql->m)); return NULL; } char *ret = NULL; MYSQL_RES *res = mysql_store_result(mysql->m); unsigned long long rows = mysql_num_rows(res); if (rows > 0) { if (rows > 1) L_WARNING("More than one row returned from query '%.*s' (%llu rows)", (int) query_len, query, rows); MYSQL_ROW row = mysql_fetch_row(res); if (!row) { L_ERROR("Failed to fetch row from query '%.*s': %s", (int) query_len, query, mysql_error(mysql->m)); } else { const char *val = row[0]; if (!val) L_ERROR("NULL value returned from query '%.*s'", (int) query_len, query); else ret = g_strdup(val); } } mysql_free_result(res); return ret; } char *medmysql_lookup_uuid(const char *uuid) { char *ret = medmysql_lookup_cache_str(med_uuid_cache, uuid); if (ret) return ret; // query DB static const char *query_prefix = "SELECT r.contract_id FROM billing.voip_subscribers vs, " \ "billing.contracts c, billing.contacts ct, billing.resellers r WHERE c.id = vs.contract_id AND " \ "c.contact_id = ct.id AND ct.reseller_id = r.id AND vs.uuid = "; size_t query_len = strlen(query_prefix); // compile-time constant char query[MED_SQL_BUF_LEN(query_len, strlen(uuid))]; strcpy(query, query_prefix); char *buf_ptr = query + query_len; medmysql_buf_escape_c(prov_handler->m, &query_len, uuid, buf_ptr, sizeof(query)); L_DEBUG("UUID lookup with query: '%.*s'\n", (int) query_len, query); ret = medmysql_select_one(prov_handler, query, query_len); if (!ret) return NULL; L_DEBUG("UUID query returned: '%.*s' -> '%s'\n", (int) query_len, query, ret); medmysql_insert_cache_str(med_uuid_cache, uuid, ret); return ret; } int medmysql_load_db_ids() { if (!(medmysql_cdr_auto_increment_value = medmysql_get_num_col(cdr_handler, "select @@auto_increment_increment"))) return -1; if (!(medmysql_tag_provider_customer = medmysql_get_num_col(cdr_handler, "select id from cdr_provider where type = 'customer'"))) return -1; if (!(medmysql_tag_provider_carrier = medmysql_get_num_col(cdr_handler, "select id from cdr_provider where type = 'carrier'"))) return -1; if (!(medmysql_tag_provider_reseller = medmysql_get_num_col(cdr_handler, "select id from cdr_provider where type = 'reseller'"))) return -1; if (!(medmysql_tag_direction_source = medmysql_get_num_col(cdr_handler, "select id from cdr_direction where type = 'source'"))) return -1; if (!(medmysql_tag_direction_destination = medmysql_get_num_col(cdr_handler, "select id from cdr_direction where type = 'destination'"))) return -1; return 0; } int medmysql_load_cdr_tag_ids(GHashTable *cdr_tag_table) { MYSQL_RES *res; MYSQL_ROW row; int ret = 0; /* char query[1024] = ""; */ gpointer key; gpointer tag_id; /* snprintf(query, sizeof(query), MED_LOAD_CDR_TAG_IDS_QUERY); */ /* L_DEBUG("q='%s'", query); */ if(medmysql_query_wrapper(cdr_handler, MED_LOAD_CDR_TAG_IDS_QUERY, strlen(MED_LOAD_CDR_TAG_IDS_QUERY)) != 0) { L_CRITICAL("Error loading cdr tag ids: %s", mysql_error(prov_handler->m)); return -1; } res = mysql_store_result(cdr_handler->m); while((row = mysql_fetch_row(res)) != NULL) { if(row[0] == NULL || row[1] == NULL) { L_CRITICAL("Error loading cdr tag ids, a column is NULL"); ret = -1; goto out; } tag_id = GUINT_TO_POINTER(strtoul(row[0], NULL, 10)); if(tag_id == NULL) { L_CRITICAL("Error allocating cdr tag id memory: %s", strerror(errno)); ret = -1; goto out; } key = (gpointer)g_strdup(row[1]); g_hash_table_insert(cdr_tag_table, key, tag_id); } out: mysql_free_result(res); return ret; } static int medmysql_handler_transaction(medmysql_handler *h) { if (h->is_transaction) return 0; if (medmysql_query_wrapper(h, "start transaction", 17)) return -1; h->is_transaction = 1; return 0; } static int medmysql_handler_commit(medmysql_handler *h) { if (!h->is_transaction) return 0; if (medmysql_query_wrapper(h, "commit", 6)) return -1; h->is_transaction = 0; __g_queue_clear_full(&h->transaction_statements, statement_free); return 0; } static void medmysql_str_init(struct medmysql_str *str, const medmysql_batch_definition *def, struct medmysql_batches *batches, struct medmysql_cdr_batch *batch) { str->len = 0; str->def = def; str->batches = batches; str->cdr_batch = batch; g_queue_init(&str->q); } int medmysql_batch_start(struct medmysql_batches *batches) { if (medmysql_handler_transaction(cdr_handler)) return -1; if (medmysql_handler_transaction(int_cdr_handler)) return -1; if (medmysql_handler_transaction(med_handler)) return -1; if (!med_call_stat_info_table) med_call_stat_info_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); medmysql_str_init(&batches->cdr_batch.cdrs, &medmysql_cdr_def, batches, &batches->cdr_batch); medmysql_str_init(&batches->cdr_batch.tags, &medmysql_tag_def, batches, &batches->cdr_batch); medmysql_str_init(&batches->cdr_batch.mos, &medmysql_mos_def, batches, &batches->cdr_batch); medmysql_str_init(&batches->cdr_batch.group, &medmysql_group_def, batches, &batches->cdr_batch); batches->cdr_batch.num_cdrs = 0; medmysql_str_init(&batches->int_cdr_batch.cdrs, &medmysql_int_cdr_def, batches, &batches->int_cdr_batch); medmysql_str_init(&batches->int_cdr_batch.tags, &medmysql_int_tag_def, batches, &batches->int_cdr_batch); medmysql_str_init(&batches->int_cdr_batch.mos, NULL, batches, &batches->int_cdr_batch); medmysql_str_init(&batches->int_cdr_batch.group, &medmysql_int_group_def, batches, &batches->int_cdr_batch); batches->int_cdr_batch.num_cdrs = 0; medmysql_str_init(&batches->acc_backup, &medmysql_backup_def, batches, NULL); medmysql_str_init(&batches->acc_trash, &medmysql_trash_def, batches, NULL); medmysql_str_init(&batches->to_delete, &medmysql_delete_def, batches, NULL); medmysql_str_init(&batches->int_cdr_delete, &medmysql_del_int_cdr_def, batches, NULL); return 0; } static int medmysql_flush_med_str(struct medmysql_str *str) { const medmysql_batch_definition *def = str->def; if (!def) return 0; if (str->len == 0) return 0; if (str->str[str->len - 1] != ',') return 0; str->len--; str->str[str->len] = '\0'; if (def->sql_finish_string) str->len += sprintf(str->str + str->len, "%s", def->sql_finish_string); L_DEBUG("SQL flush med str\n"); L_DEBUG("SQL: %.*s\n", (int) str->len, str->str); if (medmysql_query_wrapper_tx(*def->handler_ptr, str->str, str->len) != 0) { str->len = 0; L_CRITICAL("Error executing query: %s", mysql_error((*def->handler_ptr)->m)); if (str->len <= 500) L_CRITICAL("Failed query: '%s'", str->str); else L_CRITICAL("Failed query: '%.*s'...", 500, str->str); return -1; } str->len = 0; return 0; } static int medmysql_write_tag_records(struct medmysql_str *str, unsigned long long auto_id) { if (!str->def) return 0; cdr_tag_record *record; GQueue *q = &str->q; while ((record = g_queue_pop_head(q))) { if (medmysql_batch_prepare(str)) return -1; record->cdr_id = auto_id + record->cdr_id * medmysql_cdr_auto_increment_value; str->len += sprintf(str->str + str->len, "(%llu, %s),", record->cdr_id, record->sql_record); free(record->sql_record); free(record); } return 0; } static int medmysql_write_cdr_tags(struct medmysql_cdr_batch *batch, unsigned long long auto_id) { if (medmysql_write_tag_records(&batch->tags, auto_id)) return -1; if (medmysql_write_tag_records(&batch->mos, auto_id)) return -1; if (medmysql_write_tag_records(&batch->group, auto_id)) return -1; return 0; } static int medmysql_write_single_tags(struct medmysql_str *str, unsigned long cdr_id) { const medmysql_batch_definition *def = str->def; if (!def) return 0; int ret = 0; // iterate list, skip/free old entries, process current entries, stop at later entry cdr_tag_record *record; while ((record = g_queue_pop_head(&str->q))) { if (record->cdr_id < cdr_id) goto next; // old entry, skip and free if (record->cdr_id > cdr_id) { // gone past - end of list for now, push back entry for later g_queue_push_head(&str->q, record); return 0; } // process this entry: construct SQL statement char *stmt; if (def->sql_finish_string) stmt = g_strdup_printf("%s(%lu, %s)%s", def->sql_init_string, cdr_id, record->sql_record, def->sql_finish_string); else stmt = g_strdup_printf("%s(%lu, %s)", def->sql_init_string, cdr_id, record->sql_record); L_DEBUG("SQL single tag/group statement\n"); L_DEBUG("SQL: %s\n", stmt); if (medmysql_query_wrapper_tx(*def->handler_ptr, stmt, strlen(stmt))) { L_CRITICAL("Error executing query: %s", mysql_error((*def->handler_ptr)->m)); L_CRITICAL("Failed query: '%s'", stmt); ret = -1; } g_free(stmt); next: free(record->sql_record); free(record); } return ret; } // retry the CDR inserts one by one static int medmysql_flush_single_cdrs(struct medmysql_cdr_batch *batch) { const medmysql_batch_definition *def = batch->cdrs.def; if (!def) return -1; unsigned long failed = 0; single_cdr *single; while ((single = g_queue_pop_head(&batch->cdrs.q))) { // construct single SQL insert statement char *stmt; if (def->sql_finish_string) stmt = g_strdup_printf("%s%.*s%s", def->sql_init_string, (int) (single->end - single->begin) - 1, single->begin, def->sql_finish_string); else stmt = g_strdup_printf("%s%.*s", def->sql_init_string, (int) (single->end - single->begin) - 1, single->begin); unsigned long cdr_id = single->cdr_id; g_slice_free1(sizeof(*single), single); L_DEBUG("SQL single statement\n"); L_DEBUG("SQL: %s\n", stmt); if (medmysql_query_wrapper_tx(*def->handler_ptr, stmt, strlen(stmt)) == 0) { // insert OK unsigned long auto_id = mysql_insert_id((*def->handler_ptr)->m); if (!auto_id) { L_CRITICAL("Received zero auto-ID from SQL"); g_free(stmt); return -1; } if (medmysql_write_single_tags(&batch->tags, cdr_id)) failed++; if (medmysql_write_single_tags(&batch->mos, cdr_id)) failed++; if (medmysql_write_single_tags(&batch->group, cdr_id)) failed++; } else { L_CRITICAL("Error executing query: %s", mysql_error((*def->handler_ptr)->m)); L_CRITICAL("Failed query: '%s'", stmt); failed++; } g_free(stmt); } if (failed) { double failed_pct = (double) failed * 100. / (double) batch->num_cdrs; if (failed_pct > 10.) { L_CRITICAL("Large number of SQL failures during insert (%lu out of %lu, %.1f%%), bailing", failed, batch->num_cdrs, failed_pct); return -1; } L_WARNING("Ignoring %lu failed inserts (out of %lu = %.1f%%), continuing", failed, batch->num_cdrs, failed_pct); } return 0; } static int medmysql_flush_cdr_batch(struct medmysql_cdr_batch *batch) { // TODO: the config option is a bit misleading if (config_dumpcdr && batch->cdrs.len) { FILE *qlog; qlog = fopen("/var/log/ngcp/cdr-query.log", "a"); if(qlog == NULL) { L_CRITICAL("Failed to open cdr query log file '/var/log/ngcp/cdr-query.log': %s", strerror(errno)); return -1; } if(fputs(batch->cdrs.str, qlog) == EOF) { L_CRITICAL("Failed to write to cdr query log file '/var/log/ngcp/cdr-query.log': %s", strerror(errno)); } fclose(qlog); } if (!batch->num_cdrs) return 0; if (medmysql_flush_med_str(&batch->cdrs)) { if (medmysql_flush_single_cdrs(batch)) return -1; } else { unsigned long auto_id = mysql_insert_id((*batch->cdrs.def->handler_ptr)->m); if (!auto_id) { L_CRITICAL("Received zero auto-ID from SQL"); return -1; } single_cdr *single; while ((single = g_queue_pop_head(&batch->cdrs.q))) g_slice_free1(sizeof(*single), single); if (medmysql_write_cdr_tags(batch, auto_id)) return -1; if (medmysql_flush_med_str(&batch->tags)) return -1; if (medmysql_flush_med_str(&batch->mos)) return -1; if (medmysql_flush_med_str(&batch->group)) return -1; } batch->num_cdrs = 0; return 0; } static int medmysql_flush_cdr(struct medmysql_batches *batches) { if (medmysql_flush_med_str(&batches->int_cdr_delete)) return -1; return medmysql_flush_cdr_batch(&batches->cdr_batch); } static int medmysql_flush_int_cdr(struct medmysql_batches *batches) { return medmysql_flush_cdr_batch(&batches->int_cdr_batch); } static int medmysql_flush_medlist(struct medmysql_str *str) { if (medmysql_flush_med_str(str)) { critical("Failed to execute potentially crucial SQL query, check LOG for details"); return -1; } return 0; } static int medmysql_flush_call_stat_info() { if (!stats_handler) return 0; if (!med_call_stat_info_table) return 0; GHashTable * call_stat_info = med_call_stat_info_table; struct medmysql_str query; struct medmysql_call_stat_info_t * period_t; GList * keys = g_hash_table_get_keys(call_stat_info); GList * iter; for (iter = keys; iter != NULL; iter = iter->next) { char * period_key = iter->data; if ((period_t = g_hash_table_lookup(call_stat_info, period_key)) == NULL) { _LOG(LOG_CRIT, "Error dumping call stats info: no data for period_key %s\n", period_key ); return -1; } query.len = sprintf(query.str, "insert into %s.call_info set period='%s', sip_code='%s', amount=%" PRIu64 " on duplicate key update period='%s', sip_code='%s', amount=(amount+%" PRIu64 ");", config_stats_db, period_t->period, period_t->call_code, period_t->amount, period_t->period, period_t->call_code, period_t->amount ); //L_DEBUG("updating call stats info: %s -- %s", period_t->call_code, period_t->period); //L_DEBUG("sql: %s", query.str); if(medmysql_query_wrapper(stats_handler, query.str, query.len) != 0) { L_CRITICAL("Error executing call info stats query: %s", mysql_error(stats_handler->m)); L_CRITICAL("stats query: %s", query.str); critical("Failed to execute potentially crucial SQL query, check LOG for details"); return -1; } period_t->amount = 0; // reset code counter on success } g_list_free(keys); g_list_free(iter); return 0; } int medmysql_batch_end(struct medmysql_batches *batches) { if (medmysql_flush_cdr(batches) || check_shutdown()) return -1; if (medmysql_flush_int_cdr(batches) || check_shutdown()) return -1; if (medmysql_flush_all_med(batches) || check_shutdown()) return -1; if (medmysql_flush_call_stat_info() || check_shutdown()) return -1; if (medmysql_handler_commit(cdr_handler)) return -1; if (medmysql_handler_commit(int_cdr_handler)) return -1; if (medmysql_handler_commit(med_handler)) return -1; return 0; }