From 87488c4ef19380c25302619e68060be27298c038 Mon Sep 17 00:00:00 2001 From: Bryan Wong Date: Sun, 2 Jun 2024 21:37:28 +0900 Subject: [PATCH 1/8] can't figure out dependency tree --- src/experimental/makefile | 63 +++ src/experimental/private/main.cpp | 7 + src/experimental/public/latch.h | 75 +++ src/experimental/public/pooled_thread.h | 0 src/experimental/public/semaphore.h | 106 ++++ src/experimental/public/thread_pool.h | 505 ++++++++++++++++++ src/utl/public/utl/memory/utl_intrusive_ptr.h | 1 - utl.sublime-project | 28 +- 8 files changed, 782 insertions(+), 3 deletions(-) create mode 100644 src/experimental/makefile create mode 100644 src/experimental/private/main.cpp create mode 100644 src/experimental/public/latch.h create mode 100644 src/experimental/public/pooled_thread.h create mode 100644 src/experimental/public/semaphore.h create mode 100644 src/experimental/public/thread_pool.h diff --git a/src/experimental/makefile b/src/experimental/makefile new file mode 100644 index 00000000..6b601766 --- /dev/null +++ b/src/experimental/makefile @@ -0,0 +1,63 @@ +MKFILE_PATH := $(abspath $(lastword $(MAKEFILE_LIST))) +MKFILE_DIR := $(patsubst %/,%,$(dir $(MKFILE_PATH))) +PROJECT_ROOT := $(shell git rev-parse --show-toplevel) +MODULE_ROOT := $(MKFILE_DIR) +PRIVATE_DIR := $(MODULE_ROOT)/private +PUBLIC_DIR := $(MODULE_ROOT)/public +CONFIG_DIR := $(PROJECT_ROOT)/src/utl/public/utl/preprocessor + +BIN = main +OUTPUT_DIR := $(MODULE_ROOT)/build +INTERMEDIATE_DIR := $(OUTPUT_DIR)/obj +MODULE_SRCS := $(shell find $(PRIVATE_DIR) $(PUBLIC_DIR) -name '*.cpp') +MODULE_INCLUDES := $(shell find $(PRIVATE_DIR) $(PUBLIC_DIR) -name '*.h') + +.PHONY = all clean print preprocess compile +CXX := c++ +CXX_FLAGS := -std=c++17 -fPIC -O1 -I$(PUBLIC_DIR) -I$(PRIVATE_DIR) -I$(CONFIG_DIR) +LINKER_FLAGS := -lm +OBJECTS := $(addsuffix .o, $(basename $(MODULE_SRCS:$(MODULE_ROOT)/%=%))) +OBJECT_PATHS := $(addprefix $(INTERMEDIATE_DIR)/,$(OBJECTS)) +DEPENDENCIES := $(OBJECTS:.o=.d) +PREPROCESSED := $(OBJECTS:.o=.prep.cpp) + +all: $(BIN) + +$(BIN): $(OBJECT_PATHS) + @echo "Linking $(OBJECTS)" + @$(CXX) $(LINKER_FLAGS) $(OBJECT_PATHS) -o $(OUTPUT_DIR)/$(BIN) + +compile: $(OBJECTS) + @ + +$(OBJECTS):%.o: $(MODULE_ROOT)/%.cpp + @mkdir -p '$(INTERMEDIATE_DIR)/$(@D)' + @echo "Creating object" $@ + @$(CXX) $(CXX_FLAGS) -MMD -MP -MF '$(INTERMEDIATE_DIR)/$(patsubst %.o,%.d,$@)' -MT '$(INTERMEDIATE_DIR)/$@' -c $< -o '$(INTERMEDIATE_DIR)/$@' + +-include $(INTERMEDIATE_DIR)/$(DEPENDENCIES) + +preprocess: $(PREPROCESSED) + @ + +$(PREPROCESSED):%.prep.cpp: $(MODULE_ROOT)/%.cpp + @mkdir -p '$(INTERMEDIATE_DIR)/$(@D)' + @echo "Running preprocessor" $@ + @$(CXX) $(CXX_FLAGS) -MMD -MP -MF '$(INTERMEDIATE_DIR)/$(patsubst %.prep.cpp,%.d,$@)' -MT '$(INTERMEDIATE_DIR)/$@' -E $< -o '$(INTERMEDIATE_DIR)/$@' + +clean: + @echo "Cleaning up..." + @rm -rvf $(INTERMEDIATE_DIR)/**/*.o $(INTERMEDIATE_DIR)/**/*.d $(INTERMEDIATE_DIR)/**/*.prep.cpp $(OUTPUT_DIR)/$(BIN) + +print: + @echo "Bin: $(BIN)\n" + @echo "Makefile Directory: $(MKFILE_DIR)\n" + @echo "Sources: $(MODULE_SRCS)\n" + @echo "Objects: $(OBJECTS)\n" + @echo "Preprocessed: $(PREPROCESSED)\n" + @echo "Includes: $(MODULE_INCLUDES)\n" + @echo "Dependencies: $(DEPENDENCIES)\n" + @echo "CXX Flags: $(CXX_FLAGS)\n" + @echo "Linker Flags: $(LINKER_FLAGS)\n" + @echo "Output Directory: $(OUTPUT_DIR)\n" + @echo "Intermediate Directory: $(INTERMEDIATE_DIR)\n" diff --git a/src/experimental/private/main.cpp b/src/experimental/private/main.cpp new file mode 100644 index 00000000..2b53b3ef --- /dev/null +++ b/src/experimental/private/main.cpp @@ -0,0 +1,7 @@ + +#include + +int main(int argc, char** argv) { + puts("hello world"); + return 0; +} diff --git a/src/experimental/public/latch.h b/src/experimental/public/latch.h new file mode 100644 index 00000000..7824421c --- /dev/null +++ b/src/experimental/public/latch.h @@ -0,0 +1,75 @@ +#pragma once +#include "gcc/atomics.h" +#include "posix/futex.h" +#include "utl_config.h" + +#include + +namespace utl::experimental { + +class latch { + static [[noreturn]] void throw_system_error(posix::result r) { + char error_msg[1024]; + sprintf(error_msg, "Internal latch error: %s", posix::to_string(r)); + throw std::system_error(error_msg); + } + +public: + explicit latch(int32_t n) : remaining_(n) {} + latch(latch const&) = delete; + latch& operator=(latch const&) = delete; + + template + bool wait_for(std::chrono::duration d) { + int32_t val = gcc::atomic_load(remaining_, std::memory_order_relaxed); + while (val > 0) { + auto const begin = std::chrono::high_resolution_clock::now(); + auto const r = posix::futex_wait(remaining_, &val, d); + if (r == posix::result::success || r == posix::result::timeout) { + return r == posix::result::success; + } + + if (r == posix::result::interrupted) { + using clock_duration = decltype(std::chrono::high_resolution_clock::now() - begin); + auto const min_val = std::min>( + (std::chrono::high_resolution_clock::now() - begin), d); + + d -= min_val; + continue; + } + + throw_system_error(r); + } + } + + void wait() { + int32_t val = gcc::atomic_load(remaining_, std::memory_order_relaxed); + while (val > 0) { + auto const r = posix::futex_wait(remaining_, &val); + if (r == posix::result::success) { + return; + } + if (r == posix::result::interrupted) { + continue; + } + + throw_system_error(r); + } + } + + void count_down(uint32_t n = 1) { + int32_t val = gcc::atomic_fetch_sub(remaining_, n, std::memory_order_acq_rel); + if (val <= n) { + posix::futex_notify_all(remaining_); + } + } + + bool try_wait() const noexcept { + return gcc::atomic_load(&remaining_, std::memory_order_relaxed) <= 0; + } + +private: + uint32_t remaining_; +}; + +} // namespace utl::experimental diff --git a/src/experimental/public/pooled_thread.h b/src/experimental/public/pooled_thread.h new file mode 100644 index 00000000..e69de29b diff --git a/src/experimental/public/semaphore.h b/src/experimental/public/semaphore.h new file mode 100644 index 00000000..a6546f81 --- /dev/null +++ b/src/experimental/public/semaphore.h @@ -0,0 +1,106 @@ +#pragma once +#include "gcc/atomics.h" +#include "posix/futex.h" +#include "utl_config.h" + +#include + +namespace utl::experimental { + +class semaphore { + static [[noreturn]] void throw_system_error(posix::result r) { + char error_msg[1024]; + sprintf(error_msg, "Internal semaphore error: %s", posix::to_string(r)); + throw std::system_error(error_msg); + } + +public: + explicit semaphore(int32_t n) : current_(n) {} + semaphore(semaphore const&) = delete; + semaphore& operator=(semaphore const&) = delete; + + template + bool wait_for(std::chrono::duration d) { + uint32_t val = gcc::atomic_load(remaining_, std::memory_order_relaxed); + if (val > 0 && try_decrease(&val)) { + return true; + } + + do { + auto const begin = std::chrono::high_resolution_clock::now(); + + UTL_ON_SCOPE_EXIT { + using clock_duration = decltype(std::chrono::high_resolution_clock::now() - begin); + auto const min_val = std::min>( + (std::chrono::high_resolution_clock::now() - begin), d); + + d -= min_val; + }; + + auto const r = posix::futex_wait(current_, &val, d); + if (r == posix::result::success) { + if (try_decrease(&val)) { + return true; + } + + continue; + } + + if (r == posix::result::timeout) { + return false; + } + if (r == posix::result::interrupted) { + continue; + } + + throw_system_error(r); + } while (val == 0); + } + + void wait() { + int32_t val = 0; + while (val == 0) { + auto const r = posix::futex_wait(current_, &val, d); + if (r == posix::result::success) { + if (try_decrease(&val)) { + return; + } + + continue; + } + + if (r == posix::result::interrupted) { + continue; + } + + throw_system_error(r); + } + } + + void signal() { + gcc::atomic_fetch_add(current_, 1, std::memory_order_acq_rel); + posix::futex_notify_one(current_); + } + +private: + bool try_decrease(uint32_t* current_val) { + auto& val = *current_val; + auto new_val = val - 1; + while (!gcc::compare_exchange( + current_, &val, new_val, std::memory_order_release, std::memory_order_relaxed)) { + + if (val == 0) { + return false; + } + + new_val = val - 1; + } + + gcc::atomic_thread_fence(std::memory_order_acquire); + return true; + } + + uint32_t current_; +}; + +} // namespace utl::experimental diff --git a/src/experimental/public/thread_pool.h b/src/experimental/public/thread_pool.h new file mode 100644 index 00000000..c9670860 --- /dev/null +++ b/src/experimental/public/thread_pool.h @@ -0,0 +1,505 @@ +#pragma once +#include "latch.h" +#include "semaphore.h" +#include "utl_config.h" + +#include +#include +#include +#include +#include +#include +// todo intrusive_ptr +// todo atomic_reference_counter + +namespace utl::experimental { +using size_t = decltype(sizeof(0)); + +class job_handle; +class jthread : public std::thread { +public: + using thread::thread; + + ~jthread() { + if (this->joinable()) { + this->join(); + } + } +}; + +inline constexpr size_t dynamic_extent = (size_t)-1; + +template +class thread_pool; + +namespace details { +namespace thread_pool { + +class completion_interface { +public: + struct deleter { + void operator()(completion_interface* ptr) const noexcept { ptr->~completion_interface(); } + }; + + virtual bool wait_for(std::chrono::microseconds) = 0; + virtual void wait() = 0; + virtual bool try_wait() const = 0; + + template + bool wait_for(std::chrono::duration d) { + return this->wait_for(std::chrono::duration_cast(d)); + } + +protected: + completion_interface(completion_interface const&) = delete; + completion_interface& operator=(completion_interface const&) = delete; + completion_interface() = default; + virtual ~completion_interface() = default; +}; + +class execution_interface { +public: + virtual size_t count() const noexcept = 0; + virtual void execute(size_t thread_idx, size_t job_idx) final = 0; + +protected: + execution_interface(execution_interface const&) = delete; + execution_interface& operator=(execution_interface const&) = delete; + execution_interface() = default; + virtual ~execution_interface() = default; +}; + +template +class multi_execution : public completion_interface, public execution_interface { + +public: + template + multi_execution(Args&&... args) : executions(std::forward(args)...) {} + + ~multi_execution() final = default; + + size_t count() const noexcept final { return sizeof...(Fs); } + + void execute(size_t thread_idx, size_t job_idx) final { + execute(thread_idx, job_idx, index_sequence_for{}); + } + +private: + bool wait_for(std::chrono::microseconds d) final { + return latch.wait_for(d) && latch.try_wait(); + } + + void wait() final { + latch.wait(); + std::ignore = try_wait(); + } + + bool try_wait() const final { + if (!latch.try_wait()) { + return false; + } + + propogate_exceptions(); + return true; + } + + template + void execute(size_t, size_t job_idx, std::index_sequence) { + static constexpr auto vtable[] = {&multi_execution::execute...}; + (this->*vtable[job_idx])(); + } + + template + void execute() { + UTL_ON_SCOPE_EXIT { + latch.count_down(); + }; + try { + std::get(executions)(); + } catch (std::exception const& e) { + std::get(exceptions)(std::current_exception()); + } catch (...) { + std::terminate(); + } + } + template + void propogate_exceptions(std::index_sequence) const { + std::atomic_thread_fence(std::memory_order_acquire); + if ((... || std::get(executions))) { + throw std::runtime_error("Uncaught exception"); + } + } + + void propogate_exceptions() const { propogate_exceptions(index_sequence_for{}); } + + std::tuple exceptions; + std::tuple executions; + latch latch; +}; + +template +class single_execution : public completion_interface, public execution_interface { +public: + template + job_execution(A&& arg) : execution(std::forward(arg)) + , latch(1) {} + ~job_execution() final = default; + + constexpr size_t count() const noexcept { return 1; } + + void execute(size_t, size_t) { execute(); } + +private: + bool wait_for(std::chrono::microseconds d) final { + return latch.wait_for(d) && latch.try_wait(); + } + + void wait() final { + latch.wait(); + if (exception) { + rethrow_exception(exception); + } + } + + bool try_wait() const final { + if (!latch.try_wait()) { + return true; + } + + if (exception) { + rethrow_exception(exception); + } + + return true; + } + + void execute() { + UTL_ON_SCOPE_EXIT { + latch.count_down(); + }; + try { + execution(); + } catch (std::exception const& e) { + exception = std::current_exception(); + } catch (...) { + std::terminate(); + } + } + + std::exception_ptr exception; + F execution; + latch latch; +}; + +template +class parallel_execution : public completion_interface, public execution_interface { +public: + template + parallel_execution(A&& arg, size_t count) : execution(std::forward(arg)) + , latch(count) {} + ~parallel_execution() final = default; + + size_t count() const noexcept { return count; } + + void execute(size_t thread_idx, size_t job_idx) { + UTL_ON_SCOPE_EXIT { + latch.count_down(); + }; + try { + execution(thread_idx, job_idx); + } catch (std::exception const& e) { + exceptions_thrown.store(true, std::memory_order_release); + } catch (...) { + std::terminate(); + } + } + +private: + bool wait_for(std::chrono::microseconds d) final { + return latch.wait_for(d) && latch.try_wait(); + } + + void wait() final { + latch.wait(); + if (exceptions_thrown.load(std::memory_order_acquire)) { + // TODO + throw std::runtime_error("Uncaught exception"); + } + } + + bool try_wait() const final { + if (!latch.try_wait()) { + return true; + } + + if (exceptions_thrown.load(std::memory_order_acquire)) { + // TODO + throw std::runtime_error("Uncaught exception"); + } + + return true; + } + + // TODO handle exception properly + std::atomic exceptions_thrown = false; + size_t count; + F execution; + latch latch; +}; + +template +class job_collection : public completion_interface { + +public: + template = 0> + job_collection(J const&... header_ptr) : jobs_{header_ptr...} {} + +private: + bool wait_for(std::chrono::microseconds d) final { + for (auto const& ptr : jobs_) { + auto const begin = std::chrono::high_resolution_clock::now(); + if (!ptr->wait_for(d)) { + return; + } + + using clock_duration = decltype(std::chrono::high_resolution_clock::now() - begin); + auto const min_val = std::min>( + (std::chrono::high_resolution_clock::now() - begin), d); + + d -= min_val; + } + } + void wait() final { + for (auto const& ptr : jobs_) { + ptr->wait(); + } + } + + bool try_wait() const final { + return std::all_of( + begin(jobs_), end(jobs_), [](auto const& ptr) { return ptr->try_wait(); }); + } + + std::array> jobs_; +}; + +class pool_handle : private utl::atomic_reference_count { + template + friend class ::thread_pool; + + static constexpr struct construct_tag_t { + } construct_tag = {}; + +public: + virtual ~pool_handle() = default; + + template + void schedule_execution(Executor& execution) { + auto const count = execution.count(); + } + +protected: + pool_handle(size_t count) : threads_(new jthread[count]), size_(count) {} + +private: + std::unique_ptr threads_; + size_t size_; +}; + +class job_header : private utl::atomic_reference_count { +public: + job_header() = default; + +protected: + void set_completion(completion_interface* interface) { completion_.reset(interface); } + +private: + utl::intrusive_ptr dependency_; + std::unique_ptr completion_; +}; + +template +class job : public job_header, public T { + static_assert(std::is_base_of_v, "Invalid job"); + static_assert(std::is_base_of_v, "Invalid job"); + using execution_base = T; + +public: + template + job(pool_handle& pool, Args&&... args) : execution_base{std::forward(args)...} { + job_header::set_completion(this); + } +}; + +} // namespace thread_pool +} // namespace details + +class job_handle { + using pool_type = details::thread_pool::pool_handle; + using header_type = details::thread_pool::job_header; + using header_ptr = utl::intrusive_ptr; + + template + static auto create_handle(intrusive_ptr pool, Fs&&... f); + + template , header_ptr>), bool> = true> + static auto create_collection_header(intrusive_ptr pool, Headers&&... headers) {} + + template , header_ptr>), bool> = true> + job_handle(intrusive_ptr pool, Headers&&... headers) + : pool_(std::move(pool)) + , job_(create_collection_header(pool_, std::forward(headers)...)) {} + +public: + template 1) && ... && std::is_same_v, job_handle>), bool> = + true> + static job_handle combine(Jobs&&... jobs) { + Jobs const* array[]{&jobs...}; + auto const& pool = (*array)->pool_; + if (!std::all_of( + array, array + sizeof...(Jobs), [&](auto ptr) { return ptr->pool_ == pool; })) { + throw std::runtime_error("[job_handle] only jobs from the same pool can be combined"); + } + + return job_handle(pool, std::forward(jobs).job_...); + } + +public: + job_handle() = default; + job_handle(job_handle const&) = default; + job_handle& operator=(job_handle const&) = default; + job_handle(job_handle&&) noexcept = default; + job_handle& operator=(job_handle&&) noexcept = default; + + template + explicit job_handle(intrusive_ptr pool, Fs&&... f) + : job_(create_handle(std::move(pool), std::forward(f)...)) {} + + void wait() { + if (job_) { + job_->wait(); + } + } + + bool try_wait() const { return !job_ || job_->try_wait(); } + + ~job_handle() { wait(); } + +private: + utl::intrusive_ptr pool_; + utl::intrusive_ptr job_; +}; + +class thread_pool { + template + using multi_handle_t = + std::std::enable_if_t<((sizeof...(Fs) > 1) && ... && std::is_invocable_v), job_handle>; + template + using multi_result_t = + std::std::enable_if_t<(sizeof...(Fs) > 1), std::tuple...>>; + template + using single_handle_t = std::enable_if_t, job_handle>; + + using pool_handle_t = details::thread_pool::pool_handle; + + template + using parallel_handle_t = std::enable_if_t, job_handle>; + template + using parallel_t = std::enable_if_t, void>; + + using init_arg_t = std::conditional_t; + +public: + constexpr explicit thread_pool(size_t n) noexcept : size_(n) {} + constexpr size_t size() const noexcept { return size_; } + + template + single_handle_t schedule(F&& execution) { + return job_handle(pool_handle(), std::forward(execution)); + } + + template + multi_handle_t schedule(Fs&&... executions) {} + + template + parallel_handle_t schedule(size_t count, F&& executions); + + template + single_handle_t schedule(job_handle dependency, F&& execution); + + template + multi_handle_t schedule(job_handle dependency, Fs&&... executions); + + template + parallel_handle_t schedule(job_handle dependency, size_t count, F&& executions); + + template + std::invoke_result_t execute(F&& execution) { + return std::invoke(std::forward(execution)); + } + + template + multi_result_t execute(Fs&&... executions) { + return multi_execute(index_sequence_for{}, std::forward(executions)...); + } + + template + parallel_t execute(size_t count, F&& execution) { + schedule(count, executions).wait(); + } + + template + std::invoke_result_t execute(job_handle dependency, F&& execution) { + return dependency.wait(), std::invoke(std::forward(execution)); + } + + template + multi_result_t execute(job_handle dependency, Fs&&... executions) { + dependency.wait(); + return multi_execute(index_sequence_for{}, std::forward(executions)...); + } + + template + parallel_t execute(job_handle dependency, size_t count, F&& execution) { + schedule(dependency, count, executions).wait(); + } + +private: + template + static auto tuple_executor(T& t, U& exe) { + return [&t, &exe]() { std::get(t).emplace(std::forward(std::get(exe))()); }; + } + + template + static std::tuple optional_to_tuple( + std::tuple...>&& t, std::index_sequence) { + return std::tuple{(*std::get(std::move(t)))...}; + } + + template + multi_result_t multi_execute(std::index_sequence, Fs&&... executions) { + std::tuple>...> t; + auto exe = std::forward_as_tuple(std::forward(executions)...); + + schedule(tuple_executor(t, exe)...).wait(); + + return optional_to_tuple(std::move(t), index_sequence{}); + } + + intrusive_ptr const& pool_handle() { + if (!pool_handle_) { + pool_handle_ = utl::make_intrusive_ptr(size()); + } + + return pool_handle_; + } + + intrusive_ptr pool_handle_; + size_t size_; +}; + +} // namespace utl::experimental diff --git a/src/utl/public/utl/memory/utl_intrusive_ptr.h b/src/utl/public/utl/memory/utl_intrusive_ptr.h index 50967746..881b1f39 100644 --- a/src/utl/public/utl/memory/utl_intrusive_ptr.h +++ b/src/utl/public/utl/memory/utl_intrusive_ptr.h @@ -2,7 +2,6 @@ #pragma once -#include "utl/atomic.h" #include "utl/compare/utl_pointer_comparable.h" #include "utl/exception/utl_program_exception.h" #include "utl/memory/utl_addressof.h" diff --git a/utl.sublime-project b/utl.sublime-project index 99b5e7d8..989ffafd 100644 --- a/utl.sublime-project +++ b/utl.sublime-project @@ -5,6 +5,10 @@ "path": "src/utl", "name": "UTL" }, + { + "path": "src/experimental", + "name": "EXP" + }, { "path": ".", "name": "Root", @@ -32,11 +36,11 @@ "variants": [ { - "name": "Compile Tests", + "name": "Compile Private", "shell_cmd": "make -C $project_path/src/utl compile -j8" }, { - "name": "Preprocess Tests", + "name": "Preprocess Private", "shell_cmd": "make -C $project_path/src/utl preprocess -j8" }, { @@ -44,6 +48,26 @@ "shell_cmd": "make -C $project_path/src/utl print -j8" } ] + }, + { + "name": "EXP Build", + "cancel": { "kill": true }, + "file_regex": "^(/...*?):([0-9]+):?([0-9]+)", + "variants": + [ + { + "name": "Compile Private", + "shell_cmd": "make -C $project_path/src/experimental compile -j8" + }, + { + "name": "Preprocess Private", + "shell_cmd": "make -C $project_path/src/experimental preprocess -j8" + }, + { + "name": "Print Build Variables", + "shell_cmd": "make -C $project_path/src/experimental print -j8" + } + ] } ] From 129d38e0203b075f3e189757f02434eb4035c457 Mon Sep 17 00:00:00 2001 From: Bryan Wong Date: Sun, 2 Jun 2024 21:57:47 +0900 Subject: [PATCH 2/8] add notes --- src/experimental/public/thread_pool.h | 32 ++++++++++++++++++--------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/experimental/public/thread_pool.h b/src/experimental/public/thread_pool.h index c9670860..82ffbffe 100644 --- a/src/experimental/public/thread_pool.h +++ b/src/experimental/public/thread_pool.h @@ -12,6 +12,14 @@ // todo intrusive_ptr // todo atomic_reference_counter +/** + * Notes + * + * We have a "plan" API which creates a dependency graph + * + * We then have an execution API which executes the jobs, either async or sync + */ + namespace utl::experimental { using size_t = decltype(sizeof(0)); @@ -314,7 +322,7 @@ class job_header : private utl::atomic_reference_count { void set_completion(completion_interface* interface) { completion_.reset(interface); } private: - utl::intrusive_ptr dependency_; + utl::intrusive_ptr dependent_; std::unique_ptr completion_; }; @@ -418,24 +426,24 @@ class thread_pool { constexpr size_t size() const noexcept { return size_; } template - single_handle_t schedule(F&& execution) { - return job_handle(pool_handle(), std::forward(execution)); + single_handle_t plan(F&& execution) { + return job_handle(std::forward(execution)); } template - multi_handle_t schedule(Fs&&... executions) {} + multi_handle_t plan(Fs&&... executions) {} template - parallel_handle_t schedule(size_t count, F&& executions); + parallel_handle_t plan(size_t count, F&& executions); template - single_handle_t schedule(job_handle dependency, F&& execution); + single_handle_t plan(job_handle& dependency, F&& execution); template - multi_handle_t schedule(job_handle dependency, Fs&&... executions); + multi_handle_t plan(job_handle& dependency, Fs&&... executions); template - parallel_handle_t schedule(job_handle dependency, size_t count, F&& executions); + parallel_handle_t plan(job_handle& dependency, size_t count, F&& executions); template std::invoke_result_t execute(F&& execution) { @@ -447,24 +455,26 @@ class thread_pool { return multi_execute(index_sequence_for{}, std::forward(executions)...); } + void execute(job_handle&& job); + template parallel_t execute(size_t count, F&& execution) { schedule(count, executions).wait(); } template - std::invoke_result_t execute(job_handle dependency, F&& execution) { + std::invoke_result_t execute(job_handle&& dependency, F&& execution) { return dependency.wait(), std::invoke(std::forward(execution)); } template - multi_result_t execute(job_handle dependency, Fs&&... executions) { + multi_result_t execute(job_handle&& dependency, Fs&&... executions) { dependency.wait(); return multi_execute(index_sequence_for{}, std::forward(executions)...); } template - parallel_t execute(job_handle dependency, size_t count, F&& execution) { + parallel_t execute(job_handle&& dependency, size_t count, F&& execution) { schedule(dependency, count, executions).wait(); } From 5b4066a587f9e69fe710419a8647540c672e2164 Mon Sep 17 00:00:00 2001 From: Bryan Wong Date: Sun, 2 Jun 2024 22:07:11 +0900 Subject: [PATCH 3/8] move handle --- src/experimental/public/thread_pool.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/experimental/public/thread_pool.h b/src/experimental/public/thread_pool.h index 82ffbffe..d2977df0 100644 --- a/src/experimental/public/thread_pool.h +++ b/src/experimental/public/thread_pool.h @@ -437,13 +437,13 @@ class thread_pool { parallel_handle_t plan(size_t count, F&& executions); template - single_handle_t plan(job_handle& dependency, F&& execution); + single_handle_t plan(job_handle&& dependency, F&& execution); template - multi_handle_t plan(job_handle& dependency, Fs&&... executions); + multi_handle_t plan(job_handle&& dependency, Fs&&... executions); template - parallel_handle_t plan(job_handle& dependency, size_t count, F&& executions); + parallel_handle_t plan(job_handle&& dependency, size_t count, F&& executions); template std::invoke_result_t execute(F&& execution) { From 14f68631bc6454b1674c975da712d519ac7f6aba Mon Sep 17 00:00:00 2001 From: Bryan Wong Date: Fri, 21 Jun 2024 23:55:50 +0900 Subject: [PATCH 4/8] Oh hold --- src/experimental/public/job_handle.h | 282 ++++++++++ src/experimental/public/pooled_thread.h | 0 src/experimental/public/thread_pool.h | 704 ++++++++++-------------- utl.sublime-project | 3 + 4 files changed, 574 insertions(+), 415 deletions(-) create mode 100644 src/experimental/public/job_handle.h delete mode 100644 src/experimental/public/pooled_thread.h diff --git a/src/experimental/public/job_handle.h b/src/experimental/public/job_handle.h new file mode 100644 index 00000000..ba82f5d9 --- /dev/null +++ b/src/experimental/public/job_handle.h @@ -0,0 +1,282 @@ + + +namespace utl::experimental { +namespace details::thread_pool { +class group_tag; +class dependency_tag; + +template +class job_node : private UTL_SCOPE intrusive::bidirectional_node> { +public: + using bidirectional_node::linked; + + friend constexpr T* node_cast(job_node* node) { + static_assert(UTL_SCOPE is_base_of::value); + return (T*)node; + } + + friend constexpr T& node_cast(job_node& node) { + static_assert(UTL_SCOPE is_base_of::value); + return (T&)(node); + } +}; + +template +job_node* node_cast(job_node* other) noexcept { + static_assert(UTL_SCOPE is_base_of, T>::value); + return (job_node*)node_cast(other); +} + +template +job_node& node_cast(job_node& other) noexcept { + static_assert(UTL_SCOPE is_base_of, T>::value); + return (job_node&)node_cast(other); +} + +template +job_node* node_cast(T* other) noexcept { + static_assert(UTL_SCOPE is_base_of, T>::value); + return (job_node*)other; +} + +template +job_node& node_cast(job_executable& other) noexcept { + static_assert(UTL_SCOPE is_base_of, T>::value); + return (job_node&)other; +} + +template +using group_node = job_node; +template +using dependency_node = job_node; + +class job_executable : group_node, dependency_node { +public: + virtual size_t size() const noexcept = 0; + virtual void execute(size_t thread_idx, size_t job_idx) = 0; + virtual ~job_executable() noexcept = default; +}; + +template +class single_executor : public job_executable { +public: + template >> + single_executor(Args&&... args) noexcept(UTL_SCOPE is_nothrow_constructible_v) + : callable_(UTL_SCOPE forward(args)...) {} + +private: + UTL_CONSTEXPR_CXX20 size_t size() const noexcept final { return 1; } + void execute(size_t thread_idx, size_t job_idx) final { callable_(); } + UTL_ATTRIBUTE(NO_UNIQUE_ADDRESS) F callable_; +}; + +template +class multi_executor : public job_executable { +public: + template >> + multi_executor(Args&&... args) noexcept(UTL_SCOPE is_nothrow_constructible_v) + : callables_(UTL_SCOPE forward(args)...) {} + +private: + UTL_CONSTEXPR_CXX20 size_t size() const noexcept final { return sizeof...(Fs); } + + template + void execute() { + UTL_SCOPE get(callables_)(); + } + + template + void execute(size_t job_idx, UTL_SCOPE index_sequence) { + static_assert( + UTL_SCOPE + is_same_v, UTL_SCOPE index_sequence_for>, + ""); + UTL_ASSERT(job_idx < sizeof...(Is)); + static constexpr auto vtable[] = {&multi_executor::execute...}; + (this->*vtable[job_idx])(); + } + + void execute(size_t, size_t job_idx) final { + static constexpr UTL_SCOPE index_sequence_for sequence = {}; + execute(job_idx, sequence); + } + + UTL_ATTRIBUTE(NO_UNIQUE_ADDRESS) UTL_SCOPE tuple callables_; +}; + +template +class parallel_executor : public job_executable { +public: + template >> + parallel_executor(size_t size, Args&&... args) noexcept( + UTL_SCOPE is_nothrow_constructible_v) + : callable_(UTL_SCOPE forward(args)...) + , size_(size) {} + +private: + size_t size() const noexcept final { return size_; } + void execute(size_t thread_idx, size_t job_idx) final { callable_(thread_idx, job_idx); } + + UTL_ATTRIBUTE(NO_UNIQUE_ADDRESS) F callable_; + size_t size_; +}; + +} // namespace details::thread_pool + +class job_handle { + template + using type_for = T; + using job_executable = details::thread_pool::job_executable; + template + using node_type = job_node; + +public: + template + [[nodiscard]] static job_handle combine(job_handle&& first, Jobs&&... jobs) { + static constexpr UTL_SCOPE make_index_sequence sequence = {}; + static_assert(sizeof...(Jobs) > 0); + static_assert(!(... || UTL_SCOPE is_reference_v)); + static_assert((... && UTL_SCOPE is_convertible_v)); + return (move(first) + ... + move(jobs)); + } + + job_handle() noexcept = default; + template + explicit job_handle(F&& callable) + : job_(new details::thread_pool::single_executor>( + UTL_SCOPE forward(callable))) {} + template 1 && UTL_SCOPE conjunction...>::value)> + explicit job_handle(Fs&&... callables) + : job_(new details::thread_pool::multi_executor...>( + UTL_SCOPE forward(callables)...)) {} + template ) F UTL_REQUIRES_CXX11( + UTL_TRAIT_is_invocable(F, size_t, size_t))> + explicit job_handle(size_t size, F&& callable) + : job_(new details::thread_pool::parallel_executor>( + size, UTL_SCOPE forward(callable))) {} + + template + explicit job_handle(job_handle&& dependency, F&& callable) + : job_((job_handle(UTL_SCOPE forward(callable)) << UTL_SCOPE move(dependency))) { + *this = (UTL_SCOPE move(*this) << UTL_SCOPE move(dependency)); + } + template 1 && UTL_SCOPE conjunction...>::value)> + explicit job_handle(job_handle&& dependency, Fs&&... callables) + : job_((job_handle(size, UTL_SCOPE forward(callable)...) + << UTL_SCOPE move(dependency))) {} + template ) F UTL_REQUIRES_CXX11( + UTL_TRAIT_is_invocable(F, size_t, size_t))> + explicit job_handle(job_handle&& dependency, size_t size, F&& callable) + : job_handle( + (job_handle(size, UTL_SCOPE forward(callable)) << UTL_SCOPE move(dependency))) {} + + job_handle(job_handle const&) = delete; + job_handle& operator=(job_handle const&) = delete; + job_handle(job_handle&& other) noexcept : job_(UTL_SCOPE exchange(other.job_, nullptr)) {} + job_handle& operator=(job_handle&& other) noexcept { + auto to_delete = UTL_SCOPE exchange(job_, UTL_SCOPE exchange(other.job_, nullptr)); + destroy(); + } + + ~job_handle() noexcept { destroy(); } + + [[nodiscard]] explicit operator bool() const noexcept { return to_delete != nullptr; } + +private: + explicit job_handle(job_executable* job) noexcept : job_(job) {} + + template + static constexpr void splice_back(node_type* list, node_type* list2) noexcept { + auto head2 = list2; + auto tail2 = get_prev(*list2); + auto tail = get_prev(*list); + set_next(*tail2, list); + set_prev(*head2, tail); + set_prev(*list, tail2); + set_next(*tail, head2); + } + + static constexpr void destroy(dependency_node* ptr) noexcept { + dependency_node* const origin = ptr; + do { + auto next = get_next(*ptr); + + auto const group = node_cast(ptr); + for (auto c = get_next(*group); c != group; c = get_next(*c)) { + splice_back(origin, node_cast(c)); + } + + delete ptr; + ptr = next; + } while (origin != ptr); + } + + void destroy() noexcept { + auto ptr = UTL_SCOPE exchange(job_, nullptr); + if (ptr != nullptr) { + destroy(ptr); + } + } + + template + node_type* get() const noexcept { + return node_cast(job_); + } + + template + node_type* release() noexcept { + return node_cast(UTL_SCOPE exchange(job_, nullptr)); + } + + [[nodiscard]] friend job_handle operator+(job_handle&& left, job_handle&& right) UTL_THROWS { + auto left_ptr = left.release(); + auto right_ptr = right.release(); + if (left_ptr == nullptr) { + return {right_ptr}; + } + if (right_ptr == nullptr) { + return {left_ptr}; + } + + splice_back(left_ptr, right_ptr); + return {left_ptr}; + } + + friend job_handle& operator+=(job_handle& left, job_handle&& other) UTL_THROWS { + return left = UTL_SCOPE move(left) + UTL_SCOPE move(other); + } + + [[nodiscard]] friend job_handle operator>>( + job_handle&& upstream, job_handle&& downstream) UTL_THROWS { + return move(downstream) << move(upstream); + } + + [[nodiscard]] friend job_handle operator<<( + job_handle&& dependent, job_handle&& dependency) UTL_THROWS { + + auto downstream = dependent.release(); + auto upstream = dependency.release(); + if (downstream == nullptr) { + return job_handle{node_cast(upstream)}; + } + if (upstream == nullptr) { + return job_handle{node_cast(downstream)}; + } + + auto downstream_previous = get_previous(*downstream); + bool const has_dependencies_already = downstream_previous != downstream; + if (has_dependencies_already) { + splice_back(node_cast(downstream_previous), node_cast(upstream)); + } else { + splice_back(upstream, downstream); + } + + return job_handle{downstream}; + } + + job_executable* job_; +}; + +} // namespace utl::experimental diff --git a/src/experimental/public/pooled_thread.h b/src/experimental/public/pooled_thread.h deleted file mode 100644 index e69de29b..00000000 diff --git a/src/experimental/public/thread_pool.h b/src/experimental/public/thread_pool.h index d2977df0..1f104eba 100644 --- a/src/experimental/public/thread_pool.h +++ b/src/experimental/public/thread_pool.h @@ -12,504 +12,378 @@ // todo intrusive_ptr // todo atomic_reference_counter -/** - * Notes - * - * We have a "plan" API which creates a dependency graph - * - * We then have an execution API which executes the jobs, either async or sync - */ - -namespace utl::experimental { -using size_t = decltype(sizeof(0)); - -class job_handle; -class jthread : public std::thread { -public: - using thread::thread; +/* + + +UTL_NAMESPACE_BEGIN + +template +class group_t; +template +class graph_t; +template +class scatter_t; +UTL_INLINE_CXX17 constexpr class identity_t { +} identity = {}; +template +struct unwrap; + +template +struct is_group : UTL_SCOPE false_type {}; +template +struct is_group> : UTL_SCOPE true_type {}; +template +struct is_graph : UTL_SCOPE false_type {}; +template +struct is_graph> : UTL_SCOPE true_type {}; +template +struct is_scatter : UTL_SCOPE false_type {}; +template +struct is_scatter> : UTL_SCOPE bool_constant<(sizeof...(Ts) <= 2)> {}; + +template