Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
cmake -S . -B build
-G "${{ matrix.generator }}"
-DCMAKE_CXX_COMPILER=${{ env.CXX }}
-DENABLE_BM=ON
shell: bash

- name: Build
Expand All @@ -42,6 +43,10 @@ jobs:
- name: Run tests
run: ctest --test-dir build --output-on-failure --build-config Release

- name: Run benchmarks
run: >
./build/benchmarks/benchmark_pubsub --benchmark_counters_tabular=true

windows:
name: Windows Verification
runs-on: windows-latest
Expand Down Expand Up @@ -80,13 +85,18 @@ jobs:
-G "${{ matrix.generator }}"
-DCMAKE_CXX_COMPILER="${{ matrix.compiler }}"
-DCMAKE_TOOLCHAIN_FILE="${{ env.CMAKE_TOOLCHAIN_FILE }}"
-DENABLE_BM=ON
shell: bash

- name: Build
run: cmake --build build --config Release

- name: Run tests
run: ctest --test-dir build --output-on-failure --build-config Release

- name: Run benchmarks
run: >
./build/benchmarks/Release/benchmark_pubsub.exe --benchmark_counters_tabular=true
macos:
name: macOS Verification
runs-on: macos-latest
Expand Down Expand Up @@ -115,10 +125,15 @@ jobs:
cmake -S . -B build
-G "${{ matrix.generator }}"
-DCMAKE_CXX_COMPILER=${{ env.CXX }}
-DENABLE_BM=ON
shell: bash

- name: Build
run: cmake --build build --config Release

- name: Run tests
run: ctest --test-dir build --output-on-failure --build-config Release
run: ctest --test-dir build --output-on-failure --build-config Release

- name: Run benchmarks
run: >
./build/benchmarks/benchmark_pubsub --benchmark_counters_tabular=true
56 changes: 37 additions & 19 deletions benchmarks/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
#include <vector>
#include <unordered_map>
#include <cstdint>
#include <chrono>

#include "pubsub.h"

#ifdef WITH_OMP
#include <omp.h>
#ifdef __linux__
#include <sys/types.h>
#include <sys/sysinfo.h>
#include <unistd.h>
#elif defined(_WIN32)
#include <windows.h>
#include <psapi.h>
#endif

// ========== Event Definition ==========
Expand Down Expand Up @@ -37,23 +43,37 @@ std::unique_ptr<pubsub::Publisher> create_publisher_with_heavy_subs(int num_subs
std::vector<int> subscriber_counts = {1, 10, 100, 500, 1000};
std::unordered_map<int, std::unique_ptr<pubsub::Publisher>> heavy_publishers;

// ========== Benchmark Macro ==========
#define DEFINE_HEAVY_EMIT_BENCH(name, emit_method) \
static void name(benchmark::State& state) { \
int subs = state.range(0); \
auto it = heavy_publishers.find(subs); \
if (it == heavy_publishers.end()) state.SkipWithError("Missing pub"); \
auto& pub = it->second; \
for (auto _ : state) { \
pub->emit_method; \
} \
} \
BENCHMARK(name)->Args({1})->Args({10})->Args({100})->Args({500})->Args({1000});
// ========== Updated Benchmark Macro with Memory + Time Metrics ==========
#define DEFINE_HEAVY_EMIT_BENCH(name, emit_method) \
static void name(benchmark::State& state) { \
std::unordered_map<int, std::unique_ptr<pubsub::Publisher>>::iterator it; \
int subs = state.range(0); \
it = heavy_publishers.find(subs); \
if (it == heavy_publishers.end()) state.SkipWithError("Missing pub"); \
auto& pub = it->second; \
for (auto _ : state) { \
auto start_time = std::chrono::high_resolution_clock::now(); \
benchmark::DoNotOptimize(pub->emit_method); \
auto end_time = std::chrono::high_resolution_clock::now(); \
auto duration = end_time - start_time; \
state.counters["time_per_sub_ns"] = \
std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count() / static_cast<double>(subs); \
state.counters["subs_per_sec"] = \
benchmark::Counter(subs, benchmark::Counter::kIsRate); \
} \
state.SetComplexityN(state.range(0)); \
} \
BENCHMARK(name)->MeasureProcessCPUTime() \
->UseRealTime() \
->Repetitions(10) \
->Args({1})->Args({10})->Args({100})->Args({500})->Args({1000}) \
->Complexity(benchmark::oN) \
;

// ========== Emit Variants ==========
DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_Sync, emit<MyEvent>(42))

DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_StdAsync, emit_thread_async<MyEvent>(42))
DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_StdAsync_NoWait, emit_thread_async<MyEvent>(42))

#if defined(__cpp_lib_execution)
DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_StdExec_seq, emit_async<MyEvent>(std::execution::seq, 42))
Expand All @@ -62,10 +82,6 @@ DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_StdExec_par_unseq, emit_async<MyEvent>(st
DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_StdExec_unseq, emit_async<MyEvent>(std::execution::unseq, 42))
#endif

#ifdef WITH_OMP
DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_OpenMP, emit_omp_async<MyEvent>(42))
#endif

#ifdef WITH_TBB
DEFINE_HEAVY_EMIT_BENCH(BM_PubSub_Emit_TBB, emit_tbb_async<MyEvent>(42))
#endif
Expand All @@ -77,6 +93,8 @@ int main(int argc, char** argv) {
}

benchmark::Initialize(&argc, argv);
if (benchmark::ReportUnrecognizedArguments(argc, argv)) return 1;

benchmark::RunSpecifiedBenchmarks();
return 0;
}
88 changes: 45 additions & 43 deletions include/pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@
#include <execution>
#include <algorithm>

#ifdef WITH_OMP
#include <omp.h>
#endif

#ifdef WITH_TBB
#include <tbb/parallel_for_each.h>
#endif
Expand Down Expand Up @@ -154,10 +150,17 @@ namespace pubsub {
* @brief Emit an event synchronously.
*/
template<typename... Args>
void emit(Args... args) {
[[nodiscard]] bool emit(Args... args) {
bool success = true;
for (const auto& cb : callbacks) {
cb(args...);
try {
cb(args...);
}
catch(...) {
success = false;
}
}
return success;
}

/**
Expand All @@ -167,34 +170,32 @@ namespace pubsub {
* a queue and calls them when a thread in the pool is free
*/
template<typename... Args>
void emit_thread_async(Args... args) {
for (const auto& cb : callbacks) {
(void)std::async(std::launch::async, cb, args...);
}
}

#ifdef WITH_OMP
/**
* @brief Emit an event asynchronously using OpenMP.
*/
template<typename... Args>
void emit_omp_async(Args... args) {
#pragma omp parallel for
[[nodiscard]] bool emit_thread_async(Args... args) {
bool success = true;
std::vector<std::future<void>> futures;
futures.reserve(callbacks.size());
for (const auto& cb : callbacks) {
cb(args...);
futures.emplace_back(std::async(std::launch::async, cb, args...));
}
return success;
}
#endif

#ifdef WITH_TBB
/**
* @brief Emit an event asynchronously using oneTBB.
*/
template<typename... Args>
void emit_tbb_async(Args... args) {
[[nodiscard]] bool emit_tbb_async(Args... args) {
bool success = true;
tbb::parallel_for_each(callbacks.begin(), callbacks.end(), [&](const auto& cb) {
cb(args...);
try {
cb(args...);
}
catch(...) {
success = false;
}
});
return success;
}
#endif

Expand All @@ -203,12 +204,19 @@ namespace pubsub {
* @brief Emit an event asynchronously using <execution>.
*/
template<typename ExecutionPolicy, typename... Args, typename = std::is_execution_policy<ExecutionPolicy>>
void emit_async(ExecutionPolicy policy, Args... args) {
[[nodiscard]] bool emit_async(ExecutionPolicy policy, Args... args) {
bool success = true;
std::for_each(policy, callbacks.begin(), callbacks.end(),
[&](const auto& cb) {
cb(args...);
try {
cb(args...);
}
catch(...) {
success = false;
}
}
);
return success;
}
#endif
};
Expand Down Expand Up @@ -258,46 +266,40 @@ namespace pubsub {
* @brief Emit an event synchronously to all listeners.
*/
template<auto Event, typename... Args> requires IsEvent<decltype(Event)>
void emit(Args... args) {
get_handler<Event>()->emit(args...);
bool emit(Args... args) {
return get_handler<Event>()->emit(args...);
}

/**
* @brief Emit an event asynchronously to all listeners.
*/
template<auto Event, typename... Args> requires IsEvent<decltype(Event)>
void emit_thread_async(Args... args) {
get_handler<Event>()->emit_thread_async(args...);
bool emit_thread_async(Args... args) {
return get_handler<Event>()->emit_thread_async(args...);
}

#ifdef WITH_OMP
/**
* @brief Emit an event asynchronously using OpenMP to all listeners.
*/
template<auto Event, typename... Args> requires IsEvent<decltype(Event)>
void emit_omp_async(Args... args) {
get_handler<Event>()->emit_omp_async(args...);
}
#endif

#ifdef WITH_TBB
/**
* @brief Emit an event asynchronously using oneTBB to all listeners.
*/
template<auto Event, typename... Args> requires IsEvent<decltype(Event)>
void emit_tbb_async(Args... args) {
get_handler<Event>()->emit_tbb_async(args...);
bool emit_tbb_async(Args... args) {
return get_handler<Event>()->emit_tbb_async(args...);
}
#else
#warning TBB not available
#endif

#if defined(__cpp_lib_execution)
/**
* @brief Emit an event asynchronously to all listeners.
*/
template<auto Event, typename... Args> requires IsEvent<decltype(Event)>
void emit_async(Args... args) {
get_handler<Event>()->emit_async(args...);
bool emit_async(Args... args) {
return get_handler<Event>()->emit_async(args...);
}
#else
#warning std::execution not available
#endif

/**
Expand Down
Loading