Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions inst/include/RcppThread/RMonitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,28 @@ class RMonitor {
//! adds a printable version of `object` to a buffer for deferred printing.
//! @param object a string or coercible object to print.
template<class T>
void safelyPrint(const T& object)
{
void safelyPrint(const T& object, bool asErr = false) {
std::string to_print;
{
std::lock_guard<std::mutex> lk(m_);
msgs_ << object;
if ( calledFromMainThread() && (msgs_.str() != std::string("")) ) {
// release messages in buffer
Rprintf("%s", msgs_.str().c_str());
R_FlushConsole();
// clear message buffer

if (calledFromMainThread()) {
to_print = msgs_.str();
if (!to_print.empty()) {
msgs_.str("");
msgs_.clear();
}
}
} // mutex released here, before touching R API

if (!to_print.empty()) {
// Contain possible longjmp from Rgui/event loop
PrintData pd{&to_print};
void (*printfun)(void*) = asErr ? &printErr : &print;
Rboolean ok = R_ToplevelExec(printfun, &pd);
if (!ok) throw UserInterruptException();
}
}

//! prints `object` to R error stream íf called from main thread; otherwise
Expand All @@ -109,18 +120,23 @@ class RMonitor {
template<class T>
void safelyPrintErr(const T& object)
{
std::lock_guard<std::mutex> lk(m_);
msgsErr_ << object;
if ( calledFromMainThread() && (msgsErr_.str() != std::string("")) ) {
// release messages in buffer
REprintf("%s", msgsErr_.str().c_str());
// R_FlushConsole();
// clear message buffer
msgsErr_.str("");
}
safelyPrint(object, true);
}

private:
struct PrintData { std::string* s; };

static void print(void* data) {
auto* pd = static_cast<PrintData*>(data);
Rprintf("%s", pd->s->c_str());
R_FlushConsole();
}

static void printErr(void* data) {
auto* pd = static_cast<PrintData*>(data);
REprintf("%s", pd->s->c_str());
}

//! Ctors declared private, to instantiate class use `::instance()`.
RMonitor(void) : isInterrupted_(false) {}

Expand Down
14 changes: 11 additions & 3 deletions inst/include/RcppThread/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ ThreadPool::map(F&& f, I&& items)
//! The parallel equivalent is given by:
//! ```
//! ThreadPool pool(2);
//! pool.forIndex(0, 10, [&] (size_t i) {
//! pool.parallelFor(0, 10, [&] (size_t i) {
//! x[i] = i;
//! });
//! ```
Expand Down Expand Up @@ -228,6 +228,11 @@ ThreadPool::parallelFor(int begin, int end, F f, size_t nBatches)
//! ```
//! **Caution**: if the iterations are not independent from another,
//! the tasks need to be synchronized manually (e.g., using mutexes).
//!
//' The worker callable `f(i)` should be **interrupt-friendly**:
//' insert `RcppThread::isInterrupted()` checks at a reasonable cadence and
//' **return early** when it is true. On user interrupt, RcppThread waits for
//' running tasks to reach a check point or finish, and then unwinds to R.
template<class F, class I>
inline void
ThreadPool::parallelForEach(I& items, F f, size_t nBatches)
Expand All @@ -248,8 +253,11 @@ ThreadPool::wait()
pool_->wait(100);
Rcout << "";
Rcerr << "";
checkUserInterrupt();

try {
checkUserInterrupt();
} catch (const UserInterruptException& err) {
pool_->stop_and_reset();
}
} while (!pool_->done());
Rcout << "";
Rcerr << "";
Expand Down
10 changes: 10 additions & 0 deletions inst/include/RcppThread/quickpool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,15 @@ class ThreadPool
//! @param millis if > 0: stops waiting after millis ms.
void wait(size_t millis = 0) { task_manager_.wait_for_finish(millis); }

//! @brief Stops the pool, waits for all tasks to finish, resets to neutral
// state, and rethrows an exception if one is pending.
void stop_and_reset()
{
task_manager_.report_fail(std::current_exception());
task_manager_.wait_for_finish();
task_manager_.rethrow_exception();
}

//! @brief checks whether all jobs are done.
bool done() const { return task_manager_.done(); }

Expand All @@ -939,6 +948,7 @@ class ThreadPool
static void operator delete(void* ptr) { mem::aligned::free(ptr); }

private:

//! joins all worker threads.
void join_threads()
{
Expand Down