MT#62181 wheeltimer: rework using std::map

Change-Id: I2e4e655e05ecdbc871712bbf364544679f142271
mr13.3.1
Richard Fuchs 11 months ago
parent 06466ee843
commit b58cb99611

@ -203,7 +203,7 @@ void trans_timer::fire()
if(bucket){
bucket->lock();
if(bucket->exist(t)){
DBG("Transaction timer expired: type=%s, trans=%p, eta=%" PRIu64 ", t=%i\n",
DBG("Transaction timer expired: type=%s, trans=%p, eta=%" PRIu64 ", t=%" PRIu64 "\n",
timer_name(type),t,expires,wheeltimer::instance()->get_wall_clock());
trans_timer* tt = t->get_timer(this->type & 0xFFFF);

@ -50,9 +50,7 @@ void _wheeltimer::insert_timer(timer* t)
//add new timer to user request list
std::lock_guard<AmMutex> lock(reqs_m);
reqs_backlog.push_back(timer_req(t,true));
// Wake up worker thread: This triggers turn_wheel() based on how many ticks have passed,
// and in turn brings wall_clock up to date. Finally the events queue is processed, which
// adds the timer to the wheel based on the now-updated wall_clock.
// Wake up worker thread: This causes the timer to be added to the appropriate bucket
reqs_cond.set(true);
}
@ -75,83 +73,45 @@ void _wheeltimer::remove_timer(timer* t)
void _wheeltimer::run()
{
uint64_t now, next_tick, diff; // microseconds
now = gettimeofday_us();
next_tick = now + resolution;
while(true){
now = gettimeofday_us();
if(now < next_tick){
// make sure everything that's been added or removed is taken into account
process_events();
diff = next_tick - now;
// figure out whether there's anything to run, and for how long to sleep
// Sleep up to diff ms OR up to 0.5 seconds if there are no timers,
// but wake up early if something is added to reqs_backlog
if (num_timers)
reqs_cond.wait_for_to(diff / 1000);
else
reqs_cond.wait_for_to(500); // 0.5 s
}
//else {
//printf("missed one tick\n");
//}
uint64_t now = gettimeofday_us();
now = gettimeofday_us();
auto beg = buckets.begin();
while (now >= next_tick) {
turn_wheel();
next_tick += resolution;
if (beg == buckets.end()) {
// nothing to wait for - sleep the fixed maximum allowed
reqs_cond.wait_for_to(max_sleep_time / 1000);
continue;
}
process_events();
}
}
void _wheeltimer::update_wheel(int wheel)
{
// do not try do update wheel 0
if(!wheel)
return;
for(;wheel;wheel--){
int pos = (wall_clock >> (wheel*BITS_PER_WHEEL))
& ((1<<BITS_PER_WHEEL)-1);
auto& w = wheels[wheel][pos];
while (!w.empty()) {
auto* t = w.front();
w.pop_front();
place_timer(t, wheel-1);
}
auto next = beg->first;
if (next > now) {
uint64_t diff = next - now;
// don't bother sleeping more than half a millisecond
if (diff > min_sleep_time) {
// Sleep up to diff ms OR up to the allowed maximum if it's longer
// but wake up early if something is added to reqs_backlog
if (diff < max_sleep_time)
reqs_cond.wait_for_to(diff / 1000);
else
reqs_cond.wait_for_to(max_sleep_time / 1000);
continue;
}
}
}
void _wheeltimer::turn_wheel()
{
u_int32_t mask = ((1<<BITS_PER_WHEEL)-1); // 0x00 00 00 FF
int i=0;
//determine which wheel should be updated
for(;i<WHEELS;i++){
if((wall_clock & mask) ^ mask)
break;
mask <<= BITS_PER_WHEEL;
}
// this slot needs to run now
process_current_timers(beg->second);
//increment time
wall_clock++;
// Update existing timer entries
update_wheel(i);
//check for expired timer to process
process_current_timers();
// all done, remove bucket
buckets.erase(beg);
}
}
void _wheeltimer::process_events()
@ -174,22 +134,16 @@ void _wheeltimer::process_events()
delete_timer(rq.t);
}
}
//check for expired timer to process in case we just added some
process_current_timers();
}
void _wheeltimer::process_current_timers()
void _wheeltimer::process_current_timers(timer_list& list)
{
auto& w = wheels[0][wall_clock & 0xFF];
while (!w.empty()) {
auto* t = w.front();
w.pop_front();
while (!list.empty()) {
auto* t = list.front();
list.pop_front();
t->list = NULL;
t->disarm();
num_timers--;
t->fire();
}
@ -197,45 +151,27 @@ void _wheeltimer::process_current_timers()
void _wheeltimer::place_timer(timer* t)
{
if (t->arm())
num_timers++;
if(t->get_absolute_expiry() < gettimeofday_us()) {
// we put the late ones at the beginning of next wheel turn
add_timer_to_wheel(t,0,((1<<BITS_PER_WHEEL)-1) & wall_clock);
return;
}
place_timer(t,WHEELS-1);
}
void _wheeltimer::place_timer(timer* t, int wheel)
{
unsigned int pos;
unsigned int expiry_ticks = t->get_absolute_expiry() / resolution;
unsigned int clock_mask = expiry_ticks ^ wall_clock;
t->arm();
for(; wheel; wheel--){
uint64_t exp = t->get_absolute_expiry();
if( (clock_mask >> (wheel*BITS_PER_WHEEL))
& ((1<<BITS_PER_WHEEL)-1) ) {
// scale expiry based on resolution: this is the bucket index
exp = ((exp / resolution) + 1) * resolution;
break;
}
}
// if expiry is too soon or in the past, put the timer in the next bucket up
auto now = gettimeofday_us();
if (exp <= now)
exp = ((now / resolution) + 1) * resolution;
// we went down to wheel 0
pos = (expiry_ticks >> (wheel*BITS_PER_WHEEL)) & ((1<<BITS_PER_WHEEL)-1);
add_timer_to_wheel(t,wheel,pos);
add_timer_to_bucket(t, exp);
}
void _wheeltimer::add_timer_to_wheel(timer* t, int wheel, unsigned int pos)
void _wheeltimer::add_timer_to_bucket(timer* t, uint64_t bucket)
{
t->list = &wheels[wheel][pos];
t->list->push_front(t);
t->pos = wheels[wheel][pos].begin();
auto& b = buckets[bucket];
t->list = &b;
b.push_front(t);
t->pos = b.begin();
}
void _wheeltimer::delete_timer(timer* t)

@ -37,18 +37,14 @@
#include <time.h>
#include <deque>
#include <list>
#include <map>
#include "atomic_types.h"
#define BITS_PER_WHEEL 8
#define ELMTS_PER_WHEEL (1 << BITS_PER_WHEEL)
// do not change
#define WHEELS 4
#include "log.h"
class timer;
typedef std::list<timer*> timer_list;
typedef std::map<uint64_t, timer_list> timer_buckets;
class timer
{
@ -116,41 +112,43 @@ class _wheeltimer:
{}
};
//the timer wheel
timer_list wheels[WHEELS][ELMTS_PER_WHEEL];
unsigned int num_timers;
timer_buckets buckets;
uint64_t resolution; // microseconds
// Only go to sleep if the next timer to run is at least this far in the future.
// If the required sleep time is less long than this, don't bother going to sleep,
// and just run the timers now. In microseconds.
const uint64_t min_sleep_time = 500; // half a millisecond
// Don't sleep longer than this, even if the next timer to run is further in the
// future (or if no timers exist). Needed not to miss the shutdown flag being set.
const uint64_t max_sleep_time = 500000; // half a second
// request backlog lock (insert/remove)
AmMutex reqs_m;
AmCondition reqs_cond; // to wake up worker thread when a request is added
std::deque<timer_req> reqs_backlog;
u_int32_t wall_clock; // 32 bits, "resolution" based ticks starting from epoch
void turn_wheel();
void process_events();
void update_wheel(int wheel);
void place_timer(timer* t);
void place_timer(timer* t, int wheel);
void add_timer_to_wheel(timer* t, int wheel, unsigned int pos);
void add_timer_to_bucket(timer* t, uint64_t);
void delete_timer(timer* t);
void process_current_timers();
void process_current_timers(timer_list&);
protected:
void run();
void on_stop(){}
_wheeltimer()
: resolution(20000), // 20 ms == 20000 us
wall_clock(gettimeofday_us() / resolution) {}
: resolution(20000) // 20 ms == 20000 us
{}
_wheeltimer(uint64_t _resolution)
: resolution(_resolution),
wall_clock(gettimeofday_us() / resolution) {}
: resolution(_resolution)
{}
public:
//clock reference
@ -163,9 +161,9 @@ public:
_uc unix_clock;
// for debugging/logging only!
u_int32_t get_wall_clock()
uint64_t get_wall_clock()
{
return wall_clock;
return gettimeofday_us();
}
void insert_timer(timer* t);

Loading…
Cancel
Save