diff --git a/service/message_loop.cc b/service/message_loop.cc index 8856cce4..449eac35 100644 --- a/service/message_loop.cc +++ b/service/message_loop.cc @@ -42,9 +42,11 @@ MessageLoop(int numThreads, double maxAddedLatency, int epollTimeout) : sourceActions_([&] () { handleSourceActions(); }), numThreadsCreated(0), shutdown_(true), - totalSleepTime_(0.0) + totalSleepTime_(0.0), + timerSource_(new TimerEventSource()) { init(numThreads, maxAddedLatency, epollTimeout); + addSource("timer_", timerSource_); } MessageLoop:: diff --git a/service/message_loop.h b/service/message_loop.h index a16a45ae..91bc2a54 100644 --- a/service/message_loop.h +++ b/service/message_loop.h @@ -7,16 +7,19 @@ #pragma once -#include #include +#include +#include +#include #include "jml/arch/wakeup_fd.h" #include "jml/arch/spinlock.h" #include "epoller.h" #include "async_event_source.h" -#include "typed_message_channel.h" #include "logs.h" +#include "timer_event_source.h" +#include "typed_message_channel.h" namespace Datacratic { @@ -32,9 +35,10 @@ struct MessageLoopLogs static Logging::Category trace; }; -/*****************************************************************************/ -/* MESSAGE LOOP */ -/*****************************************************************************/ + +/****************************************************************************/ +/* MESSAGE LOOP */ +/****************************************************************************/ struct MessageLoop : public Epoller { typedef std::function OnStop; @@ -90,7 +94,21 @@ struct MessageLoop : public Epoller { double timePeriodSeconds, std::function toRun, int priority = 0); - + + /* Add a timer. Returns an id that is guaranteed to be > 0, which enables + * "0" to be used as a test value. */ + uint64_t addTimer(double delay, const TimerEventSource::OnTick & onTick) + { + return timerSource_->addTimer(delay, onTick); + } + + /* Cancel a timer. Return "true" when the corresponding timer could be + found and deleted and "false" otherwise. */ + bool cancelTimer(uint64_t timerId) + { + return timerSource_->cancelTimer(timerId); + } + typedef std::function SubordinateThreadFn; @@ -214,6 +232,8 @@ struct MessageLoop : public Epoller { void processAddSource(const SourceEntry & entry); void processRemoveSource(const SourceEntry & entry); void processRunAction(const SourceEntry & entry); + + std::shared_ptr timerSource_; }; } // namespace Datacratic diff --git a/service/service.mk b/service/service.mk index e40a682c..10c993a6 100644 --- a/service/service.mk +++ b/service/service.mk @@ -53,6 +53,7 @@ LIBSERVICES_SOURCES := \ async_event_source.cc \ async_writer_source.cc \ tcp_client.cc \ + timer_event_source.cc \ rest_service_endpoint.cc \ http_named_endpoint.cc \ rest_proxy.cc \ diff --git a/service/testing/service_testing.mk b/service/testing/service_testing.mk index 05fbe6b1..df2cb7c1 100644 --- a/service/testing/service_testing.mk +++ b/service/testing/service_testing.mk @@ -35,6 +35,8 @@ $(eval $(call test,http_rest_proxy_stress_test,services,boost manual)) $(eval $(call test,service_proxies_test,endpoint,boost manual)) $(eval $(call test,message_loop_test,services,boost)) +$(eval $(call test,timer_event_source_test,services,boost)) +$(eval $(call test,timer_bench,services,boost manual)) $(eval $(call program,runner_test_helper,utils)) $(eval $(call test,runner_test,services,boost)) diff --git a/service/testing/timer_bench.cc b/service/testing/timer_bench.cc new file mode 100644 index 00000000..905869df --- /dev/null +++ b/service/testing/timer_bench.cc @@ -0,0 +1,4 @@ +int main() +{ + return 0; +} diff --git a/service/testing/timer_event_source_test.cc b/service/testing/timer_event_source_test.cc new file mode 100644 index 00000000..d75da28f --- /dev/null +++ b/service/testing/timer_event_source_test.cc @@ -0,0 +1,75 @@ +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include +#include + +#include + +#include "jml/arch/timers.h" +#include "jml/utils/testing/watchdog.h" +#include "soa/types/date.h" +#include "soa/service/message_loop.h" +#include "soa/service/timer_event_source.h" + +using namespace std; +using namespace Datacratic; + + +BOOST_AUTO_TEST_CASE( test_addTimer ) +{ + ML::Watchdog wd(10); + std::atomic ticks(0); + MessageLoop loop(1, 0, -1); + loop.start(); + auto timer = make_shared(); + loop.addSource("timer", timer); + timer->waitConnectionState(AsyncEventSource::CONNECTED); + + auto onTick = [&] (uint64_t) { + Date now = Date::now(); + ticks++; + return ticks < 3; + }; + timer->addTimer(0.2, onTick); + + while (true) { + if (ticks == 3) { + Date now = Date::now(); + break; + } + ML::sleep(1); + } + + loop.shutdown(); +} + + +BOOST_AUTO_TEST_CASE( test_cancelTimer ) +{ + ML::Watchdog wd(10); + std::atomic ticks(0); + MessageLoop loop(1, 0, -1); + loop.start(); + auto timer = make_shared(); + loop.addSource("timer", timer); + timer->waitConnectionState(AsyncEventSource::CONNECTED); + + auto onTick = [&] (uint64_t) { + Date now = Date::now(); + ticks++; + return true; + }; + auto timerId = timer->addTimer(0.2, onTick); + BOOST_CHECK(timerId > 0); + + ML::sleep(1); + BOOST_CHECK_EQUAL(timer->cancelTimer(timerId), true); + BOOST_CHECK_NE(ticks, 0); + int oldTicks = ticks; + ML::sleep(1); + BOOST_CHECK_EQUAL(ticks, oldTicks); + BOOST_CHECK_EQUAL(timer->cancelTimer(timerId), false); + + loop.shutdown(); +} diff --git a/service/timer_event_source.cc b/service/timer_event_source.cc new file mode 100644 index 00000000..25b5b71d --- /dev/null +++ b/service/timer_event_source.cc @@ -0,0 +1,193 @@ +/* timer_event_source.cc + Wolfgang Sourdeau, August 2015 + Copyright (c) 2015 Datacratic. All rights reserved. +*/ + +#include +#include + +#include "jml/utils/exc_assert.h" +#include "timer_event_source.h" + +using namespace std; +using namespace Datacratic; + + +/****************************************************************************/ +/* TIMER EVENT SOURCE */ +/****************************************************************************/ + +TimerEventSource:: +TimerEventSource() + : timerFd_(::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)), + counter_(1), + nextTick_(Date::negativeInfinity()) +{ + if (timerFd_ == -1) { + throw ML::Exception(errno, "timerfd_create"); + } +} + +TimerEventSource:: +~TimerEventSource() +{ + int res = ::close(timerFd_); + if (res == -1) { + cerr << "warning: close on timerfd: " << strerror(errno) << endl; + } +} + +int +TimerEventSource:: +selectFd() + const +{ + return timerFd_; +} + +bool +TimerEventSource:: +processOne() +{ +start: + uint64_t numWakeups = 0; + int res = ::read(timerFd_, &numWakeups, 8); + if (res == -1) { + if (errno == EINTR) { + goto start; + } + else if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + else { + throw ML::Exception(errno, "timerfd read"); + } + } + ExcAssertEqual(res, 8); + onTimerTick(); + + return false; +} + +void +TimerEventSource:: +onTimerTick() +{ + Date now = Date::now(); + vector triggered = collectTriggeredTimers(now); + + for (auto & timer: triggered) { + double interval = timer.lastTick.secondsUntil(now); + uint64_t numTicks = (uint64_t) ::floor(interval / timer.interval); + if (timer.onTick(numTicks)) { + timer.lastTick = now; + timer.nextTick = now.plusSeconds(timer.interval); + insertTimer(move(timer)); + } + } + + adjustNextTick(now); +} + +vector +TimerEventSource:: +collectTriggeredTimers(Date refDate) +{ + vector triggered; + TimersGuard guard(timersLock_); + + size_t nbrTriggered, nbrTimers(timerQueue_.size()); + for (nbrTriggered = 0; nbrTriggered < nbrTimers; nbrTriggered++) { + if (timerQueue_[nbrTriggered].nextTick > refDate) { + break; + } + } + + if (nbrTriggered > 0) { + triggered.reserve(nbrTriggered); + for (size_t i = 0; i < nbrTriggered; i++) { + triggered.emplace_back(move(timerQueue_[i])); + } + timerQueue_.erase(timerQueue_.begin(), timerQueue_.begin() + nbrTriggered); + } + + return triggered; +} + +void +TimerEventSource:: +adjustNextTick(Date now) +{ + TimersGuard guard(timersLock_); + + Date negInfinity = Date::negativeInfinity(); + Date nextTick = ((timerQueue_.size() > 0) + ? timerQueue_[0].nextTick + : negInfinity); + + if (nextTick != nextTick_) { + struct itimerspec spec{{0, 0}, {0, 0}}; + + if (nextTick != negInfinity) { + double delta = nextTick - now; + auto & value = spec.it_value; + value.tv_sec = (time_t) delta; + value.tv_nsec = (delta - spec.it_value.tv_sec) * 1000000000; + } + int res = ::timerfd_settime(timerFd_, 0, &spec, nullptr); + if (res == -1) { + throw ML::Exception(errno, "timerfd_settime"); + } + nextTick_ = nextTick; + } +} + +uint64_t +TimerEventSource:: +addTimer(double delay, const OnTick & onTick) +{ + Date now = Date::now(); + uint64_t timerId = counter_.fetch_add(1); + ExcAssert(timerId != 0); // ensure we never reach the upper limit during a + // program's lifetime + + Timer newTimer{delay, onTick}; + newTimer.nextTick = now.plusSeconds(delay); + newTimer.lastTick = Date::negativeInfinity(); + newTimer.timerId = timerId; + insertTimer(move(newTimer)); + adjustNextTick(now); + + return timerId; +} + +void +TimerEventSource:: +insertTimer(Timer && timer) +{ + TimersGuard guard(timersLock_); + + auto timerCompare = [&] (const Timer & left, + const Timer & right) { + return left.nextTick < right.nextTick; + }; + auto loc = lower_bound(timerQueue_.begin(), timerQueue_.end(), + timer, timerCompare); + timerQueue_.insert(loc, move(timer)); +} + +bool +TimerEventSource:: +cancelTimer(uint64_t timerId) +{ + TimersGuard guard(timersLock_); + + for (auto it = timerQueue_.begin(); it != timerQueue_.end(); it++) { + if (it->timerId == timerId) { + timerQueue_.erase(it); + return true; + } + } + + return false; +} diff --git a/service/timer_event_source.h b/service/timer_event_source.h new file mode 100644 index 00000000..88e38f1c --- /dev/null +++ b/service/timer_event_source.h @@ -0,0 +1,68 @@ +/* timer_event_source.h -*- C++ -*- + Wolfgang Sourdeau, August 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + A class used internally by MessageLoop to enable multiple timers using the + same timer fd: thereby reducing the number of file descriptors and context + switches required to handle such timers. +*/ + +#pragma once + +#include +#include +#include "soa/types/date.h" +#include "async_event_source.h" + + +namespace Datacratic { + +/****************************************************************************/ +/* TIMER EVENT SOURCE */ +/****************************************************************************/ + +struct TimerEventSource : public AsyncEventSource { + /* Type of callback invoked when a timer tick occurs. The callback should + * return "true" to indicate that the timer must be rescheduled or "false" + * otherwise. */ + typedef std::function OnTick; + + TimerEventSource(); + ~TimerEventSource(); + + virtual int selectFd() const; + virtual bool processOne(); + + /* Adds a timer */ + uint64_t addTimer(double delay, const OnTick & onTick); + + /* Cancel the given timer, returning true when the timer is found */ + bool cancelTimer(uint64_t timerId); + +private: + typedef std::mutex TimersLock; + typedef std::unique_lock TimersGuard; + + /* timers */ + struct Timer { + double interval; + OnTick onTick; + Date nextTick; + Date lastTick; + uint64_t timerId; + }; + + void onTimerTick(); + void insertTimer(Timer && timer); + std::vector collectTriggeredTimers(Date refDate); + void adjustNextTick(Date now); + + int timerFd_; + std::atomic counter_; + + TimersLock timersLock_; + std::vector timerQueue_; + Date nextTick_; +}; + +} // namespace Datacratic