From 0624541e01216c65dbc13b01aedef2e24d50b7df Mon Sep 17 00:00:00 2001 From: tnagler Date: Mon, 15 Sep 2025 23:42:47 +0200 Subject: [PATCH 1/2] fix early termination issue --- inst/include/RcppThread/ThreadPool.hpp | 14 +++++++++++--- inst/include/RcppThread/quickpool.hpp | 10 ++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/inst/include/RcppThread/ThreadPool.hpp b/inst/include/RcppThread/ThreadPool.hpp index 340a9a6..58cb010 100644 --- a/inst/include/RcppThread/ThreadPool.hpp +++ b/inst/include/RcppThread/ThreadPool.hpp @@ -167,7 +167,7 @@ ThreadPool::map(F&& f, I&& items) //! The parallel equivalent is given by: //! ``` //! ThreadPool pool(2); -//! pool.forIndex(0, 10, [&] (size_t i) { +//! pool.parallelFor(0, 10, [&] (size_t i) { //! x[i] = i; //! }); //! ``` @@ -228,6 +228,11 @@ ThreadPool::parallelFor(int begin, int end, F f, size_t nBatches) //! ``` //! **Caution**: if the iterations are not independent from another, //! the tasks need to be synchronized manually (e.g., using mutexes). +//! +//' The worker callable `f(i)` should be **interrupt-friendly**: +//' insert `RcppThread::isInterrupted()` checks at a reasonable cadence and +//' **return early** when it is true. On user interrupt, RcppThread waits for +//' running tasks to reach a check point or finish, and then unwinds to R. template inline void ThreadPool::parallelForEach(I& items, F f, size_t nBatches) @@ -248,8 +253,11 @@ ThreadPool::wait() pool_->wait(100); Rcout << ""; Rcerr << ""; - checkUserInterrupt(); - + try { + checkUserInterrupt(); + } catch (const UserInterruptException& err) { + pool_->stop_and_reset(); + } } while (!pool_->done()); Rcout << ""; Rcerr << ""; diff --git a/inst/include/RcppThread/quickpool.hpp b/inst/include/RcppThread/quickpool.hpp index 1507e14..264c035 100644 --- a/inst/include/RcppThread/quickpool.hpp +++ b/inst/include/RcppThread/quickpool.hpp @@ -926,6 +926,15 @@ class ThreadPool //! @param millis if > 0: stops waiting after millis ms. void wait(size_t millis = 0) { task_manager_.wait_for_finish(millis); } + //! @brief Stops the pool, waits for all tasks to finish, resets to neutral + // state, and rethrows an exception if one is pending. + void stop_and_reset() + { + task_manager_.report_fail(std::current_exception()); + task_manager_.wait_for_finish(); + task_manager_.rethrow_exception(); + } + //! @brief checks whether all jobs are done. bool done() const { return task_manager_.done(); } @@ -939,6 +948,7 @@ class ThreadPool static void operator delete(void* ptr) { mem::aligned::free(ptr); } private: + //! joins all worker threads. void join_threads() { From 5524619f23cce19baf7c76ce3b9fdd9064c57cfa Mon Sep 17 00:00:00 2001 From: tnagler Date: Wed, 14 Jan 2026 16:09:53 +0100 Subject: [PATCH 2/2] print with R_ToplevelExec --- inst/include/RcppThread/RMonitor.hpp | 48 ++++++++++++++++++---------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/inst/include/RcppThread/RMonitor.hpp b/inst/include/RcppThread/RMonitor.hpp index 6b2a4fe..9b913de 100644 --- a/inst/include/RcppThread/RMonitor.hpp +++ b/inst/include/RcppThread/RMonitor.hpp @@ -90,17 +90,28 @@ class RMonitor { //! adds a printable version of `object` to a buffer for deferred printing. //! @param object a string or coercible object to print. template - void safelyPrint(const T& object) - { + void safelyPrint(const T& object, bool asErr = false) { + std::string to_print; + { std::lock_guard lk(m_); msgs_ << object; - if ( calledFromMainThread() && (msgs_.str() != std::string("")) ) { - // release messages in buffer - Rprintf("%s", msgs_.str().c_str()); - R_FlushConsole(); - // clear message buffer + + if (calledFromMainThread()) { + to_print = msgs_.str(); + if (!to_print.empty()) { msgs_.str(""); + msgs_.clear(); + } } + } // mutex released here, before touching R API + + if (!to_print.empty()) { + // Contain possible longjmp from Rgui/event loop + PrintData pd{&to_print}; + void (*printfun)(void*) = asErr ? &printErr : &print; + Rboolean ok = R_ToplevelExec(printfun, &pd); + if (!ok) throw UserInterruptException(); + } } //! prints `object` to R error stream íf called from main thread; otherwise @@ -109,18 +120,23 @@ class RMonitor { template void safelyPrintErr(const T& object) { - std::lock_guard lk(m_); - msgsErr_ << object; - if ( calledFromMainThread() && (msgsErr_.str() != std::string("")) ) { - // release messages in buffer - REprintf("%s", msgsErr_.str().c_str()); - // R_FlushConsole(); - // clear message buffer - msgsErr_.str(""); - } + safelyPrint(object, true); } private: + struct PrintData { std::string* s; }; + + static void print(void* data) { + auto* pd = static_cast(data); + Rprintf("%s", pd->s->c_str()); + R_FlushConsole(); + } + + static void printErr(void* data) { + auto* pd = static_cast(data); + REprintf("%s", pd->s->c_str()); + } + //! Ctors declared private, to instantiate class use `::instance()`. RMonitor(void) : isInterrupted_(false) {}