diff --git a/core/AmEventDispatcher.cpp b/core/AmEventDispatcher.cpp index 512e1f01..916a5889 100644 --- a/core/AmEventDispatcher.cpp +++ b/core/AmEventDispatcher.cpp @@ -32,18 +32,17 @@ unsigned int AmEventDispatcher::hash(const string& s1) { - return hashlittle(s1.c_str(),s1.length(),0) - & (EVENT_DISPATCHER_BUCKETS-1); + return hashlittle(s1.c_str(),s1.length(),0) & (EVENT_DISPATCHER_BUCKETS-1); } unsigned int AmEventDispatcher::hash(const string& s1, const string s2) { - unsigned int h=0; + unsigned int h=0; - h = hashlittle(s1.c_str(),s1.length(),h); - h = hashlittle(s2.c_str(),s2.length(),h); + h = hashlittle(s1.c_str(),s1.length(),h); + h = hashlittle(s2.c_str(),s2.length(),h); - return h & (EVENT_DISPATCHER_BUCKETS-1); + return h & (EVENT_DISPATCHER_BUCKETS-1); } AmEventDispatcher* AmEventDispatcher::_instance=NULL; @@ -53,191 +52,189 @@ AmEventDispatcher* AmEventDispatcher::instance() return _instance ? _instance : ((_instance = new AmEventDispatcher())); } - bool AmEventDispatcher::addEventQueue(const string& local_tag, - AmEventQueueInterface* q) + AmEventQueueInterface* q) { - unsigned int queue_bucket = hash(local_tag); + unsigned int queue_bucket = hash(local_tag); - lock_guard lock(queues_mut[queue_bucket]); + lock_guard lock(queues_mut[queue_bucket]); - if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) { - return false; - } + if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) { + return false; + } - queues[queue_bucket][local_tag] = QueueEntry(q); - return true; + queues[queue_bucket][local_tag] = QueueEntry(q); + return true; } - /** @return false on error */ bool AmEventDispatcher::addEventQueue(const string& local_tag, - AmEventQueueInterface* q, - const string& callid, - const string& remote_tag, - const string& via_branch) + AmEventQueueInterface* q, + const string& callid, + const string& remote_tag, + const string& via_branch) { - if(local_tag.empty () ||callid.empty() || remote_tag.empty() || via_branch.empty()) { - ERROR("local_tag, callid, remote_tag or via_branch is empty"); - return false; - } + if(local_tag.empty () ||callid.empty() || remote_tag.empty() || via_branch.empty()) { + ERROR("local_tag, callid, remote_tag or via_branch is empty"); + return false; + } - unsigned int queue_bucket = hash(local_tag); + unsigned int queue_bucket = hash(local_tag); - lock_guard lock(queues_mut[queue_bucket]); + lock_guard lock(queues_mut[queue_bucket]); - if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) { - return false; - } + if (queues[queue_bucket].find(local_tag) != queues[queue_bucket].end()) { + return false; + } - // try to find via id_lookup - string id = callid+remote_tag; - if(AmConfig::AcceptForkedDialogs){ - id += via_branch; - } - unsigned int id_bucket = hash(id); + // try to find via id_lookup + string id = callid+remote_tag; + if(AmConfig::AcceptForkedDialogs){ + id += via_branch; + } + unsigned int id_bucket = hash(id); - lock_guard lock_l(id_lookup_mut[id_bucket]); - - if (id_lookup[id_bucket].find(id) != - id_lookup[id_bucket].end()) { - return false; - } + lock_guard lock_l(id_lookup_mut[id_bucket]); - queues[queue_bucket][local_tag] = QueueEntry(q,id); - id_lookup[id_bucket][id] = local_tag; - - return true; + if (id_lookup[id_bucket].find(id) != id_lookup[id_bucket].end()) { + return false; + } + + queues[queue_bucket][local_tag] = QueueEntry(q,id); + id_lookup[id_bucket][id] = local_tag; + + return true; } AmEventQueueInterface* AmEventDispatcher::delEventQueue(const string& local_tag) { - AmEventQueueInterface* q = NULL; - unsigned int queue_bucket = hash(local_tag); + AmEventQueueInterface* q = NULL; + unsigned int queue_bucket = hash(local_tag); + + lock_guard lock(queues_mut[queue_bucket]); - lock_guard lock(queues_mut[queue_bucket]); + EvQueueMapIter qi = queues[queue_bucket].find(local_tag); + if (qi != queues[queue_bucket].end()) { + + QueueEntry qe(qi->second); + queues[queue_bucket].erase(qi); + q = qe.q; - EvQueueMapIter qi = queues[queue_bucket].find(local_tag); - if(qi != queues[queue_bucket].end()) { - - QueueEntry qe(qi->second); - queues[queue_bucket].erase(qi); - q = qe.q; - - if(!qe.id.empty()) { - unsigned int id_bucket = hash(qe.id); - - lock_guard lock_l(id_lookup_mut[id_bucket]); - - DictIter di = id_lookup[id_bucket].find(qe.id); - if(di != id_lookup[id_bucket].end()) { - id_lookup[id_bucket].erase(di); - } - + if (!qe.id.empty()) { + unsigned int id_bucket = hash(qe.id); + + lock_guard lock_l(id_lookup_mut[id_bucket]); + DictIter di = id_lookup[id_bucket].find(qe.id); + if (di != id_lookup[id_bucket].end()) { + id_lookup[id_bucket].erase(di); } } - - return q; + } + + return q; } bool AmEventDispatcher::post(const string& local_tag, AmEvent* ev) { - bool posted = false; - - unsigned int queue_bucket = hash(local_tag); - - lock_guard lock(queues_mut[queue_bucket]); - - EvQueueMapIter it = queues[queue_bucket].find(local_tag); - if(it != queues[queue_bucket].end()){ - it->second.q->postEvent(ev); - posted = true; - } - - return posted; -} + bool posted = false; + + unsigned int queue_bucket = hash(local_tag); + lock_guard lock(queues_mut[queue_bucket]); + + EvQueueMapIter it = queues[queue_bucket].find(local_tag); + if (it != queues[queue_bucket].end()) { + it->second.q->postEvent(ev); + posted = true; + } + + return posted; +} bool AmEventDispatcher::post(const string& callid, - const string& remote_tag, - const string& via_branch, - AmEvent* ev) + const string& remote_tag, + const string& via_branch, + AmEvent* ev) { - string id = callid+remote_tag; - if(AmConfig::AcceptForkedDialogs){ - id += via_branch; - } - unsigned int id_bucket = hash(id); + string id = callid+remote_tag; + if (AmConfig::AcceptForkedDialogs) { + id += via_branch; + } + unsigned int id_bucket = hash(id); - lock_guard lock(id_lookup_mut[id_bucket]); + lock_guard lock(id_lookup_mut[id_bucket]); - DictIter di = id_lookup[id_bucket].find(id); - if (di == id_lookup[id_bucket].end()) { - return false; - } - string local_tag = di->second; - - return post(local_tag, ev); + DictIter di = id_lookup[id_bucket].find(id); + if (di == id_lookup[id_bucket].end()) { + return false; + } + string local_tag = di->second; + + return post(local_tag, ev); } bool AmEventDispatcher::broadcast(AmEvent* ev) { - if (!ev) - return false; - - bool posted = false; - for (size_t i=0;isecond.q->postEvent(ev->clone()); - queues_mut[i].lock(); - posted = true; - } + if (!ev) + return false; + + bool posted = false; + for (size_t i=0; i < EVENT_DISPATCHER_BUCKETS; i++) + { + queues_mut[i].lock(); + + EvQueueMapIter it = queues[i].begin(); + while (it != queues[i].end()) + { + EvQueueMapIter this_evq = it; + it++; queues_mut[i].unlock(); + this_evq->second.q->postEvent(ev->clone()); + queues_mut[i].lock(); + posted = true; } + queues_mut[i].unlock(); + } - delete ev; + delete ev; - return posted; + return posted; } bool AmEventDispatcher::empty() { - bool res = true; - for (size_t i=0;i lock(queues_mut[i]); - res = res&queues[i].empty(); - if (!res) - break; - } - return res; + bool res = true; + for (size_t i=0; i < EVENT_DISPATCHER_BUCKETS; i++) + { + lock_guard lock(queues_mut[i]); + res = res&queues[i].empty(); + if (!res) + break; + } + return res; } void AmEventDispatcher::dump() { - DBG("*** dumping Event dispatcher buckets ***\n"); - for (size_t i=0;i %p\n",it->first.c_str(),it->second.q); - } + DBG("*** dumping Event dispatcher buckets ***\n"); + for (size_t i=0; i %p\n",it->first.c_str(),it->second.q); } - queues_mut[i].unlock(); + } + queues_mut[i].unlock(); - lock_guard lock(id_lookup_mut[i]); - if(!id_lookup[i].empty()) { - DBG("id_lookup[%zu].size() = %zu",i,id_lookup[i].size()); - } + lock_guard lock(id_lookup_mut[i]); + if(!id_lookup[i].empty()) { + DBG("id_lookup[%zu].size() = %zu",i,id_lookup[i].size()); } - DBG("*** End of Event dispatcher bucket dump ***\n"); + } + DBG("*** End of Event dispatcher bucket dump ***\n"); } void AmEventDispatcher::dispose() @@ -251,41 +248,41 @@ void AmEventDispatcher::dispose() } } -/** this function optimizes posting of SIP Requests +/** this function optimizes posting of SIP Requests - if the session does not exist, no event need to be created (req copied) */ bool AmEventDispatcher::postSipRequest(const AmSipRequest& req) { - // get local tag - bool posted = false; - string callid = req.callid; - string remote_tag = req.from_tag; - - string id = callid+remote_tag; - if(AmConfig::AcceptForkedDialogs){ - id += req.via_branch; - } - unsigned int id_bucket = hash(id); + // get local tag + bool posted = false; + string callid = req.callid; + string remote_tag = req.from_tag; + + string id = callid+remote_tag; + if (AmConfig::AcceptForkedDialogs) { + id += req.via_branch; + } + unsigned int id_bucket = hash(id); - id_lookup_mut[id_bucket].lock(); + id_lookup_mut[id_bucket].lock(); - DictIter di = id_lookup[id_bucket].find(id); - if (di == id_lookup[id_bucket].end()) { - id_lookup_mut[id_bucket].unlock(); - return false; - } - string local_tag = di->second; + DictIter di = id_lookup[id_bucket].find(id); + if (di == id_lookup[id_bucket].end()) { id_lookup_mut[id_bucket].unlock(); - - // post(local_tag) - unsigned int queue_bucket = hash(local_tag); - - lock_guard lock(queues_mut[queue_bucket]); - - EvQueueMapIter it = queues[queue_bucket].find(local_tag); - if(it != queues[queue_bucket].end()){ - it->second.q->postEvent(new AmSipRequestEvent(req)); - posted = true; - } + return false; + } + string local_tag = di->second; + id_lookup_mut[id_bucket].unlock(); + + // post(local_tag) + unsigned int queue_bucket = hash(local_tag); + + lock_guard lock(queues_mut[queue_bucket]); + + EvQueueMapIter it = queues[queue_bucket].find(local_tag); + if (it != queues[queue_bucket].end()) { + it->second.q->postEvent(new AmSipRequestEvent(req)); + posted = true; + } - return posted; + return posted; }