Skip to content

Commit 0e7e70c

Browse files
apacheGH-47384: [C++][Acero] Isolate BackpressureHandler from ExecNode (apache#47386)
### Rationale for this change The current BackpressureHandler needs to be provided with an ExecNode; however, the backpressure concept can be applied outside of ExecNode. The ExecNode is currently needed only to facilitate ForceShutdown in AsofJoinNode. The current implementation, however, is not elegant and may still lead to a deadlock after ForceShutdown in an extreme case - when several batches get pushed after ForceShutdown and exceed the threshold. ### What changes are included in this PR? - Remove ForceShutdown from BackpressureHandler. - Reimplement ForceShutdown in BackpressureConcurrentQueue as a one-time, non-recoverable queue clear that effectively unpauses the source using the handler. ### Are these changes tested? There is no new functionality. Current tests should cover it. ### Are there any user-facing changes? No. * GitHub Issue: apache#47384 Lead-authored-by: Rafał Hibner <rafal.hibner@secom.com.pl> Co-authored-by: Rossi Sun <zanmato1984@gmail.com> Co-authored-by: gitmodimo <g.modimo@gmail.com> Signed-off-by: Rossi Sun <zanmato1984@gmail.com>
1 parent 7ef5648 commit 0e7e70c

File tree

5 files changed

+52
-32
lines changed

5 files changed

+52
-32
lines changed

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,9 @@ class InputState : public util::SerialSequencingQueue::Processor {
514514
std::unique_ptr<BackpressureControl> backpressure_control =
515515
std::make_unique<BackpressureController>(
516516
/*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
517-
ARROW_ASSIGN_OR_RAISE(
518-
auto handler, BackpressureHandler::Make(asof_input, low_threshold, high_threshold,
519-
std::move(backpressure_control)));
517+
ARROW_ASSIGN_OR_RAISE(auto handler,
518+
BackpressureHandler::Make(low_threshold, high_threshold,
519+
std::move(backpressure_control)));
520520
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
521521
key_hasher, asof_node, std::move(handler), schema,
522522
time_col_index, key_col_index);
@@ -763,10 +763,10 @@ class InputState : public util::SerialSequencingQueue::Processor {
763763
total_batches_ = n;
764764
}
765765

766-
Status ForceShutdown() {
766+
void ForceShutdown() {
767767
// Force the upstream input node to unpause. Necessary to avoid deadlock when we
768768
// terminate the process thread
769-
return queue_.ForceShutdown();
769+
queue_.ForceShutdown();
770770
}
771771

772772
private:
@@ -1046,8 +1046,10 @@ class AsofJoinNode : public ExecNode {
10461046
if (st.ok()) {
10471047
st = output_->InputFinished(this, batches_produced_);
10481048
}
1049-
for (const auto& s : state_) {
1050-
st &= s->ForceShutdown();
1049+
for (size_t i = 0; i < state_.size(); ++i) {
1050+
const auto& s = state_[i];
1051+
s->ForceShutdown();
1052+
st &= inputs_[i]->StopProducing();
10511053
}
10521054
}));
10531055
}
@@ -1499,8 +1501,11 @@ class AsofJoinNode : public ExecNode {
14991501
if (st.ok()) {
15001502
st = output_->InputFinished(this, batches_produced_);
15011503
}
1502-
for (const auto& s : state_) {
1503-
st &= s->ForceShutdown();
1504+
1505+
for (size_t i = 0; i < state_.size(); ++i) {
1506+
const auto& s = state_[i];
1507+
s->ForceShutdown();
1508+
st &= inputs_[i]->StopProducing();
15041509
}
15051510
}
15061511

cpp/src/arrow/acero/backpressure_handler.h

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,15 @@ namespace arrow::acero {
2525

2626
class BackpressureHandler {
2727
private:
28-
BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
28+
BackpressureHandler(size_t low_threshold, size_t high_threshold,
2929
std::unique_ptr<BackpressureControl> backpressure_control)
30-
: input_(input),
31-
low_threshold_(low_threshold),
30+
: low_threshold_(low_threshold),
3231
high_threshold_(high_threshold),
3332
backpressure_control_(std::move(backpressure_control)) {}
3433

3534
public:
3635
static Result<BackpressureHandler> Make(
37-
ExecNode* input, size_t low_threshold, size_t high_threshold,
36+
size_t low_threshold, size_t high_threshold,
3837
std::unique_ptr<BackpressureControl> backpressure_control) {
3938
if (low_threshold >= high_threshold) {
4039
return Status::Invalid("low threshold (", low_threshold,
@@ -43,7 +42,7 @@ class BackpressureHandler {
4342
if (backpressure_control == NULLPTR) {
4443
return Status::Invalid("null backpressure control parameter");
4544
}
46-
BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
45+
BackpressureHandler backpressure_handler(low_threshold, high_threshold,
4746
std::move(backpressure_control));
4847
return backpressure_handler;
4948
}
@@ -56,16 +55,7 @@ class BackpressureHandler {
5655
}
5756
}
5857

59-
Status ForceShutdown() {
60-
// It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
61-
// Since acero's executor won't terminate if any one node is paused, we need to
62-
// force resume the node before stopping production.
63-
backpressure_control_->Resume();
64-
return input_->StopProducing();
65-
}
66-
6758
private:
68-
ExecNode* input_;
6959
size_t low_threshold_;
7060
size_t high_threshold_;
7161
std::unique_ptr<BackpressureControl> backpressure_control_;

cpp/src/arrow/acero/concurrent_queue_internal.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class ConcurrentQueue {
113113
};
114114

115115
template <typename T>
116-
class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
116+
class BackpressureConcurrentQueue : private ConcurrentQueue<T> {
117117
private:
118118
struct DoHandle {
119119
explicit DoHandle(BackpressureConcurrentQueue& queue)
@@ -134,6 +134,9 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
134134
explicit BackpressureConcurrentQueue(BackpressureHandler handler)
135135
: handler_(std::move(handler)) {}
136136

137+
using ConcurrentQueue<T>::Empty;
138+
using ConcurrentQueue<T>::Front;
139+
137140
// Pops the last item from the queue but waits if the queue is empty until new items are
138141
// pushed.
139142
T WaitAndPop() {
@@ -152,6 +155,7 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
152155

153156
// Pushes an item to the queue
154157
void Push(const T& item) {
158+
if (shutdown_) return;
155159
std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
156160
DoHandle do_handle(*this);
157161
ConcurrentQueue<T>::PushUnlocked(item);
@@ -164,10 +168,14 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
164168
ConcurrentQueue<T>::ClearUnlocked();
165169
}
166170

167-
Status ForceShutdown() { return handler_.ForceShutdown(); }
171+
void ForceShutdown() {
172+
shutdown_ = true;
173+
Clear();
174+
}
168175

169176
private:
170177
BackpressureHandler handler_;
178+
std::atomic<bool> shutdown_{false};
171179
};
172180

173181
} // namespace arrow::acero

cpp/src/arrow/acero/sorted_merge_node.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class InputState {
119119
std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
120120
std::make_unique<BackpressureController>(input, output, backpressure_counter);
121121
ARROW_ASSIGN_OR_RAISE(auto handler,
122-
BackpressureHandler::Make(input, low_threshold, high_threshold,
122+
BackpressureHandler::Make(low_threshold, high_threshold,
123123
std::move(backpressure_control)));
124124
return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
125125
}

cpp/src/arrow/acero/util_test.cc

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,7 @@ class TestBackpressureControl : public BackpressureControl {
263263
TEST(BackpressureConcurrentQueue, BasicTest) {
264264
BackpressureTestExecNode dummy_node;
265265
auto ctrl = std::make_unique<TestBackpressureControl>(&dummy_node);
266-
ASSERT_OK_AND_ASSIGN(auto handler,
267-
BackpressureHandler::Make(&dummy_node, 2, 4, std::move(ctrl)));
266+
ASSERT_OK_AND_ASSIGN(auto handler, BackpressureHandler::Make(2, 4, std::move(ctrl)));
268267
BackpressureConcurrentQueue<int> queue(std::move(handler));
269268

270269
ConcurrentQueueBasicTest(queue);
@@ -275,8 +274,7 @@ TEST(BackpressureConcurrentQueue, BasicTest) {
275274
TEST(BackpressureConcurrentQueue, BackpressureTest) {
276275
BackpressureTestExecNode dummy_node;
277276
auto ctrl = std::make_unique<TestBackpressureControl>(&dummy_node);
278-
ASSERT_OK_AND_ASSIGN(auto handler,
279-
BackpressureHandler::Make(&dummy_node, 2, 4, std::move(ctrl)));
277+
ASSERT_OK_AND_ASSIGN(auto handler, BackpressureHandler::Make(2, 4, std::move(ctrl)));
280278
BackpressureConcurrentQueue<int> queue(std::move(handler));
281279

282280
queue.Push(6);
@@ -299,9 +297,28 @@ TEST(BackpressureConcurrentQueue, BackpressureTest) {
299297
queue.Push(11);
300298
ASSERT_TRUE(dummy_node.paused);
301299
ASSERT_FALSE(dummy_node.stopped);
302-
ASSERT_OK(queue.ForceShutdown());
300+
queue.ForceShutdown();
301+
ASSERT_FALSE(dummy_node.paused);
302+
}
303+
304+
TEST(BackpressureConcurrentQueue, BackpressureTestStayUnpaused) {
305+
BackpressureTestExecNode dummy_node;
306+
auto ctrl = std::make_unique<TestBackpressureControl>(&dummy_node);
307+
ASSERT_OK_AND_ASSIGN(
308+
auto handler, BackpressureHandler::Make(/*low_threshold=*/2, /*high_threshold=*/4,
309+
std::move(ctrl)));
310+
BackpressureConcurrentQueue<int> queue(std::move(handler));
311+
312+
queue.Push(6);
313+
queue.Push(7);
314+
queue.Push(8);
315+
ASSERT_FALSE(dummy_node.paused);
316+
ASSERT_FALSE(dummy_node.stopped);
317+
queue.ForceShutdown();
318+
for (int i = 0; i < 10; ++i) {
319+
queue.Push(i);
320+
}
303321
ASSERT_FALSE(dummy_node.paused);
304-
ASSERT_TRUE(dummy_node.stopped);
305322
}
306323

307324
} // namespace acero

0 commit comments

Comments
 (0)