From b58cb996113247355d8ac6bbfc820122e7783aa0 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 19 Feb 2025 16:03:40 -0400 Subject: [PATCH] MT#62181 wheeltimer: rework using std::map Change-Id: I2e4e655e05ecdbc871712bbf364544679f142271 --- core/sip/sip_trans.cpp | 2 +- core/sip/wheeltimer.cpp | 158 ++++++++++++---------------------------- core/sip/wheeltimer.h | 44 ++++++----- 3 files changed, 69 insertions(+), 135 deletions(-) diff --git a/core/sip/sip_trans.cpp b/core/sip/sip_trans.cpp index a50a5948..529e0d86 100644 --- a/core/sip/sip_trans.cpp +++ b/core/sip/sip_trans.cpp @@ -203,7 +203,7 @@ void trans_timer::fire() if(bucket){ bucket->lock(); if(bucket->exist(t)){ - DBG("Transaction timer expired: type=%s, trans=%p, eta=%" PRIu64 ", t=%i\n", + DBG("Transaction timer expired: type=%s, trans=%p, eta=%" PRIu64 ", t=%" PRIu64 "\n", timer_name(type),t,expires,wheeltimer::instance()->get_wall_clock()); trans_timer* tt = t->get_timer(this->type & 0xFFFF); diff --git a/core/sip/wheeltimer.cpp b/core/sip/wheeltimer.cpp index 66ecfb73..b8d5db5b 100644 --- a/core/sip/wheeltimer.cpp +++ b/core/sip/wheeltimer.cpp @@ -50,9 +50,7 @@ void _wheeltimer::insert_timer(timer* t) //add new timer to user request list std::lock_guard lock(reqs_m); reqs_backlog.push_back(timer_req(t,true)); - // Wake up worker thread: This triggers turn_wheel() based on how many ticks have passed, - // and in turn brings wall_clock up to date. Finally the events queue is processed, which - // adds the timer to the wheel based on the now-updated wall_clock. + // Wake up worker thread: This causes the timer to be added to the appropriate bucket reqs_cond.set(true); } @@ -75,83 +73,45 @@ void _wheeltimer::remove_timer(timer* t) void _wheeltimer::run() { - uint64_t now, next_tick, diff; // microseconds - - now = gettimeofday_us(); - next_tick = now + resolution; - while(true){ - now = gettimeofday_us(); - - if(now < next_tick){ + // make sure everything that's been added or removed is taken into account + process_events(); - diff = next_tick - now; + // figure out whether there's anything to run, and for how long to sleep - // Sleep up to diff ms OR up to 0.5 seconds if there are no timers, - // but wake up early if something is added to reqs_backlog - if (num_timers) - reqs_cond.wait_for_to(diff / 1000); - else - reqs_cond.wait_for_to(500); // 0.5 s - } - //else { - //printf("missed one tick\n"); - //} + uint64_t now = gettimeofday_us(); - now = gettimeofday_us(); + auto beg = buckets.begin(); - while (now >= next_tick) { - turn_wheel(); - next_tick += resolution; + if (beg == buckets.end()) { + // nothing to wait for - sleep the fixed maximum allowed + reqs_cond.wait_for_to(max_sleep_time / 1000); + continue; } - process_events(); - } -} - - - -void _wheeltimer::update_wheel(int wheel) -{ - // do not try do update wheel 0 - if(!wheel) - return; - - for(;wheel;wheel--){ - - int pos = (wall_clock >> (wheel*BITS_PER_WHEEL)) - & ((1<first; + + if (next > now) { + uint64_t diff = next - now; + // don't bother sleeping more than half a millisecond + if (diff > min_sleep_time) { + // Sleep up to diff ms OR up to the allowed maximum if it's longer + // but wake up early if something is added to reqs_backlog + if (diff < max_sleep_time) + reqs_cond.wait_for_to(diff / 1000); + else + reqs_cond.wait_for_to(max_sleep_time / 1000); + continue; + } } -} -void _wheeltimer::turn_wheel() -{ - u_int32_t mask = ((1<second); - //increment time - wall_clock++; - - // Update existing timer entries - update_wheel(i); - - //check for expired timer to process - process_current_timers(); + // all done, remove bucket + buckets.erase(beg); + } } void _wheeltimer::process_events() @@ -174,22 +134,16 @@ void _wheeltimer::process_events() delete_timer(rq.t); } } - - //check for expired timer to process in case we just added some - process_current_timers(); } -void _wheeltimer::process_current_timers() +void _wheeltimer::process_current_timers(timer_list& list) { - auto& w = wheels[0][wall_clock & 0xFF]; - - while (!w.empty()) { - auto* t = w.front(); - w.pop_front(); + while (!list.empty()) { + auto* t = list.front(); + list.pop_front(); t->list = NULL; t->disarm(); - num_timers--; t->fire(); } @@ -197,45 +151,27 @@ void _wheeltimer::process_current_timers() void _wheeltimer::place_timer(timer* t) { - if (t->arm()) - num_timers++; - - if(t->get_absolute_expiry() < gettimeofday_us()) { - - // we put the late ones at the beginning of next wheel turn - add_timer_to_wheel(t,0,((1<get_absolute_expiry() / resolution; - unsigned int clock_mask = expiry_ticks ^ wall_clock; + t->arm(); - for(; wheel; wheel--){ + uint64_t exp = t->get_absolute_expiry(); - if( (clock_mask >> (wheel*BITS_PER_WHEEL)) - & ((1<> (wheel*BITS_PER_WHEEL)) & ((1<list = &wheels[wheel][pos]; - t->list->push_front(t); - t->pos = wheels[wheel][pos].begin(); + auto& b = buckets[bucket]; + t->list = &b; + b.push_front(t); + t->pos = b.begin(); } void _wheeltimer::delete_timer(timer* t) diff --git a/core/sip/wheeltimer.h b/core/sip/wheeltimer.h index b29f7ee4..4447a1ab 100644 --- a/core/sip/wheeltimer.h +++ b/core/sip/wheeltimer.h @@ -37,18 +37,14 @@ #include #include #include +#include -#include "atomic_types.h" - -#define BITS_PER_WHEEL 8 -#define ELMTS_PER_WHEEL (1 << BITS_PER_WHEEL) - -// do not change -#define WHEELS 4 +#include "log.h" class timer; typedef std::list timer_list; +typedef std::map timer_buckets; class timer { @@ -116,41 +112,43 @@ class _wheeltimer: {} }; - //the timer wheel - timer_list wheels[WHEELS][ELMTS_PER_WHEEL]; - unsigned int num_timers; + timer_buckets buckets; uint64_t resolution; // microseconds + // Only go to sleep if the next timer to run is at least this far in the future. + // If the required sleep time is less long than this, don't bother going to sleep, + // and just run the timers now. In microseconds. + const uint64_t min_sleep_time = 500; // half a millisecond + + // Don't sleep longer than this, even if the next timer to run is further in the + // future (or if no timers exist). Needed not to miss the shutdown flag being set. + const uint64_t max_sleep_time = 500000; // half a second + // request backlog lock (insert/remove) AmMutex reqs_m; AmCondition reqs_cond; // to wake up worker thread when a request is added std::deque reqs_backlog; - u_int32_t wall_clock; // 32 bits, "resolution" based ticks starting from epoch - - void turn_wheel(); void process_events(); - void update_wheel(int wheel); void place_timer(timer* t); - void place_timer(timer* t, int wheel); - void add_timer_to_wheel(timer* t, int wheel, unsigned int pos); + void add_timer_to_bucket(timer* t, uint64_t); void delete_timer(timer* t); - void process_current_timers(); + void process_current_timers(timer_list&); protected: void run(); void on_stop(){} _wheeltimer() - : resolution(20000), // 20 ms == 20000 us - wall_clock(gettimeofday_us() / resolution) {} + : resolution(20000) // 20 ms == 20000 us + {} _wheeltimer(uint64_t _resolution) - : resolution(_resolution), - wall_clock(gettimeofday_us() / resolution) {} + : resolution(_resolution) + {} public: //clock reference @@ -163,9 +161,9 @@ public: _uc unix_clock; // for debugging/logging only! - u_int32_t get_wall_clock() + uint64_t get_wall_clock() { - return wall_clock; + return gettimeofday_us(); } void insert_timer(timer* t);