/* * Copyright (C) 2011 Stefan Sayer * * This file is part of SEMS, a free SIP media server. * * SEMS is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * For a license to use the sems software under conditions * other than those described here, or to purchase support for this * software, please contact iptel.org by e-mail at the following addresses: * info@iptel.org * * SEMS is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "DBUtils.h" #include "DBRegAgent.h" #include "AmSession.h" #include "AmEventDispatcher.h" #include #include EXPORT_MODULE_FACTORY(DBRegAgent); DEFINE_MODULE_INSTANCE(DBRegAgent, MOD_NAME); mysqlpp::Connection DBRegAgent::MainDBConnection(mysqlpp::use_exceptions); mysqlpp::Connection DBRegAgent::ProcessorDBConnection(mysqlpp::use_exceptions); string DBRegAgent::joined_query_subscribers; string DBRegAgent::joined_query_peerings; string DBRegAgent::registrations_table = "registrations"; double DBRegAgent::reregister_interval = 0.5; double DBRegAgent::minimum_reregister_interval = -1; bool DBRegAgent::enable_ratelimiting = false; unsigned int DBRegAgent::ratelimit_rate = 0; unsigned int DBRegAgent::ratelimit_per = 0; bool DBRegAgent::ratelimit_slowstart = false; bool DBRegAgent::delete_removed_registrations = true; bool DBRegAgent::delete_failed_deregistrations = false; bool DBRegAgent::save_contacts = true; bool DBRegAgent::db_read_contact = false; string DBRegAgent::contact_hostport; bool DBRegAgent::username_with_domain = false; string DBRegAgent::outbound_proxy; bool DBRegAgent::save_auth_replies = false; unsigned int DBRegAgent::error_retry_interval = 300; class RegTimer : public timer { public: void fire() { DBRegAgent::instance()->timer_cb(this); } long object_id; RegistrationActionEvent::RegAction action; regType type; RegTimer() : object_id(0), action(RegistrationActionEvent::Unknown), type(TYPE_UNDEFINED) {} }; DBRegAgent::DBRegAgent(const string& _app_name) : AmDynInvokeFactory(_app_name), AmEventQueue(this), expires(0), uac_auth_i(NULL) { } DBRegAgent::~DBRegAgent() { } int DBRegAgent::onLoad() { DBG("loading db_reg_agent....\n"); AmDynInvokeFactory* uac_auth_f = AmPlugIn::instance()->getFactory4Di("uac_auth"); if (uac_auth_f == NULL) { WARN("unable to get a uac_auth factory. " "registrations will not be authenticated.\n"); WARN("(do you want to load uac_auth module?)\n"); } else { uac_auth_i = uac_auth_f->getInstance(); } AmConfigReader cfg; if(cfg.loadFile(add2path(AmConfig::ModConfigPath,1, MOD_NAME ".conf"))) return -1; expires = cfg.getParameterInt("expires", 7200); DBG("requesting registration expires of %u seconds\n", expires); if (cfg.hasParameter("reregister_interval")) { reregister_interval = -1; reregister_interval = atof(cfg.getParameter("reregister_interval").c_str()); if (reregister_interval <= 0 || reregister_interval > 1) { ERROR("configuration value 'reregister_interval' could not be read. " "needs to be 0 .. 1.0 (recommended: 0.5)\n"); return -1; } } if (cfg.hasParameter("minimum_reregister_interval")) { minimum_reregister_interval = -1; minimum_reregister_interval = atof(cfg.getParameter("minimum_reregister_interval").c_str()); if (minimum_reregister_interval <= 0 || minimum_reregister_interval > 1) { ERROR("configuration value 'minimum_reregister_interval' could not be read. " "needs to be 0 .. reregister_interval (recommended: 0.4)\n"); return -1; } if (minimum_reregister_interval >= reregister_interval) { ERROR("configuration value 'minimum_reregister_interval' must be smaller " "than reregister_interval (recommended: 0.4)\n"); return -1; } } enable_ratelimiting = cfg.getParameter("enable_ratelimiting") == "yes"; if (enable_ratelimiting) { if (!cfg.hasParameter("ratelimit_rate") || !cfg.hasParameter("ratelimit_per")) { ERROR("if ratelimiting is enabled, ratelimit_rate and ratelimit_per must be set\n"); return -1; } ratelimit_rate = cfg.getParameterInt("ratelimit_rate", 0); ratelimit_per = cfg.getParameterInt("ratelimit_per", 0); if (!ratelimit_rate || !ratelimit_per) { ERROR("ratelimit_rate and ratelimit_per must be > 0\n"); return -1; } ratelimit_slowstart = cfg.getParameter("ratelimit_slowstart") == "yes"; } delete_removed_registrations = cfg.getParameter("delete_removed_registrations", "yes") == "yes"; delete_failed_deregistrations = cfg.getParameter("delete_failed_deregistrations", "no") == "yes"; save_contacts = cfg.getParameter("save_contacts", "yes") == "yes"; db_read_contact = cfg.getParameter("db_read_contact", "no") == "yes"; username_with_domain = cfg.getParameter("username_with_domain", "no") == "yes"; save_auth_replies = cfg.getParameter("save_auth_replies", "no") == "yes"; contact_hostport = cfg.getParameter("contact_hostport"); outbound_proxy = cfg.getParameter("outbound_proxy"); error_retry_interval = cfg.getParameterInt("error_retry_interval", 300); if (!error_retry_interval) { WARN("disabled retry on errors!\n"); } string mysql_server, mysql_user, mysql_passwd, mysql_db; mysql_server = cfg.getParameter("mysql_server", "localhost"); mysql_user = cfg.getParameter("mysql_user"); if (mysql_user.empty()) { ERROR(MOD_NAME ".conf parameter 'mysql_user' is missing.\n"); return -1; } mysql_passwd = cfg.getParameter("mysql_passwd"); if (mysql_passwd.empty()) { ERROR(MOD_NAME ".conf parameter 'mysql_passwd' is missing.\n"); return -1; } mysql_db = cfg.getParameter("mysql_db", "sems"); try { std::initializer_list args = { dbReconnectOption, // if present = dbReconnectOption(true) dbConnectTimeoutOption(5), // seconds dbReadTimeoutOption(5), // seconds dbWriteTimeoutOption(5), // seconds dbFoundRowsOption // if present = dbFoundRowsOption(true) }; apply_mysql_options(MainDBConnection, args); MainDBConnection.connect(mysql_db.c_str(), mysql_server.c_str(), mysql_user.c_str(), mysql_passwd.c_str()); if (!MainDBConnection) { ERROR("Database connection failed: %s\n", MainDBConnection.error()); return -1; } apply_mysql_options(ProcessorDBConnection, args); ProcessorDBConnection.connect(mysql_db.c_str(), mysql_server.c_str(), mysql_user.c_str(), mysql_passwd.c_str()); if (!ProcessorDBConnection) { ERROR("Database connection failed: %s\n", ProcessorDBConnection.error()); return -1; } } catch (const mysqlpp::Exception& er) { // Catch-all for any MySQL++ exceptions ERROR("MySQL++ error: %s\n", er.what()); return -1; } // register us as SIP event receiver for MOD_NAME AmEventDispatcher::instance()->addEventQueue(MOD_NAME,this); if (!AmPlugIn::registerDIInterface(MOD_NAME, this)) { ERROR("registering %s DI interface\n", MOD_NAME); return -1; } joined_query_subscribers = cfg.getParameter("joined_query_subscribers"); joined_query_peerings = cfg.getParameter("joined_query_peerings"); if (joined_query_subscribers.empty() || joined_query_peerings.empty()) { // todo: name! ERROR("joined_query must be set\n"); return -1; } if (cfg.hasParameter("registrations_table")) { registrations_table = cfg.getParameter("registrations_table"); } DBG("using registrations table '%s'\n", registrations_table.c_str()); if (!loadRegistrations()) { ERROR("REGISTER: loading registrations for subscribers from DB\n"); return -1; } if (!loadRegistrationsPeerings()) { ERROR("REGISTER: loading registrations for peerings from DB\n"); return -1; } DBG("starting registration timer thread...\n"); registration_scheduler.start(); // run_tests(); start(); return 0; } void DBRegAgent::onUnload() { stop(); join(); registration_scheduler.stop(); DBG("unclean shutdown. Waiting for processing thread to stop.\n"); registration_scheduler.join(); DBG("closing main DB connection\n"); MainDBConnection.disconnect(); DBG("closing auxiliary DB connection\n"); ProcessorDBConnection.disconnect(); } bool DBRegAgent::loadRegistrations() { try { time_t now_time = time(NULL); mysqlpp::Query query_sb = DBRegAgent::MainDBConnection.query(); string query_string_subscribers, table; query_string_subscribers = joined_query_subscribers; DBG("REGISTER: querying all registrations for subscribers with : '%s'\n", query_string_subscribers.c_str()); query_sb << query_string_subscribers; mysqlpp::UseQueryResult res_sb = query_sb.use(); while (mysqlpp::Row row = res_sb.fetch_row()) { int status = 0; regType type = TYPE_SUBSCRIBER; long object_id = row[COLNAME_SUBSCRIBER_ID]; if (object_id == 0) { WARN("REGISTER: object_id is 0 for this subscriber, skipping..\n"); continue; } DBG("REGISTER: Triggering for subscriber with object_id=<%ld>\n", object_id); string contact_uri; if (db_read_contact && row[COLNAME_CONTACT] != mysqlpp::null) { contact_uri = (string) row[COLNAME_CONTACT]; } if (row[COLNAME_STATUS] != mysqlpp::null) status = row[COLNAME_STATUS]; else { DBG("registration status entry for id %ld does not exist, creating...\n", object_id); createDBRegistration(object_id, type, ProcessorDBConnection); } DBG("got subscriber '%s@%s' status %i\n", string(row[COLNAME_USER]).c_str(), string(row[COLNAME_REALM]).c_str(), status); switch (status) { case REG_STATUS_INACTIVE: case REG_STATUS_PENDING: // try again case REG_STATUS_FAILED: // try again { createRegistration(object_id, (string)row[COLNAME_AUTH_USER], (string)row[COLNAME_USER], (string)row[COLNAME_PASS], (string)row[COLNAME_REALM], contact_uri, type); scheduleRegistration(object_id, type); }; break; case REG_STATUS_ACTIVE: { createRegistration(object_id, (string)row[COLNAME_AUTH_USER], (string)row[COLNAME_USER], (string)row[COLNAME_PASS], (string)row[COLNAME_REALM], contact_uri, type); time_t dt_expiry = now_time; if (row[COLNAME_EXPIRY] != mysqlpp::null) { dt_expiry = (time_t)((mysqlpp::DateTime)row[COLNAME_EXPIRY]); } time_t dt_registration_ts = now_time; if (row[COLNAME_REGISTRATION_TS] != mysqlpp::null) { dt_registration_ts = (time_t)((mysqlpp::DateTime)row[COLNAME_REGISTRATION_TS]); } DBG("got expiry '%ld, registration_ts %ld, now %ld'\n", dt_expiry, dt_registration_ts, now_time); if (dt_registration_ts > now_time) { WARN("needed to sanitize last_registration timestamp TS from the %ld (now %ld) - " "DB host time mismatch?\n", dt_registration_ts, now_time); dt_registration_ts = now_time; } // if expired add to pending registrations, else schedule re-regstration if (dt_expiry <= now_time) { DBG("scheduling imminent re-registration for subscriber %ld\n", object_id); scheduleRegistration(object_id, type); } else { setRegistrationTimer(object_id, dt_expiry, dt_registration_ts, now_time, type); } }; break; case REG_STATUS_REMOVED: { DBG("ignoring removed registration %ld %s@%s of type: %s\n", object_id, ((string)row[COLNAME_USER]).c_str(), ((string)row[COLNAME_REALM]).c_str(), TYPE_TO_STRING(type)); } break; case REG_STATUS_TO_BE_REMOVED: { DBG("Scheduling Deregister of registration %ld %s@%s of type: %s\n", object_id, ((string)row[COLNAME_USER]).c_str(), ((string)row[COLNAME_REALM]).c_str(), TYPE_TO_STRING(type)); createRegistration(object_id, (string)row[COLNAME_AUTH_USER], (string)row[COLNAME_USER], (string)row[COLNAME_PASS], (string)row[COLNAME_REALM], contact_uri, type); scheduleDeregistration(object_id, type); }; } } } catch (const mysqlpp::Exception& er) { // Catch-all for any MySQL++ exceptions ERROR("MySQL++ error: %s\n", er.what()); return false; } return true; } bool DBRegAgent::loadRegistrationsPeerings() { try { time_t now_time = time(NULL); mysqlpp::Query query_pr = DBRegAgent::MainDBConnection.query(); string query_string_peerings, table; query_string_peerings = joined_query_peerings; DBG("REGISTER: querying all registrations for peerings with : '%s'\n", query_string_peerings.c_str()); query_pr << query_string_peerings; mysqlpp::UseQueryResult res_pr = query_pr.use(); while (mysqlpp::Row row = res_pr.fetch_row()) { int status = 0; regType type = TYPE_PEERING; long object_id = row[COLNAME_PEER_ID]; if (object_id == 0) { WARN("REGISTER: object_id is NULL or 0 for this peering, skipping..\n"); continue; } DBG("REGISTER: Triggering for peering with object_id=<%ld>\n", object_id); string contact_uri; if (db_read_contact && row[COLNAME_CONTACT] != mysqlpp::null) { contact_uri = (string) row[COLNAME_CONTACT]; } if (row[COLNAME_STATUS] != mysqlpp::null) status = row[COLNAME_STATUS]; else { DBG("registration status entry for id %ld does not exist, creating...\n", object_id); createDBRegistration(object_id, type, ProcessorDBConnection); } DBG("got subscriber '%s@%s' status %i\n", string(row[COLNAME_USER]).c_str(), string(row[COLNAME_REALM]).c_str(), status); switch (status) { case REG_STATUS_INACTIVE: case REG_STATUS_PENDING: // try again case REG_STATUS_FAILED: // try again { createRegistration(object_id, (string)row[COLNAME_AUTH_USER], (string)row[COLNAME_USER], (string)row[COLNAME_PASS], (string)row[COLNAME_REALM], contact_uri, type); scheduleRegistration(object_id, type); }; break; case REG_STATUS_ACTIVE: { createRegistration(object_id, (string)row[COLNAME_AUTH_USER], (string)row[COLNAME_USER], (string)row[COLNAME_PASS], (string)row[COLNAME_REALM], contact_uri, type); time_t dt_expiry = now_time; if (row[COLNAME_EXPIRY] != mysqlpp::null) { dt_expiry = (time_t)((mysqlpp::DateTime)row[COLNAME_EXPIRY]); } time_t dt_registration_ts = now_time; if (row[COLNAME_REGISTRATION_TS] != mysqlpp::null) { dt_registration_ts = (time_t)((mysqlpp::DateTime)row[COLNAME_REGISTRATION_TS]); } DBG("got expiry '%ld, registration_ts %ld, now %ld'\n", dt_expiry, dt_registration_ts, now_time); if (dt_registration_ts > now_time) { WARN("needed to sanitize last_registration timestamp TS from the %ld (now %ld) - " "DB host time mismatch?\n", dt_registration_ts, now_time); dt_registration_ts = now_time; } // if expired add to pending registrations, else schedule re-regstration if (dt_expiry <= now_time) { DBG("scheduling imminent re-registration for subscriber %ld\n", object_id); scheduleRegistration(object_id, type); } else { setRegistrationTimer(object_id, dt_expiry, dt_registration_ts, now_time, type); } }; break; case REG_STATUS_REMOVED: { DBG("ignoring removed registration %ld %s@%s of type: %s\n", object_id, ((string)row[COLNAME_USER]).c_str(), ((string)row[COLNAME_REALM]).c_str(), TYPE_TO_STRING(type)); } break; case REG_STATUS_TO_BE_REMOVED: { DBG("Scheduling Deregister of registration %ld %s@%s of type: %s\n", object_id, ((string)row[COLNAME_USER]).c_str(), ((string)row[COLNAME_REALM]).c_str(), TYPE_TO_STRING(type)); createRegistration(object_id, (string)row[COLNAME_AUTH_USER], (string)row[COLNAME_USER], (string)row[COLNAME_PASS], (string)row[COLNAME_REALM], contact_uri, type); scheduleDeregistration(object_id, type); }; } } } catch (const mysqlpp::Exception& er) { // Catch-all for any MySQL++ exceptions ERROR("MySQL++ error: %s\n", er.what()); return false; } return true; } /** * Must only be used with registrations_mut lock held. */ void DBRegAgent::handleRegistrationTimer(long object_id, const regType type) { WARN("Registration for %s with ID %ld already exists, removing.\n", TYPE_TO_STRING(type), object_id); removeRegistration(object_id, type); clearRegistrationTimer(object_id, type); } /** * Must only be used with registrations_mut lock held. */ void DBRegAgent::handleRegistrationScheduling(long object_id, const std::string &auth_user, const std::string &user, const std::string &pass, const std::string &realm, const std::string &contact, const regType type) { registrations_mut.unlock(); WARN("updateRegistration - registration %ld %s@%s unknown, creating. Type: %s.\n", object_id, user.c_str(), realm.c_str(), TYPE_TO_STRING(type)); createRegistration(object_id, auth_user, user, pass, realm, contact, type); scheduleRegistration(object_id, type); } /** create registration in our list */ void DBRegAgent::createRegistration(long object_id, const string& auth_user, const string& user, const string& pass, const string& realm, const string& contact, const regType type) { string auth_user_temp = (auth_user.empty() || auth_user == "" || auth_user == "NULL") ? user : auth_user; DBG("REGISTER: authentication user picked out: <%s> \n", auth_user_temp.c_str()); string _user = user; if (username_with_domain && user.find('@')!=string::npos) { _user = user.substr(0, user.find('@')); } string contact_uri = contact; if (contact_uri.empty() && !contact_hostport.empty()) { contact_uri = "sip:"+ _user + "@" + contact_hostport; } string handle = AmSession::getNewId(); SIPRegistrationInfo reg_info(realm, _user, _user, // name auth_user_temp, // auth_user pass, outbound_proxy, // proxy contact_uri // contact ); DBG(" >>> realm '%s', user '%s', auth_user '%s', pass '%s', outbound_proxy '%s', contact_uri '%s', type '%s'\n", realm.c_str(), user.c_str(), auth_user.c_str(), pass.c_str(), outbound_proxy.c_str(), contact_uri.c_str(), TYPE_TO_STRING(type)); registrations_mut.lock(); try { /* remove already existing registration for a peering */ if (type == TYPE_PEERING && registrations_peers.find(object_id) != registrations_peers.end()) { handleRegistrationTimer(object_id, type); /* remove already existing for a usual subscriber */ } else if (type != TYPE_PEERING && registrations.find(object_id) != registrations.end()) { handleRegistrationTimer(object_id, type); } AmSIPRegistration* reg = new AmSIPRegistration(handle, reg_info, "" /*MOD_NAME*/); // a simple fix in case expires is for some reason 0. Not the best solution. //if (expires == 0) expires = 60; reg->setExpiresInterval(expires); if (type == TYPE_PEERING) { registrations_peers[object_id] = reg; registration_ltags_peers[handle] = object_id; } else { registrations[object_id] = reg; registration_ltags[handle] = object_id; } if (NULL != uac_auth_i) { DBG("REGISTER: Enabling UAC Auth for new registration of type: <%s>\n", TYPE_TO_STRING(type)); // get a sessionEventHandler from uac_auth AmArg di_args, ret; AmArg a; a.setBorrowedPointer(reg); di_args.push(a); di_args.push(a); uac_auth_i->invoke("getHandler", di_args, ret); if (!ret.size()) { ERROR("Can not add auth handler to new registration!\n"); } else { AmObject* p = ret.get(0).asObject(); if (p != NULL) { AmSessionEventHandler* h = dynamic_cast(p); if (h != NULL) reg->setSessionEventHandler(h); } } } } catch (const std::exception& e) { ERROR("%s", e.what()); } catch (...) { ERROR("unknown exception occured\n"); } registrations_mut.unlock(); // register us as SIP event receiver for this ltag AmEventDispatcher::instance()->addEventQueue(handle,this); DBG("created new registration with ID <%ld>, ltag '%s' and type '%s'\n", object_id, handle.c_str(), TYPE_TO_STRING(type)); } void DBRegAgent::updateRegistration(long object_id, const string& auth_user, const string& user, const string& pass, const string& realm, const string& contact, const regType type) { string auth_user_temp = (auth_user.empty() || auth_user == "" || auth_user == "NULL") ? user : auth_user; DBG("REGISTER: authentication user picked out: <%s> \n", auth_user_temp.c_str()); string _user = user; if (username_with_domain && user.find('@')!=string::npos) { _user = user.substr(0, user.find('@')); } registrations_mut.lock(); map::iterator it = (type == TYPE_PEERING ? registrations_peers.find(object_id) : registrations.find(object_id)); /* handle peerings */ if (type == TYPE_PEERING && it == registrations_peers.end()) { handleRegistrationScheduling(object_id, auth_user_temp, user, pass, realm, contact, type); /* unlocks */ return; /* handle subscribers */ } else if (type != TYPE_PEERING && it == registrations.end()) { handleRegistrationScheduling(object_id, auth_user_temp, user, pass, realm, contact, type); /* unlocks */ return; } bool need_reregister = it->second->getInfo().domain != realm || it->second->getInfo().auth_user != auth_user_temp || it->second->getInfo().user != _user || it->second->getInfo().pwd != pass || it->second->getInfo().contact != contact; string old_realm = it->second->getInfo().domain; string old_user = it->second->getInfo().user; string old_auth_user = it->second->getInfo().auth_user; it->second->setRegistrationInfo(SIPRegistrationInfo(realm, _user, _user, // name auth_user_temp, // auth_user pass, outbound_proxy, // proxy contact)); // contact registrations_mut.unlock(); if (need_reregister) { DBG("user/realm for registration %ld changed (%s@%s -> %s@%s). Auth user (%s -> %s)." "Triggering immediate re-registration\n", object_id, old_user.c_str(), old_realm.c_str(), user.c_str(), realm.c_str(), old_auth_user.c_str(), auth_user_temp.c_str()); scheduleRegistration(object_id, type); } } /** * Remove registration from our list. * Must only be used with registrations_mut lock held. */ void DBRegAgent::removeRegistration(long object_id, const regType type) { bool res = false; string handle; map::iterator it = (type == TYPE_PEERING ? registrations_peers.find(object_id) : registrations.find(object_id)); /* remove reg for peerings */ if (type == TYPE_PEERING && it != registrations_peers.end()) { if (!it->second) { WARN("Something went wrong while removing '%ld'\n", object_id); return; } handle = it->second->getHandle(); if (!handle.empty()) registration_ltags_peers.erase(handle); delete it->second; registrations_peers.erase(it); res = true; /* remove reg for subscribers */ } else if (type != TYPE_PEERING && it != registrations.end()) { if (!it->second) { WARN("Something went wrong while removing '%ld'\n", object_id); return; } handle = it->second->getHandle(); if (!handle.empty()) registration_ltags.erase(handle); delete it->second; registrations.erase(it); res = true; } if (res) { // deregister us as SIP event receiver for this ltag AmEventDispatcher::instance()->delEventQueue(handle); DBG("removed registration with ID %ld, type: %s \n", object_id, TYPE_TO_STRING(type)); } else { DBG("registration with ID %ld not found for removing, type: %s \n", object_id, TYPE_TO_STRING(type)); } } /** schedule this registration to REGISTER (immediately) */ void DBRegAgent::scheduleRegistration(long object_id, const regType type) { if (enable_ratelimiting) { registration_processor. postEvent(new RegistrationActionEvent(RegistrationActionEvent::Register, object_id, type)); } else { // use our own thread postEvent(new RegistrationActionEvent(RegistrationActionEvent::Register, object_id, type)); } DBG("Added to pending actions: REGISTER of %ld, type: %s\n", object_id, TYPE_TO_STRING(type)); } /** schedule this registration to de-REGISTER (immediately) */ void DBRegAgent::scheduleDeregistration(long object_id, const regType type) { if (enable_ratelimiting) { registration_processor. postEvent(new RegistrationActionEvent(RegistrationActionEvent::Deregister, object_id, type)); } else { // use our own thread postEvent(new RegistrationActionEvent(RegistrationActionEvent::Deregister, object_id, type)); } DBG("added to pending actions: DEREGISTER of %ld, type: %s\n", object_id, TYPE_TO_STRING(type)); } void DBRegAgent::process(AmEvent* ev) { if (ev->event_id == RegistrationActionEventID) { RegistrationActionEvent* reg_action_ev = dynamic_cast(ev); if (reg_action_ev) { onRegistrationActionEvent(reg_action_ev); return; } } AmSipReplyEvent* sip_rep = dynamic_cast(ev); if (sip_rep) { onSipReplyEvent(sip_rep); return; } if (ev->event_id == E_SYSTEM) { AmSystemEvent* sys_ev = dynamic_cast(ev); if(sys_ev){ DBG("Session received system Event\n"); if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) { stop(); registration_scheduler.stop(); registration_scheduler.join(); } return; } } ERROR("unknown event received!\n"); } // uses ProcessorDBConnection void DBRegAgent::onRegistrationActionEvent(RegistrationActionEvent* reg_action_ev) { switch (reg_action_ev->action) { case RegistrationActionEvent::Register: { DBG("REGISTER of registration %ld, type: %s\n", reg_action_ev->object_id, TYPE_TO_STRING(reg_action_ev->type)); registrations_mut.lock(); map::iterator it = (reg_action_ev->type == TYPE_PEERING ? registrations_peers.find(reg_action_ev->object_id) : registrations.find(reg_action_ev->object_id)); bool marker = true; /* handle peerings */ if (reg_action_ev->type == TYPE_PEERING && it == registrations_peers.end()) { DBG("ignoring scheduled REGISTER of unknown registration %ld\n", reg_action_ev->object_id); marker = false; /* handle subscribers */ } else if (reg_action_ev->type != TYPE_PEERING && it == registrations.end()) { DBG("ignoring scheduled REGISTER of unknown registration %ld\n", reg_action_ev->object_id); marker = false; } if (marker) { if (!it->second->doRegistration()) { updateDBRegistration(ProcessorDBConnection, reg_action_ev->object_id, reg_action_ev->type, 480, ERR_REASON_UNABLE_TO_SEND_REQUEST, true, REG_STATUS_FAILED); if (error_retry_interval) { // schedule register-refresh after error_retry_interval setRegistrationTimer(reg_action_ev->object_id, error_retry_interval, RegistrationActionEvent::Register, reg_action_ev->type); } } } registrations_mut.unlock(); } break; case RegistrationActionEvent::Deregister: { DBG("De-REGISTER of registration %ld, type: %s\n", reg_action_ev->object_id, TYPE_TO_STRING(reg_action_ev->type)); registrations_mut.lock(); map::iterator it = (reg_action_ev->type == TYPE_PEERING ? registrations_peers.find(reg_action_ev->object_id) : registrations.find(reg_action_ev->object_id)); bool marker = true; /* handle peerings */ if (reg_action_ev->type == TYPE_PEERING && it == registrations_peers.end()) { DBG("ignoring scheduled De-REGISTER of unknown registration %ld\n", reg_action_ev->object_id); marker = false; /* handle subscribers */ } else if (reg_action_ev->type != TYPE_PEERING && it == registrations.end()) { DBG("ignoring scheduled De-REGISTER of unknown registration %ld\n", reg_action_ev->object_id); marker = false; } if (marker) { if (!it->second->doUnregister()) { if (delete_removed_registrations && delete_failed_deregistrations) { DBG("sending de-Register failed - deleting registration %ld " "(delete_failed_deregistrations=yes)\n", reg_action_ev->object_id); deleteDBRegistration(reg_action_ev->object_id, reg_action_ev->type, ProcessorDBConnection); } else { DBG("failed sending de-register, updating DB with REG_STATUS_TO_BE_REMOVED " ERR_REASON_UNABLE_TO_SEND_REQUEST "for subscriber %ld\n", reg_action_ev->object_id); updateDBRegistration(ProcessorDBConnection, reg_action_ev->object_id, reg_action_ev->type, 480, ERR_REASON_UNABLE_TO_SEND_REQUEST, true, REG_STATUS_TO_BE_REMOVED); } } } registrations_mut.unlock(); } break; case RegistrationActionEvent::Unknown: { DBG("Unknown action, cannot handle this in correct way."); } break; } } void DBRegAgent::createDBRegistration(long object_id, const regType type, mysqlpp::Connection& conn) { string column_id = COLNAME_SUBSCRIBER_ID; if (type == TYPE_PEERING) { column_id = COLNAME_PEER_ID; } // depending on if that is a registration for a subscriber or for a peering // do a mysql insertion string insert_query = "insert into "+registrations_table+ " (" + column_id.c_str() + ")" + "values ("+ long2str(object_id)+");"; DBG("MYSQL: trying to execute: <%s>\n", insert_query.c_str()); try { mysqlpp::Query query = conn.query(); query << insert_query; mysqlpp::SimpleResult res = query.execute(); if (!res) { WARN("creating registration in DB with query '%s' failed: '%s', type: %s\n", insert_query.c_str(), res.info(), TYPE_TO_STRING(type)); } } catch (const mysqlpp::Exception& er) { // Catch-all for any MySQL++ exceptions ERROR("MySQL++ error: %s\n", er.what()); return; } } void DBRegAgent::deleteDBRegistration(long object_id, const regType type, mysqlpp::Connection& conn) { string column_id = COLNAME_SUBSCRIBER_ID; if (type == TYPE_PEERING) { column_id = COLNAME_PEER_ID; } // depending on if that is a de-registration for a subscriber or for a peering // do a mysql deletion string insert_query = "delete from "+registrations_table+ " where " + column_id.c_str() + "=" + long2str(object_id) + ";"; try { mysqlpp::Query query = conn.query(); query << insert_query; mysqlpp::SimpleResult res = query.execute(); if (!res) { WARN("removing registration in DB with query '%s' failed: '%s', type: %s\n", insert_query.c_str(), res.info(), TYPE_TO_STRING(type)); } } catch (const mysqlpp::Exception& er) { // Catch-all for any MySQL++ exceptions ERROR("MySQL++ error: %s\n", er.what()); return; } } void DBRegAgent::updateDBRegistration(mysqlpp::Connection& db_connection, long object_id, const regType type, int last_code, const string& last_reason, bool update_status, int status, bool update_ts, unsigned int expiry, bool update_contacts, const string& contacts) { try { mysqlpp::Query query = db_connection.query(); query << "update "+registrations_table+" set last_code="+ int2str(last_code) +", "; query << "last_reason="; query << mysqlpp::quote << last_reason; if (update_status) { query << ", registration_status="+int2str(status); } if (update_ts) { query << ", last_registration=NOW(), expiry=TIMESTAMPADD(SECOND," + int2str(expiry) + ", NOW())"; } if (update_contacts) { query << ", contacts=" << mysqlpp::quote << contacts; } // depending on if that is an update for a subscriber or for a peering // do a mysql update if (type == TYPE_SUBSCRIBER) query << " where " COLNAME_SUBSCRIBER_ID "="+long2str(object_id) + ";"; else query << " where " COLNAME_PEER_ID "="+long2str(object_id) + ";"; string query_str = query.str(); DBG("updating registration in DB with query '%s'\n", query_str.c_str()); mysqlpp::SimpleResult res = query.execute(); if (!res) { WARN("updating registration in DB with query '%s' failed: '%s'\n", query_str.c_str(), res.info()); } else { if (!res.rows()) { // should not happen - DB entry is created on load or on createRegistration DBG("creating registration DB entry for subscriber %ld, type: %s\n", object_id, TYPE_TO_STRING(type)); createDBRegistration(object_id, type, db_connection); query.reset(); query << query_str; mysqlpp::SimpleResult res = query.execute(); if (!res || !res.rows()) { WARN("updating registration in DB with query '%s' failed: '%s'\n", query_str.c_str(), res.info()); } } } } catch (const mysqlpp::Exception& er) { // Catch-all for any MySQL++ exceptions ERROR("MySQL++ error: %s\n", er.what()); return; } } // uses MainDBConnection void DBRegAgent::onSipReplyEvent(AmSipReplyEvent* ev) { if (!ev) return; DBG("received SIP reply event for '%s'\n", #ifdef HAS_OFFER_ANSWER ev->reply.from_tag.c_str() #else ev->reply.local_tag.c_str() #endif ); std::lock_guard _l(registrations_mut); string local_tag = #ifdef HAS_OFFER_ANSWER ev->reply.from_tag; #else ev->reply.local_tag; #endif map::iterator it; bool marker = false; regType type; // not the best solution to match coming reply against needed object // we need to find a way how to better differentiate, // if that is related to a peering type or subscriber type // now a basic attempt to first look into subscribers cache buckets then into // peerings cache buckets (can this happen that ltag will overlap? must be unique) // first try to find a registration object in a cache for subscribers (most common case) it=registration_ltags.find(local_tag); if (it!=registration_ltags.end()) { marker=true; type = TYPE_SUBSCRIBER; // secondly, if we didn't find anything before, try in a cache for peerings } else { it=registration_ltags_peers.find(local_tag); if (it!=registration_ltags_peers.end()) { marker=true; type = TYPE_PEERING; } } if (marker) { long object_id = it->second; map::iterator r_it; marker = false; if (type == TYPE_SUBSCRIBER) { // find registration for peering r_it=registrations.find(object_id); if (r_it != registrations.end()) marker = true; } else { // find registration for subscriber r_it=registrations_peers.find(object_id); if (r_it != registrations_peers.end()) marker = true; } if (marker) { AmSIPRegistration* registration = r_it->second; if (!registration) { ERROR("Internal error: registration object missing, type: %s\n", TYPE_TO_STRING(type)); return; } unsigned int cseq_before = registration->getDlg()->cseq; #ifdef HAS_OFFER_ANSWER registration->getDlg()->onRxReply(ev->reply); #else registration->getDlg()->updateStatus(ev->reply); #endif //update registrations set bool update_status = false; int status = 0; bool update_ts = false; unsigned int expiry = 0; bool delete_status = false; bool auth_pending = false; if (ev->reply.code >= 300) { // REGISTER or de-REGISTER failed if ((ev->reply.code == 401 || ev->reply.code == 407) && // auth response codes // processing reply triggered sending request: resent by auth (cseq_before != registration->getDlg()->cseq)) { DBG("received negative reply, but still in pending state (auth).\n"); auth_pending = true; } else { if (!registration->getUnregistering()) { // REGISTER failed - mark in DB DBG("registration failed - mark in DB\n"); update_status = true; status = REG_STATUS_FAILED; if (error_retry_interval) { // schedule register-refresh after error_retry_interval setRegistrationTimer(object_id, error_retry_interval, RegistrationActionEvent::Register, type); } } else { // de-REGISTER failed if (delete_removed_registrations && delete_failed_deregistrations) { DBG("de-Register failed - deleting registration %ld " "(delete_failed_deregistrations=yes)\n", object_id); delete_status = true; } else { update_status = true; status = REG_STATUS_TO_BE_REMOVED; } } } } else if (ev->reply.code >= 200) { // positive reply if (!registration->getUnregistering()) { time_t now_time = time(0); setRegistrationTimer(object_id, registration->getExpiresTS(), now_time, now_time, type); update_status = true; status = REG_STATUS_ACTIVE; update_ts = true; expiry = registration->getExpiresLeft(); } else { if (delete_removed_registrations) { delete_status = true; } else { update_status = true; status = REG_STATUS_REMOVED; } } } // skip provisional replies & auth if (ev->reply.code >= 200 && !auth_pending) { // remove unregistered if (registration->getUnregistering()) { removeRegistration(object_id, type); } } if (!delete_status) { if (auth_pending && !save_auth_replies) { DBG("not updating DB with auth reply %u %s\n", ev->reply.code, ev->reply.reason.c_str()); } else { DBG("update DB with reply %u %s\n", ev->reply.code, ev->reply.reason.c_str()); updateDBRegistration(MainDBConnection, object_id, type, ev->reply.code, ev->reply.reason, update_status, status, update_ts, expiry, save_contacts, ev->reply.contact); } } else { DBG("delete DB registration of subscriber %ld\n", object_id); deleteDBRegistration(object_id, type, MainDBConnection); } } else { ERROR("internal: inconsistent registration list\n"); } } else { DBG("ignoring reply for unknown registration\n"); } } void DBRegAgent::run() { DBG("DBRegAgent thread: waiting 2 sec for server startup ...\n"); sleep(2); mysqlpp::Connection::thread_start(); if (enable_ratelimiting) { DBG("starting processor thread\n"); registration_processor.start(); } DBG("running DBRegAgent thread...\n"); while (!stop_requested() || eventPending()) { waitForEventTimed(500); // 500 milliseconds processEvents(); } DBG("DBRegAgent done, removing all registrations from Event Dispatcher for peerings...\n"); registrations_mut.lock(); for (map::iterator it=registration_ltags_peers.begin(); it != registration_ltags_peers.end(); it++) { AmEventDispatcher::instance()->delEventQueue(it->first); } registrations_mut.unlock(); DBG("removing %s registrations from Event Dispatcher...\n", MOD_NAME); AmEventDispatcher::instance()->delEventQueue(MOD_NAME); mysqlpp::Connection::thread_end(); DBG("DBRegAgent thread stopped.\n"); } void DBRegAgent::on_stop() { DBG("DBRegAgent on_stop(): removing %s registrations from Event Dispatcher...\n", MOD_NAME); AmEventDispatcher::instance()->delEventQueue(MOD_NAME); } void DBRegAgent::setRegistrationTimer(long object_id, uint64_t timeout, RegistrationActionEvent::RegAction reg_action, const regType type) { DBG("setting Register timer for subscription %ld, timeout %" PRIu64 ", reg_action %u\n", object_id, timeout, reg_action); RegTimer* timer = NULL; map::iterator it = (type == TYPE_PEERING ? registration_timers_peers.find(object_id) : registration_timers.find(object_id)); bool marker = false; /* handle peerings */ if (type == TYPE_PEERING && it == registration_timers_peers.end()) { marker = true; /* handle subscribers */ } else if (type != TYPE_PEERING && it == registration_timers.end()) { marker = true; } if (marker) { DBG("timer object for subscription %ld not found, type: %s\n", object_id, TYPE_TO_STRING(type)); timer = new RegTimer(); timer->object_id = object_id; timer->type = type; // 'peering' or 'subscriber' DBG("created timer object [%p] for subscription %ld, type: %s\n", timer, object_id, TYPE_TO_STRING(type)); } else { if (it->second) { timer = it->second; DBG("removing scheduled timer...\n"); registration_scheduler.remove_timer(timer, false); } else { WARN("Failed to get existing timer for removing.\n"); return; } } timer->action = reg_action; DBG("placing timer for %ld in T-%" PRIu64 ", type: %s\n", object_id, timeout, TYPE_TO_STRING(type)); registration_scheduler.insert_timer(timer, timeout * 1000000); if (type == TYPE_PEERING) { registration_timers_peers.insert(std::make_pair(object_id, timer)); } else { registration_timers.insert(std::make_pair(object_id, timer)); } } void DBRegAgent::setRegistrationTimer(long object_id, uint64_t expiry, uint64_t reg_start_ts, uint64_t now_time, const regType type) { DBG("setting re-Register timer for subscription %ld, expiry %ld, reg_start_t %ld, type: %s\n", object_id, expiry, reg_start_ts, TYPE_TO_STRING(type)); RegTimer* timer = NULL; map::iterator it = (type == TYPE_PEERING ? registration_timers_peers.find(object_id) : registration_timers.find(object_id)); bool marker = false; /* handle peerings */ if (type == TYPE_PEERING && it == registration_timers_peers.end()) { marker = true; /* handle subscribers */ } else if (type != TYPE_PEERING && it == registration_timers.end()) { marker = true; } if (marker) { DBG("timer object for subscription %ld not found, type: %s\n", object_id, TYPE_TO_STRING(type)); timer = new RegTimer(); timer->object_id = object_id; timer->type = type; // 'peering' or 'subscriber' DBG("created timer object [%p] for subscription %ld, type: %s\n", timer, object_id, TYPE_TO_STRING(type)); } else { if (it->second) { timer = it->second; DBG("removing scheduled timer...\n"); registration_scheduler.remove_timer(timer, false); } else { WARN("Failed to get existing timer for removing.\n"); return; } } timer->action = RegistrationActionEvent::Register; /* place timer */ if (minimum_reregister_interval>0.0) { uint64_t t_expiry_max = reg_start_ts; uint64_t t_expiry_min = reg_start_ts; if (expiry > reg_start_ts) t_expiry_max+=(expiry - reg_start_ts) * reregister_interval; if (expiry > reg_start_ts) t_expiry_min+=(expiry - reg_start_ts) * minimum_reregister_interval; if (t_expiry_max < now_time) { // calculated interval completely in the past - immediate re-registration // by setting the timer to now t_expiry_max = now_time; } if (t_expiry_min > t_expiry_max) t_expiry_min = t_expiry_max; if (t_expiry_max == now_time) { // immediate re-registration DBG("calculated re-registration at TS (%ld)" "(reg_start_ts=%ld, reg_expiry=%ld, reregister_interval=%f, " "minimum_reregister_interval=%f)\n", t_expiry_max, reg_start_ts, expiry, reregister_interval, minimum_reregister_interval); registration_scheduler.insert_timer_abs(timer, t_expiry_max * 1000000); } else { DBG("calculated re-registration at TS %ld .. %ld" "(reg_start_ts=%ld, reg_expiry=%ld, reregister_interval=%f, " "minimum_reregister_interval=%f)\n", t_expiry_min, t_expiry_max, reg_start_ts, expiry, reregister_interval, minimum_reregister_interval); registration_scheduler.insert_timer_abs(timer, t_expiry_min * 1000000, t_expiry_max * 1000000); } } else { uint64_t t_expiry = reg_start_ts; if (expiry > reg_start_ts) t_expiry+=(expiry - reg_start_ts) * reregister_interval; if (t_expiry < now_time) { t_expiry = now_time; DBG("re-registering at TS (%ld)\n", now_time); } DBG("calculated re-registration at TS %ld " "(reg_start_ts=%ld, reg_expiry=%ld, reregister_interval=%f)\n", t_expiry, reg_start_ts, expiry, reregister_interval); registration_scheduler.insert_timer_abs(timer, t_expiry * 1000000); } if (type == TYPE_PEERING) { registration_timers_peers.insert(std::make_pair(object_id, timer)); } else { registration_timers.insert(std::make_pair(object_id, timer)); } } void DBRegAgent::clearRegistrationTimer(long object_id, const regType type) { DBG("Removing timer for subscription %ld, type: %s", object_id, TYPE_TO_STRING(type)); map::iterator it = (type == TYPE_PEERING ? registration_timers_peers.find(object_id) : registration_timers.find(object_id)); /* clear registration timer for peerings */ if (type == TYPE_PEERING && it == registration_timers_peers.end()) { DBG("timer object for subscription %ld not found, type: %s\n", object_id, TYPE_TO_STRING(type)); return; /* clear registration timer for subscribers */ } else if (type != TYPE_PEERING && it == registration_timers.end()) { DBG("timer object for subscription %ld not found, type: %s\n", object_id, TYPE_TO_STRING(type)); return; } DBG("removing timer [%p] from scheduler\n", it->second); /* remote_timer() in this case takes care to deallocate the timer object */ registration_scheduler.remove_timer(it->second); if (type == TYPE_PEERING) { registration_timers_peers.erase(it); } else { registration_timers.erase(it); } } void DBRegAgent::removeRegistrationTimer(long object_id, const regType type) { DBG("removing timer object for subscription %ld, type: %s", object_id, TYPE_TO_STRING(type)); map::iterator it = (type == TYPE_PEERING ? registration_timers_peers.find(object_id) : registration_timers.find(object_id)); /* remove registration timer for peerings */ if (type == TYPE_PEERING && it == registration_timers_peers.end()) { DBG("timer object for subscription %ld not found, type: %s\n", object_id, TYPE_TO_STRING(type)); return; /* remove registration timer for subscribers */ } else if (type != TYPE_PEERING && it == registration_timers.end()) { DBG("timer object for subscription %ld not found, type: %s\n", object_id, TYPE_TO_STRING(type)); return; } if (it->second) { DBG("deleting timer object [%p]\n", it->second); delete it->second; } if (type == TYPE_PEERING) { registration_timers_peers.erase(it); } else { registration_timers.erase(it); } } void DBRegAgent::timer_cb(RegTimer* timer) { DBG("re-registration timer expired: subscriber %ld, timer=[%p], action %d, type %s\n", timer->object_id, timer, timer->action, TYPE_TO_STRING(timer->type)); registrations_mut.lock(); removeRegistrationTimer(timer->object_id, timer->type); registrations_mut.unlock(); switch (timer->action) { case RegistrationActionEvent::Register: scheduleRegistration(timer->object_id, timer->type); break; case RegistrationActionEvent::Deregister: scheduleDeregistration(timer->object_id, timer->type); break; case RegistrationActionEvent::Unknown: ERROR("internal: unknown reg_action %d for subscriber %ld timer event\n", timer->action, timer->object_id); break; default: ERROR("internal: unknown reg_action %d for subscriber %ld timer event\n", timer->action, timer->object_id); }; } void DBRegAgent::DIcreateRegistration(int object_id, const string& user, const string& pass, const string& realm, const string& contact, const string& auth_user, const regType type, AmArg& ret) { string auth_user_temp = (auth_user.empty() || auth_user == "" || auth_user == "NULL") ? user : auth_user; DBG("DI method: createRegistration(%i, %s, %s, %s, %s, %s)\n", object_id, auth_user_temp.c_str(), user.c_str(), pass.c_str(), realm.c_str(), contact.c_str()); createRegistration(object_id, auth_user_temp, user, pass, realm, contact, type); scheduleRegistration(object_id, type); ret.push(200); ret.push("OK"); } void DBRegAgent::DIupdateRegistration(int object_id, const string& user, const string& pass, const string& realm, const string& contact, const string& auth_user, const regType type, AmArg& ret) { string auth_user_temp = (auth_user.empty() || auth_user == "" || auth_user == "NULL") ? user : auth_user; DBG("DI method: updateRegistration(%i, %s, %s, %s, %s)\n", object_id, auth_user_temp.c_str(), user.c_str(), pass.c_str(), realm.c_str()); string contact_uri = contact; if (contact_uri.empty() && !contact_hostport.empty()) { contact_uri = "sip:"+ user + "@" + contact_hostport; } updateRegistration(object_id, auth_user_temp, user, pass, realm, contact_uri, type); ret.push(200); ret.push("OK"); } void DBRegAgent::DIremoveRegistration(int object_id, const regType type, AmArg& ret) { DBG("DI method: removeRegistration(%i)\n", object_id); scheduleDeregistration(object_id, type); registrations_mut.lock(); clearRegistrationTimer(object_id, type); registrations_mut.unlock(); ret.push(200); ret.push("OK"); } void DBRegAgent::DIrefreshRegistration(int object_id, const regType type, AmArg& ret) { DBG("DI method: refreshRegistration(%i)\n", object_id); scheduleRegistration(object_id, type); ret.push(200); ret.push("OK"); } // ///////// DI API /////////////////// void DBRegAgent::invoke(const string& method, const AmArg& args, AmArg& ret) { // create a brand new registration if (method == "createRegistration"){ args.assertArrayFmt("issssss"); // object_id, user, pass, realm, contact, auth_user, type string contact; string auth_user; string type_str; // 'peering' or 'subscriber' regType type = TYPE_UNDEFINED; // for case when: object_id, user, pass, realm, contact if (args.size() == 5) { assertArgCStr(args.get(4)); contact = args.get(4).asCStr(); // for case when: object_id, user, pass, realm, contact, auth_user } else if (args.size() == 6) { assertArgCStr(args.get(4)); assertArgCStr(args.get(5)); contact = args.get(4).asCStr(); auth_user = args.get(5).asCStr(); // for case when: object_id, user, pass, realm, contact, auth_user, type } else if (args.size() == 7) { assertArgCStr(args.get(4)); assertArgCStr(args.get(5)); assertArgCStr(args.get(6)); contact = args.get(4).asCStr(); auth_user = args.get(5).asCStr(); type_str = args.get(6).asCStr(); /* handle types setting */ if (!type_str.empty()) { if (type_str == TYPE_PEERING_STR) type = TYPE_PEERING; else if (type_str == TYPE_SUBSCRIBER_STR) type = TYPE_SUBSCRIBER; } } /* we only allow three possible types: 'peering', 'subscriber' and 'undefined' * But if undefined, we warn. */ if (type == TYPE_UNDEFINED) { WARN("REGISTER: Type of the registration object is undefined.\n"); } DBG("REGISTER: SEMS is about to Create a registration for: object_id=<%d>, type=<%s>, user=<%s>, realm=<%s> \n", args.get(0).asInt(), type_str.c_str(), args.get(1).asCStr(), args.get(3).asCStr()); DIcreateRegistration(args.get(0).asInt(), args.get(1).asCStr(), args.get(2).asCStr(), args.get(3).asCStr(), contact, auth_user, type, ret); // update an existing registration } else if (method == "updateRegistration"){ args.assertArrayFmt("issssss"); // object_id, user, pass, realm, contact, auth_user, type string contact; string auth_user; string type_str; // 'peering' or 'subscriber' regType type = TYPE_UNDEFINED; // for case when: object_id, user, pass, realm, contact if (args.size() == 5) { assertArgCStr(args.get(4)); contact = args.get(4).asCStr(); // for case when: object_id, user, pass, realm, contact, auth_user } else if (args.size() == 6) { assertArgCStr(args.get(4)); assertArgCStr(args.get(5)); contact = args.get(4).asCStr(); auth_user = args.get(5).asCStr(); // for case when: object_id, user, pass, realm, contact, auth_user, type } else if (args.size() == 7) { assertArgCStr(args.get(4)); assertArgCStr(args.get(5)); assertArgCStr(args.get(6)); contact = args.get(4).asCStr(); auth_user = args.get(5).asCStr(); type_str = args.get(6).asCStr(); /* handle types setting */ if (!type_str.empty()) { if (type_str == TYPE_PEERING_STR) type = TYPE_PEERING; else if (type_str == TYPE_SUBSCRIBER_STR) type = TYPE_SUBSCRIBER; } } /* we only allow three possible types: 'peering', 'subscriber' and 'undefined' * But if undefined, we warn. */ if (type == TYPE_UNDEFINED) { WARN("REGISTER: Type of the registration object is undefined.\n"); } DBG("REGISTER: SEMS is about to Update a registration for: object_id=<%d>, type=<%s>, user=<%s>, realm=<%s> \n", args.get(0).asInt(), type_str.c_str(), args.get(1).asCStr(), args.get(3).asCStr()); DIupdateRegistration(args.get(0).asInt(), args.get(1).asCStr(), args.get(2).asCStr(), args.get(3).asCStr(), contact, auth_user, type, ret); // remove an existing registration } else if (method == "removeRegistration") { args.assertArrayFmt("is"); // object_id, type string type_str; // must be 'peering' or 'subscriber' regType type = TYPE_UNDEFINED; if (args.size() == 2) type_str = args.get(1).asCStr(); /* handle types setting */ if (!type_str.empty()) { if (type_str == TYPE_PEERING_STR) type = TYPE_PEERING; else if (type_str == TYPE_SUBSCRIBER_STR) type = TYPE_SUBSCRIBER; } DIremoveRegistration(args.get(0).asInt(), type, ret); // refresh an existing registration } else if (method == "refreshRegistration") { args.assertArrayFmt("is"); // object_id, type string type_str; // must be 'peering' or 'subscriber' regType type = TYPE_UNDEFINED; if (args.size() == 2) type_str = args.get(1).asCStr(); /* handle types setting */ if (!type_str.empty()) { if (type_str == TYPE_PEERING_STR) type = TYPE_PEERING; else if (type_str == TYPE_SUBSCRIBER_STR) type = TYPE_SUBSCRIBER; } DIrefreshRegistration(args.get(0).asInt(), type, ret); } else if(method == "_list") { ret.push(AmArg("createRegistration")); ret.push(AmArg("updateRegistration")); ret.push(AmArg("removeRegistration")); ret.push(AmArg("refreshRegistration")); } else { throw AmDynInvoke::NotImplemented(method); } } // /////////////// processor thread ///////////////// DBRegAgentProcessorThread::DBRegAgentProcessorThread() : AmEventQueue(this) { } DBRegAgentProcessorThread::~DBRegAgentProcessorThread() { } void DBRegAgentProcessorThread::on_stop() { DBG("removing %s registrations from Event Dispatcher...\n", MOD_NAME "_processor"); AmEventDispatcher::instance()->delEventQueue(MOD_NAME "_processor"); } void DBRegAgentProcessorThread::rateLimitWait() { DBG("applying rate limit %u initial requests per %us\n", DBRegAgent::ratelimit_rate, DBRegAgent::ratelimit_per); DBG("allowance before ratelimit: %f\n", allowance); struct timeval current; struct timeval time_passed; gettimeofday(¤t, 0); timersub(¤t, &last_check, &time_passed); memcpy(&last_check, ¤t, sizeof(struct timeval)); double seconds_passed = (double)time_passed.tv_sec + (double)time_passed.tv_usec / 1000000.0; allowance += seconds_passed * (double) DBRegAgent::ratelimit_rate / (double)DBRegAgent::ratelimit_per; if (allowance > (double)DBRegAgent::ratelimit_rate) allowance = (double)DBRegAgent::ratelimit_rate; // enough time passed, but limit to max if (allowance < 1.0) { useconds_t sleep_time = 1000000.0 * (1.0 - allowance) * ((double)DBRegAgent::ratelimit_per/(double)DBRegAgent::ratelimit_rate); DBG("not enough allowance (%f), sleeping %d useconds\n", allowance, sleep_time); usleep(sleep_time); allowance=0.0; gettimeofday(&last_check, 0); } else { allowance -= 1.0; } DBG("allowance left: %f\n", allowance); } void DBRegAgentProcessorThread::run() { DBG("DBRegAgentProcessorThread thread started\n"); // register us as SIP event receiver for MOD_NAME_processor AmEventDispatcher::instance()->addEventQueue(MOD_NAME "_processor",this); mysqlpp::Connection::thread_start(); // initialize ratelimit gettimeofday(&last_check, NULL); if (DBRegAgent::ratelimit_slowstart) allowance = 0.0; else allowance = DBRegAgent::ratelimit_rate; reg_agent = DBRegAgent::instance(); while (!stop_requested() || eventPending()) { waitForEventTimed(500); while (eventPending()) { rateLimitWait(); processSingleEvent(); } } mysqlpp::Connection::thread_end(); DBG("DBRegAgentProcessorThread thread stopped\n"); } void DBRegAgentProcessorThread::process(AmEvent* ev) { if (ev->event_id == E_SYSTEM) { AmSystemEvent* sys_ev = dynamic_cast(ev); if(sys_ev){ DBG("Session received system Event\n"); if (sys_ev->sys_event == AmSystemEvent::ServerShutdown) { DBG("stopping processor thread\n"); stop(); } return; } } if (ev->event_id == RegistrationActionEventID) { RegistrationActionEvent* reg_action_ev = dynamic_cast(ev); if (reg_action_ev) { reg_agent->onRegistrationActionEvent(reg_action_ev); return; } } ERROR("unknown event received!\n"); } #if 0 void test_cb(RegTimer* tr, long data1, void* data2) { DBG("cb called: [%p], data %ld / [%p]\n", tr, data1, data2); } void DBRegAgent::run_tests() { registration_timer.start(); struct timeval now; gettimeofday(&now, 0); RegTimer rt; rt.expires = now.tv_sec + 10; rt.cb=test_cb; registration_scheduler.insert_timer(&rt); RegTimer rt2; rt2.expires = now.tv_sec + 5; rt2.cb=test_cb; registration_scheduler.insert_timer(&rt2); RegTimer rt3; rt3.expires = now.tv_sec + 15; rt3.cb=test_cb; registration_scheduler.insert_timer(&rt3); RegTimer rt4; rt4.expires = now.tv_sec - 1; rt4.cb=test_cb; registration_scheduler.insert_timer(&rt4); RegTimer rt5; rt5.expires = now.tv_sec + 100000; rt5.cb=test_cb; registration_scheduler.insert_timer(&rt5); RegTimer rt6; rt6.expires = now.tv_sec + 100; rt6.cb=test_cb; registration_scheduler.insert_timer_leastloaded(&rt6, now.tv_sec+5, now.tv_sec+50); sleep(30); gettimeofday(&now, 0); RegTimer rt7; rt6.expires = now.tv_sec + 980; rt6.cb=test_cb; registration_scheduler.insert_timer_leastloaded(&rt6, now.tv_sec+9980, now.tv_sec+9990); vector rts; for (int i=0;i<1000;i++) { RegTimer* t = new RegTimer(); rts.push_back(t); t->expires = now.tv_sec + i; t->cb=test_cb; registration_scheduler.insert_timer_leastloaded(t, now.tv_sec, now.tv_sec+1000); } sleep(200); } #endif