diff --git a/.gitignore b/.gitignore index 8c85227..f2a67fe 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ /.cache/ /.vscode/ -MODULE.bazel +/MODULE.bazel +/MODULE.bazel.lock diff --git a/CHANGELOG.md b/CHANGELOG.md index 286abe9..0aa5d8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 0.3.1 + +* Added threading library: +* Added `PeriodicThread`: Runs a function periodically in its own thread. + # 0.3 * Add llvm/clang which can be triggered with `bazel ... --config=clang`. diff --git a/README.md b/README.md index 3509014..312fac4 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,10 @@ The C++ library is organized in functional groups each residing in their own dir * gMock-matcher `IsOkAndHolds`: Tests an absl::StatusOr for absl::OkStatus and contents. * gMock-Matcher `StatusIs`: Tests an absl::Status or absl::StatusOr against a specific status code and message. * macro `MBO_ASSERT_OK_AND_ASSIGN`: Simplifies testing with functions that return `absl::StatusOr`. +* Threading + * `namespace mbo::thread` + * mbo/thread:periodic_thread_cc, mbo/thread/periodic_thread.h + * class `PeriodicThread`: Runs a function periodically in its own thread. * Types * `namespace mbo::types` * mbo/types:cases_cc, mbo/types/cases.h diff --git a/mbo/thread/BUILD.bazel b/mbo/thread/BUILD.bazel new file mode 100644 index 0000000..e812286 --- /dev/null +++ b/mbo/thread/BUILD.bazel @@ -0,0 +1,23 @@ +package(default_visibility = ["//visibility:private"]) + +cc_library( + name = "periodic_thread_cc", + srcs = ["periodic_thread.cc"], + hdrs = ["periodic_thread.h"], + deps = [ + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/synchronization", + "@com_google_absl//absl/time", + ], +) + + +cc_binary( + name = "periodic", + srcs = ["periodic_main.cc"], + deps = [ + ":periodic_thread_cc", + "@com_google_absl//absl/strings:str_format", + "@com_google_absl//absl/time", + ], +) \ No newline at end of file diff --git a/mbo/thread/periodic_main.cc b/mbo/thread/periodic_main.cc new file mode 100644 index 0000000..47f42d3 --- /dev/null +++ b/mbo/thread/periodic_main.cc @@ -0,0 +1,63 @@ +// Copyright 2024 M. Boerger (helly25.com) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "absl/strings/str_format.h" +#include "absl/time/time.h" +#include "mbo/thread/periodic_thread.h" + +void Test() { + static constexpr uint64_t kMaxCycle = 9'999; + static constexpr absl::Duration kInterval = absl::Milliseconds(100); + absl::Time start; + absl::Duration total_correction; + std::atomic_size_t cycle = 0; + mbo::thread::PeriodicThread periodic_thread({ + .interval = kInterval, + .initial_wait = kInterval, + .func = + [&] { + const absl::Time now = absl::Now(); + if (cycle == 0) { + start = now; + const std::string time = absl::FormatTime("%Y-%m-%d at %H:%M:%E6S", start, absl::LocalTimeZone()); + absl::PrintF("[%04d]: %s %13s %12s %13s %12s\n", cycle, time, "duration", "average", "correction", "avg-corr"); + } else { + const std::size_t this_cycle = cycle; + const absl::Duration dur = now - start; + const absl::Duration avg = dur / this_cycle; + const double dur_sec = absl::ToDoubleSeconds(dur); + const double avg_sec = absl::ToDoubleSeconds(avg); + const absl::Duration correction = avg - kInterval; + total_correction += correction; + const std::string time_str = absl::FormatTime("%Y-%m-%d at %H:%M:%E6S", now, absl::LocalTimeZone()); + const std::string corr_str = absl::FormatDuration(correction); + const std::string avg_corr_str = absl::FormatDuration(total_correction / this_cycle); + absl::PrintF( + "[%04d]: %s %+13.6f ~%11.9f %13s %12s\n", this_cycle, time_str, dur_sec, avg_sec, corr_str, + avg_corr_str); + } + return ++cycle <= kMaxCycle; + }, + }); + periodic_thread.Join(); +} + +int main() { + Test(); + absl::PrintF("All done!\n"); + return 0; +} diff --git a/mbo/thread/periodic_thread.cc b/mbo/thread/periodic_thread.cc new file mode 100644 index 0000000..42d8d1d --- /dev/null +++ b/mbo/thread/periodic_thread.cc @@ -0,0 +1,90 @@ +// Copyright 2024 M. Boerger (helly25.com) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "mbo/thread/periodic_thread.h" + +#include +#include + +#include "absl/synchronization/mutex.h" +#include "absl/time/time.h" + +namespace mbo::thread { + +PeriodicThread::~PeriodicThread() noexcept { + stop_ = true; + Join(); +} + +PeriodicThread::PeriodicThread(Options options) noexcept : options_(std::move(options)), thread_([this] { Run(); }) {} + +bool PeriodicThread::IsDone() const { + absl::MutexLock lock(&mx_); + return done_; +} + +void PeriodicThread::Join() const { + { + absl::MutexLock lock(&mx_); + mx_.Await(absl::Condition(&done_)); + } + if (thread_.joinable()) { + thread_.join(); + } +} + +void PeriodicThread::Run() { + running_ = true; + if (options_.initial_wait > absl::ZeroDuration()) { + absl::SleepFor(options_.initial_wait); + } + std::size_t cycle = 0; + absl::Time begin = absl::Now(); + absl::Time start = begin; + absl::Duration adjust = absl::ZeroDuration(); + while (!stop_) { + if (!options_.func() || stop_) { + break; + } + ++cycle; + const absl::Time end = absl::Now(); + const absl::Duration took = end - start + adjust; + absl::Duration sleep = options_.interval >= took ? options_.interval - took : options_.min_interval; + absl::SleepFor(sleep); + start = absl::Now(); + // We adjust based on average divergence as well as the most recent runtime. + // This allows for small overcorrections over time as well as handling inconsistent runtime of actual function. + // Adjusting the average too little may result in an inability to keep to the intended interval time. Values in the + // low percentages (1.01..1.1) appear to be working. + // Adjusting the recent run strongly (e.g. >1) only works if callback runtimes are steady. Smaller values are + // better at dealing with inconsisten runtimes, even for very small inconsistencies. Setting the value to 1 appears + // to be a good compromise. + // This is not configurable in the `Options` for now as that prevent using better algorithms later. + static constexpr double kAdjustAverage = 1.05; + static constexpr double kAdjustRecent = 1; + adjust = kAdjustAverage * ((start - begin) / cycle - options_.interval) + kAdjustRecent * ((start - end) - sleep); + static constexpr std::size_t kMaxCycleAdjustWindow = 1000; + if (cycle % kMaxCycleAdjustWindow == 0) { + // We actually reset the cycle to 0 as we otherwise would need to handle cycle overrun in the next window + // `max(size_t) - cycle < kMaxCycleAdjustWindow`. Further we would need to use a modulo operation for cycle when + // computing the adjustment (`cycle % (kMaxCycleAdjustWindow + 1)`). + cycle = 0; + begin = start; + } + } + absl::MutexLock lock(&mx_); + done_ = true; +} + +} // namespace mbo::thread diff --git a/mbo/thread/periodic_thread.h b/mbo/thread/periodic_thread.h new file mode 100644 index 0000000..2eef1f0 --- /dev/null +++ b/mbo/thread/periodic_thread.h @@ -0,0 +1,82 @@ +// Copyright 2024 M. Boerger (helly25.com) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/synchronization/mutex.h" +#include "absl/time/time.h" + +namespace mbo::thread { + +// The `PeriodicThread` runs a function periodically in its own thread. +// +// The implementation uses simple heuristics to adjust to the intended interval over time using a +// sliding window. This allows for inconsistent function run times to eventually lead to closely +// following the interval on average. The implementation may still not be able to respect the +// interval correctly, even over longer periods of time. However, the divergence from the intended +// interval should be very small (high accuracy). Use binary `//mbo/thread:periodic` to determine +// the accuracy for actual machines. +// +// The advantage of 'allowing' the imperfect interval handling is that the management has a low cost +// and thus the most real time can be spent sleeping. +// +// The `PeriodicThread`: +// * Automatically stops if the function returns `false`. +// * Automatically starts on creation, but an `initial_wait` time can be configured. +// * Cannot be restarted. +// * Can be stopped, but may have to sleep for a full interval time before actually stopping. +// * The destructor will wait for the thread to stop if it is running. +// * The behavior is undefined if the function runtime exceeds the interval (or does not allow time +// for interval management). +class PeriodicThread { + public: + struct Options { + absl::Duration interval; + absl::Duration min_interval = absl::Milliseconds(1); + absl::Duration initial_wait = absl::ZeroDuration(); + std::function func; // Return `true` for continue, `false` for stop. + }; + + ~PeriodicThread() noexcept; + explicit PeriodicThread(Options options) noexcept; + + PeriodicThread(const PeriodicThread&) = delete; + PeriodicThread& operator=(const PeriodicThread&) = delete; + PeriodicThread(PeriodicThread&& other) = delete; + PeriodicThread& operator=(PeriodicThread&&) = delete; + + void Stop() { stop_ = true; } + + bool IsStopping() const { return stop_; } + + bool IsRunning() const { return running_; } + + bool IsDone() const; + + void Join() const; + + private: + void Run(); + + const Options options_; + mutable absl::Mutex mx_; + std::atomic_bool stop_ = false; + std::atomic_bool running_ = false; + bool done_ ABSL_GUARDED_BY(mx_) = false; + mutable std::thread thread_; +}; + +} // namespace mbo::thread \ No newline at end of file