lock striping on timers (UserTimer/SessionTimer)

sayer/1.4-spce2.6
Stefan Sayer 15 years ago
parent fe7e188ed2
commit 5bd6bfdff2

@ -1,6 +1,8 @@
#include "UserTimer.h" #include "UserTimer.h"
#include "sip/hash.h"
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
@ -72,29 +74,30 @@ bool operator < (const AmTimer& l, const AmTimer& r)
void UserTimer::checkTimers() { void UserTimer::checkTimers() {
vector<std::pair<string, int> > expired_timers; vector<std::pair<string, int> > expired_timers;
timers_mut.lock();
if(timers.empty()){
timers_mut.unlock();
return;
}
struct timeval cur_time; struct timeval cur_time;
gettimeofday(&cur_time,NULL); gettimeofday(&cur_time,NULL);
std::multiset<AmTimer>::iterator it = timers.begin(); // run through all buckets
for (unsigned int bucket=0;bucket<TIMERS_LOCKSTRIPE_BUCKETS;bucket++) {
// get expired timers in bucket
timers_mut[bucket].lock();
if (!timers[bucket].empty()) {
std::multiset<AmTimer>::iterator it = timers[bucket].begin();
while( timercmp(&it->time,&cur_time,<) while (timercmp(&it->time,&cur_time,<)
|| timercmp(&it->time,&cur_time,==) ){ || timercmp(&it->time,&cur_time,==)) {
int id = it->id; int id = it->id;
string session_id = it->session_id; string session_id = it->session_id;
// erase // erase
timers.erase(it); timers[bucket].erase(it);
expired_timers.push_back(make_pair(session_id, id)); expired_timers.push_back(make_pair(session_id, id));
if(timers.empty()) break; if(timers[bucket].empty()) break;
it = timers.begin(); it = timers[bucket].begin();
}
}
timers_mut[bucket].unlock();
} }
timers_mut.unlock();
for (vector<std::pair<string, int> >::iterator e_it = for (vector<std::pair<string, int> >::iterator e_it =
expired_timers.begin(); e_it != expired_timers.end(); e_it++) { expired_timers.begin(); e_it != expired_timers.end(); e_it++) {
@ -122,31 +125,41 @@ void UserTimer::setTimer(int id, int seconds, const string& session_id) {
void UserTimer::setTimer(int id, struct timeval* t, void UserTimer::setTimer(int id, struct timeval* t,
const string& session_id) const string& session_id)
{ {
timers_mut.lock(); unsigned int bucket = hash(session_id);
timers_mut[bucket].lock();
// erase old timer if exists // erase old timer if exists
unsafe_removeTimer(id, session_id); unsafe_removeTimer(id, session_id, bucket);
// add new // add new
timers.insert(AmTimer(id, session_id, t)); timers[bucket].insert(AmTimer(id, session_id, t));
timers_mut.unlock(); timers_mut[bucket].unlock();
} }
void UserTimer::removeTimer(int id, const string& session_id) { void UserTimer::removeTimer(int id, const string& session_id) {
timers_mut.lock(); unsigned int bucket = hash(session_id);
unsafe_removeTimer(id, session_id); timers_mut[bucket].lock();
timers_mut.unlock(); unsafe_removeTimer(id, session_id, bucket);
timers_mut[bucket].unlock();
}
unsigned int UserTimer::hash(const string& s1)
{
return hashlittle(s1.c_str(),s1.length(),0)
& (TIMERS_LOCKSTRIPE_BUCKETS-1);
} }
void UserTimer::unsafe_removeTimer(int id, const string& session_id) void UserTimer::unsafe_removeTimer(int id, const string& session_id, unsigned int bucket)
{ {
// erase old timer if exists // erase old timer if exists
std::multiset<AmTimer>::iterator it = timers.begin(); std::multiset<AmTimer>::iterator it = timers[bucket].begin();
while (it != timers.end()) { while (it != timers[bucket].end()) {
if ((it->id == id)&&(it->session_id == session_id)) { if ((it->id == id)&&(it->session_id == session_id)) {
timers.erase(it); timers[bucket].erase(it);
break; break;
} }
it++; it++;
@ -155,36 +168,38 @@ void UserTimer::unsafe_removeTimer(int id, const string& session_id)
void UserTimer::removeTimers(const string& session_id) { void UserTimer::removeTimers(const string& session_id) {
// DBG("removing timers for <%s>\n", session_id.c_str()); // DBG("removing timers for <%s>\n", session_id.c_str());
timers_mut.lock(); unsigned int bucket = hash(session_id);
std::multiset<AmTimer>::iterator it = timers.begin(); timers_mut[bucket].lock();
while (it != timers.end()) { std::multiset<AmTimer>::iterator it = timers[bucket].begin();
while (it != timers[bucket].end()) {
if (it->session_id == session_id) { if (it->session_id == session_id) {
std::multiset<AmTimer>::iterator d_it = it; std::multiset<AmTimer>::iterator d_it = it;
it++; it++;
timers.erase(d_it); timers[bucket].erase(d_it);
// DBG(" o timer removed.\n"); // DBG(" o timer removed.\n");
} else { } else {
it++; it++;
} }
} }
timers_mut.unlock(); timers_mut[bucket].unlock();
} }
void UserTimer::removeUserTimers(const string& session_id) { void UserTimer::removeUserTimers(const string& session_id) {
// DBG("removing User timers for <%s>\n", session_id.c_str()); // DBG("removing User timers for <%s>\n", session_id.c_str());
timers_mut.lock(); unsigned int bucket = hash(session_id);
std::multiset<AmTimer>::iterator it = timers.begin(); timers_mut[bucket].lock();
while (it != timers.end()) { std::multiset<AmTimer>::iterator it = timers[bucket].begin();
while (it != timers[bucket].end()) {
if ((it->id > 0)&&(it->session_id == session_id)) { if ((it->id > 0)&&(it->session_id == session_id)) {
std::multiset<AmTimer>::iterator d_it = it; std::multiset<AmTimer>::iterator d_it = it;
it++; it++;
timers.erase(d_it); timers[bucket].erase(d_it);
// DBG(" o timer removed.\n"); // DBG(" o timer removed.\n");
} else { } else {
it++; it++;
} }
} }
timers_mut.unlock(); timers_mut[bucket].unlock();
} }
void UserTimer::invoke(const string& method, const AmArg& args, AmArg& ret) void UserTimer::invoke(const string& method, const AmArg& args, AmArg& ret)

@ -11,6 +11,10 @@
#define TIMEOUT_EVENT_ID 99 #define TIMEOUT_EVENT_ID 99
// 2^5 = 32 timer buckets - should alleviate any lock contention
#define TIMERS_LOCKSTRIPE_POWER 5
#define TIMERS_LOCKSTRIPE_BUCKETS (1<<TIMERS_LOCKSTRIPE_POWER)
#ifdef SESSION_TIMER_THREAD #ifdef SESSION_TIMER_THREAD
#include "AmThread.h" #include "AmThread.h"
#endif #endif
@ -64,10 +68,12 @@ class UserTimer: public AmDynInvoke
{ {
static UserTimer* _instance; static UserTimer* _instance;
std::multiset<AmTimer> timers; std::multiset<AmTimer> timers[TIMERS_LOCKSTRIPE_BUCKETS];
AmMutex timers_mut; AmMutex timers_mut[TIMERS_LOCKSTRIPE_BUCKETS];
unsigned int hash(const string& s1);
void unsafe_removeTimer(int id, const string& session_id); void unsafe_removeTimer(int id, const string& session_id, unsigned int bucket);
public: public:
UserTimer(); UserTimer();
~UserTimer(); ~UserTimer();

Loading…
Cancel
Save