Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
/.cache/
/.vscode/

MODULE.bazel
/MODULE.bazel
/MODULE.bazel.lock
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 0.3.1

* Added threading library:
* Added `PeriodicThread`: Runs a function periodically in its own thread.

# 0.3

* Add llvm/clang which can be triggered with `bazel ... --config=clang`.
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ The C++ library is organized in functional groups each residing in their own dir
* gMock-matcher `IsOkAndHolds`: Tests an absl::StatusOr for absl::OkStatus and contents.
* gMock-Matcher `StatusIs`: Tests an absl::Status or absl::StatusOr against a specific status code and message.
* macro `MBO_ASSERT_OK_AND_ASSIGN`: Simplifies testing with functions that return `absl::StatusOr<T>`.
* Threading
* `namespace mbo::thread`
* mbo/thread:periodic_thread_cc, mbo/thread/periodic_thread.h
* class `PeriodicThread`: Runs a function periodically in its own thread.
* Types
* `namespace mbo::types`
* mbo/types:cases_cc, mbo/types/cases.h
Expand Down
23 changes: 23 additions & 0 deletions mbo/thread/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package(default_visibility = ["//visibility:private"])

cc_library(
name = "periodic_thread_cc",
srcs = ["periodic_thread.cc"],
hdrs = ["periodic_thread.h"],
deps = [
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
],
)


cc_binary(
name = "periodic",
srcs = ["periodic_main.cc"],
deps = [
":periodic_thread_cc",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/time",
],
)
63 changes: 63 additions & 0 deletions mbo/thread/periodic_main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024 M. Boerger (helly25.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <cstdint>

#include "absl/strings/str_format.h"
#include "absl/time/time.h"
#include "mbo/thread/periodic_thread.h"

void Test() {
static constexpr uint64_t kMaxCycle = 9'999;
static constexpr absl::Duration kInterval = absl::Milliseconds(100);
absl::Time start;
absl::Duration total_correction;
std::atomic_size_t cycle = 0;
mbo::thread::PeriodicThread periodic_thread({
.interval = kInterval,
.initial_wait = kInterval,
.func =
[&] {
const absl::Time now = absl::Now();
if (cycle == 0) {
start = now;
const std::string time = absl::FormatTime("%Y-%m-%d at %H:%M:%E6S", start, absl::LocalTimeZone());
absl::PrintF("[%04d]: %s %13s %12s %13s %12s\n", cycle, time, "duration", "average", "correction", "avg-corr");
} else {
const std::size_t this_cycle = cycle;
const absl::Duration dur = now - start;
const absl::Duration avg = dur / this_cycle;
const double dur_sec = absl::ToDoubleSeconds(dur);
const double avg_sec = absl::ToDoubleSeconds(avg);
const absl::Duration correction = avg - kInterval;
total_correction += correction;
const std::string time_str = absl::FormatTime("%Y-%m-%d at %H:%M:%E6S", now, absl::LocalTimeZone());
const std::string corr_str = absl::FormatDuration(correction);
const std::string avg_corr_str = absl::FormatDuration(total_correction / this_cycle);
absl::PrintF(
"[%04d]: %s %+13.6f ~%11.9f %13s %12s\n", this_cycle, time_str, dur_sec, avg_sec, corr_str,
avg_corr_str);
}
return ++cycle <= kMaxCycle;
},
});
periodic_thread.Join();
}

int main() {
Test();
absl::PrintF("All done!\n");
return 0;
}
90 changes: 90 additions & 0 deletions mbo/thread/periodic_thread.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 M. Boerger (helly25.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "mbo/thread/periodic_thread.h"

#include <atomic>
#include <thread>

#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"

namespace mbo::thread {

PeriodicThread::~PeriodicThread() noexcept {
stop_ = true;
Join();
}

PeriodicThread::PeriodicThread(Options options) noexcept : options_(std::move(options)), thread_([this] { Run(); }) {}

bool PeriodicThread::IsDone() const {
absl::MutexLock lock(&mx_);
return done_;
}

void PeriodicThread::Join() const {
{
absl::MutexLock lock(&mx_);
mx_.Await(absl::Condition(&done_));
}
if (thread_.joinable()) {
thread_.join();
}
}

void PeriodicThread::Run() {
running_ = true;
if (options_.initial_wait > absl::ZeroDuration()) {
absl::SleepFor(options_.initial_wait);
}
std::size_t cycle = 0;
absl::Time begin = absl::Now();
absl::Time start = begin;
absl::Duration adjust = absl::ZeroDuration();
while (!stop_) {
if (!options_.func() || stop_) {
break;
}
++cycle;
const absl::Time end = absl::Now();
const absl::Duration took = end - start + adjust;
absl::Duration sleep = options_.interval >= took ? options_.interval - took : options_.min_interval;
absl::SleepFor(sleep);
start = absl::Now();
// We adjust based on average divergence as well as the most recent runtime.
// This allows for small overcorrections over time as well as handling inconsistent runtime of actual function.
// Adjusting the average too little may result in an inability to keep to the intended interval time. Values in the
// low percentages (1.01..1.1) appear to be working.
// Adjusting the recent run strongly (e.g. >1) only works if callback runtimes are steady. Smaller values are
// better at dealing with inconsisten runtimes, even for very small inconsistencies. Setting the value to 1 appears
// to be a good compromise.
// This is not configurable in the `Options` for now as that prevent using better algorithms later.
static constexpr double kAdjustAverage = 1.05;
static constexpr double kAdjustRecent = 1;
adjust = kAdjustAverage * ((start - begin) / cycle - options_.interval) + kAdjustRecent * ((start - end) - sleep);
static constexpr std::size_t kMaxCycleAdjustWindow = 1000;
if (cycle % kMaxCycleAdjustWindow == 0) {
// We actually reset the cycle to 0 as we otherwise would need to handle cycle overrun in the next window
// `max(size_t) - cycle < kMaxCycleAdjustWindow`. Further we would need to use a modulo operation for cycle when
// computing the adjustment (`cycle % (kMaxCycleAdjustWindow + 1)`).
cycle = 0;
begin = start;
}
}
absl::MutexLock lock(&mx_);
done_ = true;
}

} // namespace mbo::thread
82 changes: 82 additions & 0 deletions mbo/thread/periodic_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 M. Boerger (helly25.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <thread>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"

namespace mbo::thread {

// The `PeriodicThread` runs a function periodically in its own thread.
//
// The implementation uses simple heuristics to adjust to the intended interval over time using a
// sliding window. This allows for inconsistent function run times to eventually lead to closely
// following the interval on average. The implementation may still not be able to respect the
// interval correctly, even over longer periods of time. However, the divergence from the intended
// interval should be very small (high accuracy). Use binary `//mbo/thread:periodic` to determine
// the accuracy for actual machines.
//
// The advantage of 'allowing' the imperfect interval handling is that the management has a low cost
// and thus the most real time can be spent sleeping.
//
// The `PeriodicThread`:
// * Automatically stops if the function returns `false`.
// * Automatically starts on creation, but an `initial_wait` time can be configured.
// * Cannot be restarted.
// * Can be stopped, but may have to sleep for a full interval time before actually stopping.
// * The destructor will wait for the thread to stop if it is running.
// * The behavior is undefined if the function runtime exceeds the interval (or does not allow time
// for interval management).
class PeriodicThread {
public:
struct Options {
absl::Duration interval;
absl::Duration min_interval = absl::Milliseconds(1);
absl::Duration initial_wait = absl::ZeroDuration();
std::function<bool()> func; // Return `true` for continue, `false` for stop.
};

~PeriodicThread() noexcept;
explicit PeriodicThread(Options options) noexcept;

PeriodicThread(const PeriodicThread&) = delete;
PeriodicThread& operator=(const PeriodicThread&) = delete;
PeriodicThread(PeriodicThread&& other) = delete;
PeriodicThread& operator=(PeriodicThread&&) = delete;

void Stop() { stop_ = true; }

bool IsStopping() const { return stop_; }

bool IsRunning() const { return running_; }

bool IsDone() const;

void Join() const;

private:
void Run();

const Options options_;
mutable absl::Mutex mx_;
std::atomic_bool stop_ = false;
std::atomic_bool running_ = false;
bool done_ ABSL_GUARDED_BY(mx_) = false;
mutable std::thread thread_;
};

} // namespace mbo::thread